diff --git a/cflib/utils/supervisor_state.py b/cflib/utils/supervisor_state.py new file mode 100644 index 000000000..3107e3950 --- /dev/null +++ b/cflib/utils/supervisor_state.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- +# +# ,---------, ____ _ __ +# | ,-^-, | / __ )(_) /_______________ _____ ___ +# | ( O ) | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# | / ,--' | / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# +------` /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2025 Bitcraze AB +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, in version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +This class is used to easily get the state of your Crazyflie by through +the supervisor. Check the `reading_supervisor.py` example in +examples/supervisor/ to better understand how it works. +""" +import threading + +from cflib.crazyflie.log import LogConfig +from cflib.crazyflie.syncCrazyflie import SyncCrazyflie + + +class SupervisorState: + STATES = [ + 'Can be armed', + 'Is armed', + 'Is auto armed', + 'Can fly', + 'Is flying', + 'Is tumbled', + 'Is locked', + 'Is crashed', + 'HL control is active', + 'Finished HL trajectory', + 'HL control is disabled' + ] + + def __init__(self, crazyflie): + if isinstance(crazyflie, SyncCrazyflie): + self.cf = crazyflie.cf + else: + self.cf = crazyflie + + self.logconf = LogConfig(name='SupervisorInfo', period_in_ms=100) + self.logconf.add_variable('supervisor.info', 'uint16_t') + self.cf.log.add_config(self.logconf) + + def close(self): + try: + self.logconf.delete() + except Exception as e: + print(f'Warning: failed to delete logconf: {e}') + + def read_supervisor_state_bitfield(self): + """ + Reads 'supervisor.info' once from the Crazyflie. + Returns the value or None if timed out. + """ + value_holder = {'val': None} + event = threading.Event() + + def log_callback(timestamp, data, logconf): + value_holder['val'] = data['supervisor.info'] + event.set() + + def log_error(logconf, msg): + print(f'Error when logging {logconf.name}: {msg}') + event.set() + + self.logconf.data_received_cb.add_callback(log_callback) + self.logconf.error_cb.add_callback(log_error) + self.logconf.start() + + if event.wait(2.0): + bitfield = value_holder['val'] + else: + print('Timeout waiting for supervisor.info') + bitfield = None + + self.logconf.stop() + self.logconf.data_received_cb.remove_callback(log_callback) + self.logconf.error_cb.remove_callback(log_error) + + return bitfield + + def read_supervisor_state_list(self): + """ + Reads 'supervisor.info' once from the Crazyflie. + Returns the list of all active states. + """ + bitfield = self.read_supervisor_state_bitfield() + list = self.decode_bitfield(bitfield) + return list + + def decode_bitfield(self, value): + """ + Given a bitfield integer `value` and a list of `self.STATES`, + returns the names of all states whose bits are set. + Bit 0 corresponds to states[0], Bit 1 to states[1], etc. + + * Bit 0 = Can be armed - the system can be armed and will accept an arming command. + * Bit 1 = Is armed - the system is armed. + * Bit 2 = Is auto armed - the system is configured to automatically arm. + * Bit 3 = Can fly - the Crazyflie is ready to fly. + * Bit 4 = Is flying - the Crazyflie is flying. + * Bit 5 = Is tumbled - the Crazyflie is up side down. + * Bit 6 = Is locked - the Crazyflie is in the locked state and must be restarted. + * Bit 7 = Is crashed - the Crazyflie has crashed. + * Bit 8 = High level control is actively flying the drone. + * Bit 9 = High level trajectory has finished. + * Bit 10 = High level control is disabled and not producing setpoints. + """ + if value < 0: + raise ValueError('value must be >= 0') + + result = [] + for bit_index, name in enumerate(self.STATES): + if value & (1 << bit_index): + result.append(name) + return result + + # Individual state checks + def can_be_armed(self): + # Bit 0 + return bool(self.read_supervisor_state_bitfield() & (1 << 0)) + + def is_armed(self): + # Bit 1 + return bool(self.read_supervisor_state_bitfield() & (1 << 1)) + + def is_auto_armed(self): + # Bit 2 + return bool(self.read_supervisor_state_bitfield() & (1 << 2)) + + def can_fly(self): + # Bit 3 + return bool(self.read_supervisor_state_bitfield() & (1 << 3)) + + def is_flying(self): + # Bit 4 + return bool(self.read_supervisor_state_bitfield() & (1 << 4)) + + def is_tumbled(self): + # Bit 5 + return bool(self.read_supervisor_state_bitfield() & (1 << 5)) + + def is_locked(self): + # Bit 6 + return bool(self.read_supervisor_state_bitfield() & (1 << 6)) + + def is_crashed(self): + # Bit 7 + return bool(self.read_supervisor_state_bitfield() & (1 << 7)) + + def active_hl_control(self): + # Bit 8 + return bool(self.read_supervisor_state_bitfield() & (1 << 8)) + + def finished_hl_traj(self): + # Bit 9 + return bool(self.read_supervisor_state_bitfield() & (1 << 9)) + + def disabled_hl_control(self): + # Bit 10 + return bool(self.read_supervisor_state_bitfield() & (1 << 10)) diff --git a/examples/supervisor/reading_supervisor.py b/examples/supervisor/reading_supervisor.py new file mode 100644 index 000000000..6c1399cd1 --- /dev/null +++ b/examples/supervisor/reading_supervisor.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# +# || ____ _ __ +# +------+ / __ )(_) /_______________ _____ ___ +# | 0xBC | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# +------+ / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# || || /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2025 Bitcraze AB +# +# Crazyflie Nano Quadcopter Client +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Simple example of reading the state of the Crazyflie through the supervisor. + +Based on its state, the Crazyflie will arm (if it can be armed), take off +(if it can fly), and land (if it is flying). After each action, we call +the supervisor to check if the Crazyflie is crashed, locked or tumbled. +""" +import time + +import cflib.crtp +from cflib.crazyflie import Crazyflie +from cflib.crazyflie.syncCrazyflie import SyncCrazyflie +from cflib.utils import uri_helper +from cflib.utils.reset_estimator import reset_estimator +from cflib.utils.supervisor_state import SupervisorState + + +# URI to the Crazyflie to connect to +uri = uri_helper.uri_from_env(default='radio://0/80/2M/E7E7E7E7E7') + + +def safety_check(sup: SupervisorState): + if sup.is_crashed(): + raise Exception('Crazyflie crashed!') + if sup.is_locked(): + raise Exception('Crazyflie locked!') + if sup.is_tumbled(): + raise Exception('Crazyflie tumbled!') + + +def run_sequence(scf: SyncCrazyflie, sup: SupervisorState): + commander = scf.cf.high_level_commander + + try: + if sup.can_be_armed(): + print('The Crazyflie can be armed...arming!') + safety_check(sup) + scf.cf.platform.send_arming_request(True) + time.sleep(1) + + if sup.can_fly(): + print('The Crazyflie can fly...taking off!') + commander.takeoff(1.0, 2.0) + time.sleep(3) + safety_check(sup) + if sup.is_flying(): + print('The Crazyflie is flying...landing!') + commander.land(0.0, 2.0) + time.sleep(3) + safety_check(sup) + + except Exception as e: + print(e) + + +if __name__ == '__main__': + cflib.crtp.init_drivers() + + with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: + time.sleep(1) + supervisor = SupervisorState(scf) + reset_estimator(scf) + time.sleep(1) + run_sequence(scf, supervisor)