-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSerial_Drone.py
110 lines (83 loc) · 3.43 KB
/
Serial_Drone.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python3
import serial
import threading
from MAVLink import *
class DeviceSerial(object):
def __init__(self, serial_device, baud_rate):
self.dev = serial.Serial(serial_device, baud_rate, timeout=None)
def write(self, buf):
send_callback = self.dev.write(buf)
def read(self, size):
data = self.dev.read(size)
return data
def close(self):
try:
self.dev.close()
except Exception as e:
print(e)
class MAVLinkAPI(object):
def __init__(self, serial_device, baud_rate):
self.system_id = 254
self.component_id = 1
self.target_system_id = 1
self.target_component_id = 1
self.dev = DeviceSerial(serial_device, baud_rate)
self.mavlink = MAVLink(self.dev, self.system_id, self.component_id)
# 各类线程实例
# daemon: 守护进程
# target: 回调函数
self.th_msg_receive = threading.Thread(daemon=True, target=self.th_receive_fun)
self.th_send_heart = threading.Thread(daemon=True, target=self.th_send_heart_fun)
# 开启线程
self.th_msg_receive.start()
self.th_send_heart.start()
def th_receive_fun(self):
count = 0
while True:
data = self.dev.read(256)
if len(data) > 0:
try:
# 解析字节数组
received_msg = self.mavlink.parse_char(data)
except Exception as e:
continue
if isinstance(received_msg, MAVLink_message):
print("INFO: Got a mavlink message")
if isinstance(received_msg, MAVLink_heartbeat_message):
print("INFO: sys={}, cmp={}".format(received_msg.get_srcSystem(), received_msg.get_srcComponent()))
print("INFO: type={}, autopilot={}".format(received_msg.type, received_msg.autopilot))
if isinstance(received_msg, MAVLink_attitude_message):
print("INFO: roll={:.3f}, pitch={:.3f}, yaw={:.3f}".format(received_msg.roll, received_msg.pitch, received_msg.yaw))
def th_send_heart_fun(self):
while True:
msg = MAVLink_heartbeat_message(
MAV_TYPE_QUADROTOR, MAV_AUTOPILOT_PX4, MAV_MODE_PREFLIGHT,
0x60000, MAV_STATE_STANDBY, 2
)
# 发送消息,调用了 DeviceUDP.write
self.mavlink.send(msg)
time.sleep(1)
def arm_disarm(self, arm):
arm = 1 if arm else 0
msg_cmd = MAVLink_command_long_message(self.target_system_id, self.target_component_id, MAV_CMD_COMPONENT_ARM_DISARM,
0, arm, 0, 0, 0, 0, 0, 0)
self.mavlink.send(msg_cmd)
def takeoff_land(self, cmd="takeoff"):
base_mode = 129
custom_mode = 0x2040000
if cmd == 'land':
custom_mode = 0x6040000
msg_cmd = MAVLink_set_mode_message(self.target_system_id, base_mode=base_mode, custom_mode=custom_mode)
self.mavlink.send(msg_cmd)
def test_px4_fcu():
serial_device = '/dev/ttyACM0'
baud_rate = 115200
mav_link_api = MAVLinkAPI(serial_device, baud_rate)
time.sleep(3)
mav_link_api.arm_disarm(True)
time.sleep(2)
mav_link_api.takeoff_land('takeoff')
# time.sleep(5)
# mav_link_api.takeoff_land('land')
if __name__ == '__main__':
test_px4_fcu()