Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 48 additions & 27 deletions src/cfclient/ui/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@
The main file for the Crazyflie control application.
"""

from __future__ import annotations

import asyncio
import logging
import sys
from collections.abc import Callable, Coroutine
from concurrent.futures import Future
from typing import Any

import cfclient
from cfclient.gui import create_task
Expand All @@ -40,6 +45,7 @@
from cfclient.utils.config import Config
from cfclient.utils.config_manager import ConfigManager
from cfclient.utils.input import JoystickReader
from cfclient.utils.input.inputreaderinterface import InputReaderInterface
from cfclient.utils.ui import UiUtils
from cfclient.ui.dialogs.inputconfigdialogue import InputConfigDialogue
from cflib2 import Crazyflie, LinkContext
Expand Down Expand Up @@ -353,19 +359,21 @@ def set_preferred_dock_area(

# --- Event loop ---

def _on_event_loop_ready(self):
def _on_event_loop_ready(self) -> None:
self._loop = asyncio.get_event_loop()
self._scan(self._connectivity_manager.get_address())

# --- Commander pipeline ---

async def _safe_send(self, coro_fn):
async def _safe_send(
self, coro_fn: Callable[[], Coroutine[Any, Any, None]]
) -> None:
try:
await coro_fn()
except DisconnectedError:
pass

def _commander_future_cb(self, future):
def _commander_future_cb(self, future: Future[None]) -> None:
"""Log unexpected exceptions from commander coroutines."""
try:
future.result()
Expand All @@ -374,7 +382,9 @@ def _commander_future_cb(self, future):
except Exception:
logger.error("Unhandled exception in commander coroutine", exc_info=True)

def _send_setpoint(self, roll, pitch, yaw, thrust):
def _send_setpoint(
self, roll: float, pitch: float, yaw: float, thrust: float
) -> None:
cf = self.cf
if self._disable_input or cf is None or self._loop is None:
return
Expand All @@ -386,7 +396,9 @@ def _send_setpoint(self, roll, pitch, yaw, thrust):
)
future.add_done_callback(self._commander_future_cb)

def _send_velocity_world(self, vx, vy, vz, yawrate):
def _send_velocity_world(
self, vx: float, vy: float, vz: float, yawrate: float
) -> None:
cf = self.cf
if self._disable_input or cf is None or self._loop is None:
return
Expand All @@ -398,7 +410,9 @@ def _send_velocity_world(self, vx, vy, vz, yawrate):
)
future.add_done_callback(self._commander_future_cb)

def _send_zdistance(self, roll, pitch, yawrate, zdistance):
def _send_zdistance(
self, roll: float, pitch: float, yawrate: float, zdistance: float
) -> None:
cf = self.cf
if self._disable_input or cf is None or self._loop is None:
return
Expand All @@ -412,7 +426,9 @@ def _send_zdistance(self, roll, pitch, yawrate, zdistance):
)
future.add_done_callback(self._commander_future_cb)

def _send_hover(self, vx, vy, yawrate, zdistance):
def _send_hover(
self, vx: float, vy: float, yawrate: float, zdistance: float
) -> None:
cf = self.cf
if self._disable_input or cf is None or self._loop is None:
return
Expand All @@ -424,13 +440,13 @@ def _send_hover(self, vx, vy, yawrate, zdistance):
)
future.add_done_callback(self._commander_future_cb)

def disable_input(self, disable):
def disable_input(self, disable: bool) -> None:
"""Disable gamepad input to allow a tab to send setpoints directly."""
self._disable_input = disable

# --- Emergency stop ---

def _emergency_stop(self):
def _emergency_stop(self) -> None:
if self.cf is not None:
create_task(self.cf.localization().emergency().send_emergency_stop())

Expand Down Expand Up @@ -566,6 +582,7 @@ async def _async_disconnect(self) -> None:
self._disconnect_watch_task.cancel()
self._disconnect_watch_task = None
if self.cf is not None:
logger.info(f"Disconnected from {self.cf.uri}")
self._notify_tabs_disconnected()
await self.cf.disconnect()
self.cf = None
Expand All @@ -586,25 +603,29 @@ def _notify_tabs_disconnected(self) -> None:
for tab_toolbox in self.loaded_tab_toolboxes.values():
tab_toolbox.disconnected()

async def _stream_battery(self, cf):
async def _stream_battery(self, cf: Crazyflie) -> None:
log = cf.log()
block = await log.create_block()
await block.add_variable("pm.vbat")
await block.add_variable("pm.state")
stream = await block.start(1000)
stream = None
try:
block = await log.create_block()
await block.add_variable("pm.vbat")
await block.add_variable("pm.state")
stream = await block.start(1000)
while True:
data = await stream.next()
self._battery_signal.emit(
data.data["pm.vbat"], int(data.data["pm.state"])
)
except DisconnectedError:
pass
Comment thread
ArisMorgens marked this conversation as resolved.
finally:
try:
await asyncio.shield(stream.stop())
except (DisconnectedError, asyncio.CancelledError):
pass
if stream is not None:
try:
await asyncio.shield(stream.stop())
except (DisconnectedError, asyncio.CancelledError):
pass

def _update_battery(self, vbat, state):
def _update_battery(self, vbat: float, state: int) -> None:
self.batteryBar.setValue(int(vbat * 1000))

color = UiUtils.COLOR_BLUE
Expand Down Expand Up @@ -680,16 +701,16 @@ def set_default_theme(self) -> None:

# --- Input device menu ---

def _show_input_device_config_dialog(self):
def _show_input_device_config_dialog(self) -> None:
self.inputConfig = InputConfigDialogue(self._joystick_reader)
self.inputConfig.show()

def _display_input_device_error(self, error):
def _display_input_device_error(self, error: str) -> None:
if self.cf is not None:
create_task(self._async_disconnect())
QMessageBox.critical(self, "Input device error", error)

def _mux_selected(self, checked):
def _mux_selected(self, checked: bool) -> None:
if not checked:
(mux, sub_nodes) = self.sender().data()
for s in sub_nodes:
Expand All @@ -709,7 +730,7 @@ def _mux_selected(self, checked):

self._update_input_device_footer()

def _get_dev_status(self, device):
def _get_dev_status(self, device: InputReaderInterface) -> str:
msg = "{}".format(device.name)
if device.supports_mapping:
map_name = "No input mapping"
Expand All @@ -718,7 +739,7 @@ def _get_dev_status(self, device):
msg += " ({})".format(map_name)
return msg

def _update_input_device_footer(self):
def _update_input_device_footer(self) -> None:
msg = ""

if len(self._joystick_reader.available_devices()) > 0:
Expand All @@ -738,7 +759,7 @@ def _update_input_device_footer(self):
msg = "No input device found"
self._statusbar_label.setText(msg)

def _inputdevice_selected(self, checked):
def _inputdevice_selected(self, checked: bool) -> None:
(map_menu, device, mux_menu) = self.sender().data()
if not checked:
if map_menu:
Expand Down Expand Up @@ -767,7 +788,7 @@ def _inputdevice_selected(self, checked):
)
self._update_input_device_footer()

def _inputconfig_selected(self, checked):
def _inputconfig_selected(self, checked: bool) -> None:
if not checked:
return

Expand All @@ -776,7 +797,7 @@ def _inputconfig_selected(self, checked):
self._joystick_reader.set_input_map(device.name, selected_mapping)
self._update_input_device_footer()

def device_discovery(self, devs):
def device_discovery(self, devs: list[InputReaderInterface]) -> None:
"""Called when new devices have been added"""
for menu in self._all_role_menus:
role_menu = menu["rolemenu"]
Expand Down
45 changes: 26 additions & 19 deletions src/cfclient/ui/pose_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
Sets up logging for the the full pose of the Crazyflie
"""

from __future__ import annotations

import asyncio
import logging
import math

from cflib2 import Crazyflie
from cflib2.error import DisconnectedError

from cfclient.gui import create_task
Expand Down Expand Up @@ -64,47 +67,48 @@ def __init__(self) -> None:
self._stream_task = None

@property
def position(self):
def position(self) -> tuple[float, ...]:
"""Get the position part of the full pose"""
return self.pose[0:3]

@property
def rpy(self):
def rpy(self) -> tuple[float, ...]:
"""Get the roll, pitch and yaw of the full pose in degrees"""
return self.pose[3:6]

@property
def rpy_rad(self):
def rpy_rad(self) -> list[float]:
"""Get the roll, pitch and yaw of the full pose in radians"""
return [
math.radians(self.pose[3]),
math.radians(self.pose[4]),
math.radians(self.pose[5]),
]

def start(self, cf):
def start(self, cf: Crazyflie) -> None:
"""Start streaming pose data from the Crazyflie."""
self._stream_task = create_task(self._stream_loop(cf))

def stop(self):
def stop(self) -> None:
"""Stop streaming pose data."""
if self._stream_task is not None:
self._stream_task.cancel()
self._stream_task = None
self.pose = self.NO_POSE

async def _stream_loop(self, cf):
async def _stream_loop(self, cf: Crazyflie) -> None:
log = cf.log()
block = await log.create_block()
await block.add_variable(self.LOG_NAME_ESTIMATE_X)
await block.add_variable(self.LOG_NAME_ESTIMATE_Y)
await block.add_variable(self.LOG_NAME_ESTIMATE_Z)
await block.add_variable(self.LOG_NAME_ESTIMATE_ROLL)
await block.add_variable(self.LOG_NAME_ESTIMATE_PITCH)
await block.add_variable(self.LOG_NAME_ESTIMATE_YAW)

stream = await block.start(40) # 40ms period
stream = None
try:
block = await log.create_block()
await block.add_variable(self.LOG_NAME_ESTIMATE_X)
await block.add_variable(self.LOG_NAME_ESTIMATE_Y)
await block.add_variable(self.LOG_NAME_ESTIMATE_Z)
await block.add_variable(self.LOG_NAME_ESTIMATE_ROLL)
await block.add_variable(self.LOG_NAME_ESTIMATE_PITCH)
await block.add_variable(self.LOG_NAME_ESTIMATE_YAW)

stream = await block.start(40) # 40ms period
while True:
data = await stream.next()
self.pose = (
Expand All @@ -116,8 +120,11 @@ async def _stream_loop(self, cf):
data.data[self.LOG_NAME_ESTIMATE_YAW],
)
self.data_received_cb.call(self, self.pose)
except DisconnectedError:
pass
Comment thread
ArisMorgens marked this conversation as resolved.
finally:
try:
await asyncio.shield(stream.stop())
except (DisconnectedError, asyncio.CancelledError):
pass
if stream is not None:
try:
await asyncio.shield(stream.stop())
except (DisconnectedError, asyncio.CancelledError):
pass
Loading
Loading