Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cflib/crazyflie/platformservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
APP_CHANNEL = 2

PLATFORM_SET_CONT_WAVE = 0
PLATFORM_REQUEST_ARMING = 1 # Deprecated: use supervisor.send_arming_request()
PLATFORM_REQUEST_CRASH_RECOVERY = 2 # Deprecated: use supervisor.send_crash_recovery_request()

VERSION_GET_PROTOCOL = 0
VERSION_GET_FIRMWARE = 1
Expand Down
111 changes: 75 additions & 36 deletions cflib/crazyflie/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@
Provides access to the supervisor module of the Crazyflie platform.
"""
import logging
import struct
import threading
import time
import warnings

from cflib.crazyflie.localization import Localization
from cflib.crazyflie.platformservice import PLATFORM_COMMAND
from cflib.crazyflie.platformservice import PLATFORM_REQUEST_ARMING
from cflib.crazyflie.platformservice import PLATFORM_REQUEST_CRASH_RECOVERY
from cflib.crtp.crtpstack import CRTPPacket
from cflib.crtp.crtpstack import CRTPPort

Expand Down Expand Up @@ -93,18 +98,19 @@ def __init__(self, crazyflie):
self._cf.add_port_callback(CRTPPort.SUPERVISOR, self._supervisor_callback)
self._bitfield_received = threading.Event()

def _check_protocol_version(self):
"""Returns True if the protocol version is supported, False otherwise."""
def _is_legacy_firmware(self):
"""Returns True if the firmware does not support the supervisor port."""
return self._cf.platform.get_protocol_version() < 12

def _warn_legacy_firmware(self):
"""Warn that the firmware is too old for the supervisor port."""
version = self._cf.platform.get_protocol_version()
if version < 12:
warnings.warn(
'The supervisor subsystem requires CRTP protocol version 12 or later. '
f'Connected Crazyflie reports version {version}. '
'Update your Crazyflie firmware.',
stacklevel=3,
)
return False
return True
warnings.warn(
'The supervisor subsystem requires CRTP protocol version 12 or later. '
f'Connected Crazyflie reports version {version}. '
'Update your Crazyflie firmware. Using legacy fallback.',
stacklevel=3,
)

def _supervisor_callback(self, pk: CRTPPacket):
"""
Expand Down Expand Up @@ -136,7 +142,8 @@ def _fetch_bitfield(self, timeout=0.2):
Request the bitfield and wait for response (blocking).
Uses time-based cache to avoid sending packages too frequently.
"""
if not self._check_protocol_version():
if self._is_legacy_firmware():
self._warn_legacy_firmware()
return 0
now = time.time()

Expand Down Expand Up @@ -253,50 +260,82 @@ def send_arming_request(self, do_arm: bool):
"""
Send system arm/disarm request.

If the connected Crazyflie does not support CRTP protocol version 12
or later, the legacy platform channel is used as a fallback.

Args:
do_arm (bool): True = arm the system, False = disarm the system
"""
if not self._check_protocol_version():
return
pk = CRTPPacket()
pk.set_header(CRTPPort.SUPERVISOR, SUPERVISOR_CH_COMMAND)
pk.data = (CMD_ARM_SYSTEM, do_arm)
self._cf.send_packet(pk)
if self._is_legacy_firmware():
self._warn_legacy_firmware()
pk = CRTPPacket()
pk.set_header(CRTPPort.PLATFORM, PLATFORM_COMMAND)
pk.data = (PLATFORM_REQUEST_ARMING, do_arm)
self._cf.send_packet(pk)
else:
pk = CRTPPacket()
pk.set_header(CRTPPort.SUPERVISOR, SUPERVISOR_CH_COMMAND)
pk.data = (CMD_ARM_SYSTEM, do_arm)
self._cf.send_packet(pk)
logger.debug(f'Sent arming request: do_arm={do_arm}')

def send_crash_recovery_request(self):
"""
Send crash recovery request.

If the connected Crazyflie does not support CRTP protocol version 12
or later, the legacy platform channel is used as a fallback.
"""
if not self._check_protocol_version():
return
pk = CRTPPacket()
pk.set_header(CRTPPort.SUPERVISOR, SUPERVISOR_CH_COMMAND)
pk.data = (CMD_RECOVER_SYSTEM,)
self._cf.send_packet(pk)
if self._is_legacy_firmware():
self._warn_legacy_firmware()
pk = CRTPPacket()
pk.set_header(CRTPPort.PLATFORM, PLATFORM_COMMAND)
pk.data = (PLATFORM_REQUEST_CRASH_RECOVERY,)
self._cf.send_packet(pk)
else:
pk = CRTPPacket()
pk.set_header(CRTPPort.SUPERVISOR, SUPERVISOR_CH_COMMAND)
pk.data = (CMD_RECOVER_SYSTEM,)
self._cf.send_packet(pk)
logger.debug('Sent crash recovery request')

def send_emergency_stop(self):
"""
Send emergency stop. The Crazyflie will immediately stop all motors.

If the connected Crazyflie does not support CRTP protocol version 12
or later, the legacy localization channel is used as a fallback.
"""
if not self._check_protocol_version():
return
pk = CRTPPacket()
pk.set_header(CRTPPort.SUPERVISOR, SUPERVISOR_CH_COMMAND)
pk.data = (CMD_EMERGENCY_STOP,)
self._cf.send_packet(pk)
if self._is_legacy_firmware():
self._warn_legacy_firmware()
pk = CRTPPacket()
pk.set_header(CRTPPort.LOCALIZATION, Localization.GENERIC_CH)
pk.data = struct.pack('<B', Localization.EMERGENCY_STOP)
self._cf.send_packet(pk)
else:
pk = CRTPPacket()
pk.set_header(CRTPPort.SUPERVISOR, SUPERVISOR_CH_COMMAND)
pk.data = (CMD_EMERGENCY_STOP,)
self._cf.send_packet(pk)
logger.debug('Sent emergency stop')

def send_emergency_stop_watchdog(self):
"""
Send emergency stop watchdog. The Crazyflie will stop all motors
unless this command is repeated at regular intervals.

If the connected Crazyflie does not support CRTP protocol version 12
or later, the legacy localization channel is used as a fallback.
"""
if not self._check_protocol_version():
return
pk = CRTPPacket()
pk.set_header(CRTPPort.SUPERVISOR, SUPERVISOR_CH_COMMAND)
pk.data = (CMD_EMERGENCY_STOP_WATCHDOG,)
self._cf.send_packet(pk)
if self._is_legacy_firmware():
self._warn_legacy_firmware()
pk = CRTPPacket()
pk.set_header(CRTPPort.LOCALIZATION, Localization.GENERIC_CH)
pk.data = struct.pack('<B', Localization.EMERGENCY_STOP_WATCHDOG)
self._cf.send_packet(pk)
else:
pk = CRTPPacket()
pk.set_header(CRTPPort.SUPERVISOR, SUPERVISOR_CH_COMMAND)
pk.data = (CMD_EMERGENCY_STOP_WATCHDOG,)
self._cf.send_packet(pk)
logger.debug('Sent emergency stop watchdog')
Loading