-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqttproxy.py
92 lines (76 loc) · 2.89 KB
/
mqttproxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python
# -*- coding: utf8 -*-
import socket
import select
import socketserver
from collections import deque
import mqtt_protocol as mqtt
class MQTTConnectionHandler:
    """
    Manage the two sides of a proxied connection between client and broker.

    Packets read from one socket are queued and written to the other
    socket as soon as ``select`` reports it writable.
    """

    def __init__(self, client, broker, read_packet=None):
        """
        client      -- connected socket on the client side
        broker      -- connected socket on the broker side
        read_packet -- optional callable ``read_packet(sock)`` returning one
                       bytes-like packet, or None when the socket has
                       nothing more to deliver; defaults to
                       ``mqtt.read_paquet``
                       (NOTE(review): None is assumed to be how
                       mqtt.read_paquet signals end of stream - confirm)
        """
        self.client = client
        self.broker = broker
        # Packets read from the client, waiting to be sent to the broker.
        self.bqueue = deque()
        # Packets read from the broker, waiting to be sent to the client.
        self.cqueue = deque()
        self._read_packet = (
            mqtt.read_paquet if read_packet is None else read_packet
        )

    def run(self):
        """
        Relay packets until either side stops delivering data.

        As soon as a read returns None, nothing more is read from either
        socket.  Packets already queued for the side that is still open
        are flushed, packets queued for the side that ended are dropped,
        and the method returns so the caller can close both connections.
        Waiting for the second side to end as well would keep the
        surviving connection open indefinitely.
        """
        # (socket to read from, queue feeding the opposite socket)
        incoming = ((self.client, self.bqueue), (self.broker, self.cqueue))
        # (socket to write to, queue of packets waiting for it)
        outgoing = ((self.client, self.cqueue), (self.broker, self.bqueue))
        ended = []
        while True:
            writers = [sock for sock, queue in outgoing
                       if queue and sock not in ended]
            if ended and not writers:
                break
            # After the first end of stream only the pending writes matter.
            readers = [] if ended else [self.client, self.broker]
            rlist, wlist, _ = select.select(readers, writers, [])
            for sock, queue in incoming:
                if sock in rlist:
                    data = self._read_packet(sock)
                    if data is None:
                        ended.append(sock)
                    else:
                        queue.append(data)
            for sock, queue in outgoing:
                # Skip a socket that was found ended in this same iteration.
                if sock in wlist and sock not in ended:
                    sock.sendall(queue.popleft())
class MQTTProxyHandler(socketserver.BaseRequestHandler):
    """
    Request handler that relays one client connection to the broker.

    The broker address is read from ``self.server.broker_address``.
    """

    def handle(self):
        """Open a broker connection and relay packets until the relay ends."""
        print('handle connection from', self.client_address)
        # connect socket to broker
        sock = socket.create_connection(self.server.broker_address)
        try:
            handler = MQTTConnectionHandler(self.request, sock)
            handler.run()
        finally:
            # Always release the broker socket, even when the relay raised.
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                # The broker side is already disconnected; nothing to shut
                # down, but the descriptor still has to be closed.
                pass
            sock.close()

    def finish(self):
        """Log the end of the connection."""
        print('closing connection from', self.client_address)
        # self.request is not closed here: the socketserver framework shuts
        # down and closes the request socket after finish() returns.
class MQTTProxyServer(socketserver.ThreadingTCPServer):
    """
    Threaded TCP server that proxies every accepted connection to a broker.

    local_address  -- (host, port) tuple the proxy listens on
    broker_address -- (host, port) tuple of the broker; handlers read it
                      from ``self.server.broker_address``
    """

    # Allow rebinding the listening port right after a restart instead of
    # failing with "Address already in use".
    allow_reuse_address = True
    # Do not let handler threads of still-connected clients keep the
    # process alive when the server is being stopped.
    daemon_threads = True

    def __init__(self, local_address, broker_address):
        super().__init__(local_address, MQTTProxyHandler)
        self.broker_address = broker_address
# Usage example: python3 mqttproxy.py -broker broker.hivemq.com:1883
# Public test brokers: 'broker.hivemq.com', 'iot.eclipse.org', 'test.mosquitto.org'
def parse_broker_address(spec, default_port=1883):
    """
    Split a ``HOST[:PORT]`` string into a ``(host, port)`` tuple.

    The port is optional and falls back to ``default_port``.  An IPv6
    literal has to be written in brackets (``[::1]`` or ``[::1]:1883``);
    the brackets are removed from the returned host.

    Raises ValueError when the host is empty, an IPv6 literal is not
    bracketed, or the port is not an integer in the range 1-65535.
    """
    host, sep, port_text = spec.rpartition(':')
    if not sep or spec.endswith(']'):
        # No port part: either there is no colon at all, or the last colon
        # belongs to a bracketed IPv6 literal.
        host, port = spec, default_port
    else:
        try:
            port = int(port_text)
        except ValueError:
            raise ValueError('invalid broker port: %r' % port_text) from None
    if host.startswith('[') and host.endswith(']'):
        host = host[1:-1]
    elif ':' in host:
        raise ValueError(
            'IPv6 broker address must be written in brackets: %r' % spec)
    if not host:
        raise ValueError('missing broker host in %r' % spec)
    if not 0 < port < 65536:
        raise ValueError('broker port out of range: %r' % port)
    return host, port


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-broker', required=True, metavar='HOST[:PORT]')
    parser.add_argument('-host', default='0.0.0.0')
    parser.add_argument('-port', type=int, default=1883)
    # parse_args() reads sys.argv[1:] by default.
    args = parser.parse_args()
    try:
        broker_address = parse_broker_address(args.broker)
    except ValueError as exc:
        # Prints the usage line plus the message and exits with status 2.
        parser.error(str(exc))
    proxy = MQTTProxyServer((args.host, args.port), broker_address)
    try:
        proxy.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the proxy; exit without traceback.
        pass
    finally:
        # Release the listening socket.
        proxy.server_close()