Skip to content

Commit 243631f

Browse files
committed
examples: mqtt: add rl6 sim
1 parent ec67428 commit 243631f

File tree

1 file changed

+181
-0
lines changed

1 file changed

+181
-0
lines changed

examples/mqtt/rl6_sim.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
import asyncio
2+
import json
3+
import os
4+
import time
5+
from typing import Any, Dict
6+
7+
import enapter
8+
9+
10+
async def main() -> None:
11+
hardware_id = os.environ["HARDWARE_ID"]
12+
channel_id = os.environ["CHANNEL_ID"]
13+
mqtt_config = enapter.mqtt.Config(
14+
host=os.environ["MQTT_HOST"],
15+
port=int(os.environ["MQTT_PORT"]),
16+
tls=enapter.mqtt.TLSConfig(
17+
secret_key=os.environ["MQTT_TLS_SECRET_KEY"],
18+
cert=os.environ["MQTT_TLS_CERT"],
19+
ca_cert=os.environ["MQTT_TLS_CA_CERT"],
20+
),
21+
)
22+
async with enapter.mqtt.Client(config=mqtt_config) as client:
23+
async with asyncio.TaskGroup() as tg:
24+
tg.create_task(command_handler(client, hardware_id, channel_id))
25+
tg.create_task(telemetry_publisher(client, hardware_id, channel_id))
26+
tg.create_task(properties_publisher(client, hardware_id, channel_id))
27+
# NOTE: The following two tasks are necessary only when connecting
28+
# to Cloud v2.
29+
tg.create_task(ucm_properties_publisher(client, hardware_id))
30+
tg.create_task(ucm_telemetry_publisher(client, hardware_id))
31+
32+
33+
async def command_handler(
34+
client: enapter.mqtt.Client, hardware_id: str, channel_id: str
35+
) -> None:
36+
async with client.subscribe(
37+
f"v1/to/{hardware_id}/{channel_id}/v1/command/requests"
38+
) as messages:
39+
async for msg in messages:
40+
request = json.loads(msg.payload)
41+
match request["name"]:
42+
case "enable_load":
43+
response = handle_enable_load_command(request)
44+
case "disable_load":
45+
response = handle_disable_load_command(request)
46+
case _:
47+
response = handle_unknown_command(request)
48+
try:
49+
await client.publish(
50+
topic=f"v1/from/{hardware_id}/{channel_id}/v1/command/responses",
51+
payload=json.dumps(response),
52+
)
53+
except enapter.mqtt.Error as e:
54+
print("failed to publish command response: " + str(e))
55+
56+
57+
LOADS = {
58+
"r1": False,
59+
"r2": False,
60+
"r3": False,
61+
"r4": False,
62+
"r5": False,
63+
"r6": False,
64+
}
65+
66+
67+
def handle_enable_load_command(request: Dict[str, Any]) -> Dict[str, Any]:
68+
arguments = request.get("arguments", {})
69+
load = arguments.get("load")
70+
if load not in LOADS:
71+
return {
72+
"id": request["id"],
73+
"state": "error",
74+
"payload": {"reason": "load invalid or missing"},
75+
}
76+
LOADS[load] = True
77+
return {
78+
"id": request["id"],
79+
"state": "completed",
80+
"payload": {},
81+
}
82+
83+
84+
def handle_disable_load_command(request: Dict[str, Any]) -> Dict[str, Any]:
85+
args = request.get("args", {})
86+
load = args.get("load")
87+
if load not in LOADS:
88+
return {
89+
"id": request["id"],
90+
"state": "error",
91+
"payload": {"reason": "load invalid or missing"},
92+
}
93+
LOADS[load] = False
94+
return {
95+
"id": request["id"],
96+
"state": "completed",
97+
"payload": {},
98+
}
99+
100+
101+
def handle_unknown_command(request: Dict[str, Any]) -> Dict[str, Any]:
102+
return {
103+
"id": request["id"],
104+
"state": "error",
105+
"payload": {"reason": "command unknown"},
106+
}
107+
108+
109+
async def telemetry_publisher(
110+
client: enapter.mqtt.Client, hardware_id: str, channel_id: str
111+
) -> None:
112+
while True:
113+
try:
114+
telemetry = {
115+
"timestamp": int(time.time()),
116+
**LOADS,
117+
}
118+
await client.publish(
119+
topic=f"v1/from/{hardware_id}/{channel_id}/v1/telemetry",
120+
payload=json.dumps(telemetry),
121+
)
122+
except enapter.mqtt.Error as e:
123+
print("failed to publish telemetry: " + str(e))
124+
await asyncio.sleep(1)
125+
126+
127+
async def properties_publisher(
128+
client: enapter.mqtt.Client, hardware_id: str, channel_id: str
129+
) -> None:
130+
while True:
131+
try:
132+
properties = {
133+
"timestamp": int(time.time()),
134+
}
135+
await client.publish(
136+
topic=f"v1/from/{hardware_id}/{channel_id}/v1/properties",
137+
payload=json.dumps(properties),
138+
)
139+
except enapter.mqtt.Error as e:
140+
print("failed to publish properties: " + str(e))
141+
await asyncio.sleep(10)
142+
143+
144+
async def ucm_telemetry_publisher(
145+
client: enapter.mqtt.Client, hardware_id: str
146+
) -> None:
147+
while True:
148+
try:
149+
telemetry = {
150+
"timestamp": int(time.time()),
151+
}
152+
await client.publish(
153+
topic=f"v1/from/{hardware_id}/ucm/v1/telemetry",
154+
payload=json.dumps(telemetry),
155+
)
156+
except enapter.mqtt.Error as e:
157+
print("failed to publish ucm telemetry: " + str(e))
158+
await asyncio.sleep(1)
159+
160+
161+
async def ucm_properties_publisher(
162+
client: enapter.mqtt.Client, hardware_id: str
163+
) -> None:
164+
while True:
165+
try:
166+
properties = {
167+
"timestamp": int(time.time()),
168+
"virtual": True,
169+
"lua_api_ver": 1,
170+
}
171+
await client.publish(
172+
topic=f"v1/from/{hardware_id}/ucm/v1/register",
173+
payload=json.dumps(properties),
174+
)
175+
except enapter.mqtt.Error as e:
176+
print("failed to publish ucm properties: " + str(e))
177+
await asyncio.sleep(10)
178+
179+
180+
if __name__ == "__main__":
181+
asyncio.run(main())

0 commit comments

Comments
 (0)