Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
from bamboo.keys import KeyRegistry
from .zmq_transport import ZmqTransport
from .libby import Libby
from .mqtt_rpc_client import MqttRpcClient

__all__ = ["Libby", "ZmqTransport", "Protocol", "MessageBuilder", "KeyRegistry"]
__all__ = ["Libby", "ZmqTransport", "Protocol", "MessageBuilder", "KeyRegistry", "MqttRpcClient"]
82 changes: 82 additions & 0 deletions libby/mqtt_dummy_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import json
import time
import paho.mqtt.client as mqtt

MQTT_HOST = "localhost"
MQTT_PORT = 1883
BASE = "mktl/dev-001"
REQ_TOPIC = f"{BASE}/req"
RESP_TOPIC = f"{BASE}/resp"
TELEMETRY_TOPIC = f"{BASE}/telemetry"

state = {
"switch": False,
"last_ts": time.time(),
}

def make_resp(msg_id: str, ok: bool, payload: dict | None = None, error: str | None = None) -> dict:
return {
"type": "response",
"ok": ok,
"payload": payload or {},
"error": error,
"msg_id": msg_id,
"ts": time.time(),
}

def on_connect(client, userdata, flags, rc):
client.subscribe(REQ_TOPIC, qos=1)
print("[dummy] online, listening", REQ_TOPIC)

def on_message(client, userdata, msg):
try:
req = json.loads(msg.payload.decode("utf-8"))
except Exception:
return

if msg.topic != REQ_TOPIC:
return

action = req.get("action", "")
payload = req.get("payload") or {}
msg_id = req.get("msg_id", "")

if action in ("device.status.get", "device.switch.get"):
resp = make_resp(msg_id, True, {"switch": state["switch"], "ts": time.time()})
elif action == "device.switch.set":
desired = payload.get("on", None)
if isinstance(desired, bool):
state["switch"] = desired
state["last_ts"] = time.time()
print(f"[dummy] switch -> {state['switch']}")
resp = make_resp(msg_id, True, {"switch": state["switch"], "ts": state["last_ts"]})
else:
resp = make_resp(msg_id, False, error="payload.on must be boolean")
else:
resp = make_resp(msg_id, False, error=f"unknown action '{action}'")

client.publish(RESP_TOPIC, json.dumps(resp), qos=1, retain=False)

def main():
c = mqtt.Client(client_id="dummy-dev-001")
c.on_connect = on_connect
c.on_message = on_message
c.connect(MQTT_HOST, MQTT_PORT, keepalive=30)

# periodic status
def publish_telemetry():
t = {"switch": state["switch"], "uptime_s": int(time.time() - state["last_ts"]), "ts": time.time()}
c.publish(TELEMETRY_TOPIC, json.dumps(t), qos=0, retain=False)

c.loop_start()
try:
while True:
publish_telemetry()
time.sleep(5)
except KeyboardInterrupt:
pass
finally:
c.loop_stop()

if __name__ == "__main__":
main()
63 changes: 63 additions & 0 deletions libby/mqtt_rpc_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import json, time, uuid, threading
import paho.mqtt.client as mqtt


BASE = "mktl/dev-001"
REQ_TOPIC = f"{BASE}/req"
RESP_TOPIC = f"{BASE}/resp"
TELEMETRY_TOPIC = f"{BASE}/telemetry"
VERBOSE = True

class MqttRpcClient:
def __init__(self, client_id="libby-bridge-mqtt", host="localhost", port=1883):
self._c = mqtt.Client(client_id=client_id)
self._c.on_connect = self._on_connect
self._c.on_message = self._on_message
self._host, self._port = host, port
self._waiters = {}
self._lock = threading.Lock()
self._connected = threading.Event()

def start(self):
self._c.connect(self._host, self._port, keepalive=30)
self._c.loop_start()
self._connected.wait(5)

def _on_connect(self, client, userdata, flags, rc, properties=None):
client.subscribe(RESP_TOPIC, qos=1)
client.subscribe(TELEMETRY_TOPIC, qos=0)
if VERBOSE:
print(f"[bridge] MQTT connected; sub {RESP_TOPIC}, {TELEMETRY_TOPIC}")
self._connected.set()

def _on_message(self, client, userdata, msg):
try:
data = json.loads(msg.payload.decode("utf-8"))
except Exception:
if VERBOSE: print(f"[bridge] MQTT ← {msg.topic}: <non-json>")
return
if VERBOSE: print(f"[bridge] MQTT ← {msg.topic}: {data}")
if msg.topic == RESP_TOPIC:
msg_id = data.get("msg_id")
if not msg_id: return
with self._lock:
w = self._waiters.get(msg_id)
if w:
w["resp"] = data
w["ev"].set()

def call(self, action: str, payload=None, timeout=5.0) -> dict:
msg_id = str(uuid.uuid4())
req = {"type": "request", "action": action, "payload": payload or {}, "msg_id": msg_id, "ts": time.time()}
if VERBOSE: print(f"[bridge] MQTT → {REQ_TOPIC}: {req}")
ev = threading.Event()
with self._lock:
self._waiters[msg_id] = {"ev": ev, "resp": None}
self._c.publish(REQ_TOPIC, json.dumps(req), qos=1)
if not ev.wait(timeout):
with self._lock:
self._waiters.pop(msg_id, None)
raise TimeoutError(f"MQTT RPC timeout for '{action}'")
with self._lock:
resp = self._waiters.pop(msg_id)["resp"]
return resp or {"ok": False, "error": "empty response"}
14 changes: 0 additions & 14 deletions package/module.py

This file was deleted.

63 changes: 63 additions & 0 deletions peers/peer_mqtt_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import time
from libby import Libby
from libby import MqttRpcClient

PEER_ID = "peer-bridge"
BIND = "tcp://*:5557"
ADDRESS_BOOK = {}
EXPOSED_KEYS = ["device.status.get", "device.switch.get", "device.switch.set"]
VERBOSE = True


def make_libby_handler(mqtt_rpc: MqttRpcClient):
def handle(payload: dict, meta: dict) -> dict:
key = meta.get("key", "")
ttl_ms = meta.get("ttl_ms", 5000)

if VERBOSE:
print(f"[bridge] Libby ← key={key} payload={payload} meta={{src:{meta.get('src')}, dst:{meta.get('dst')}, ttl_ms:{ttl_ms}}}")

if key not in EXPOSED_KEYS:
return {"error": f"unknown key '{key}'"}

try:
resp = mqtt_rpc.call(key, payload or {}, timeout=ttl_ms / 1000.0)
except TimeoutError as e:
if VERBOSE:
print(f"[bridge] Libby → timeout for {key}: {e}")
return {"error": str(e)}

if not isinstance(resp, dict):
return {"error": "malformed device response"}

if not resp.get("ok", False):
err = resp.get("error", "device error")
if VERBOSE:
print(f"[bridge] Libby → device error for {key}: {err}")
return {"error": err}

out = resp.get("payload", {})
if VERBOSE:
print(f"[bridge] Libby → payload for {key}: {out}")
return out
return handle

def main():
mqtt_rpc = MqttRpcClient()
mqtt_rpc.start()
with Libby.zmq(
self_id=PEER_ID,
bind=BIND,
address_book=ADDRESS_BOOK,
keys=EXPOSED_KEYS, # advertise keys the bridge serves
callback=make_libby_handler(mqtt_rpc),
discover=True,
discover_interval_s=1.5,
hello_on_start=True,
) as libby:
print(f"[{PEER_ID}] Libby↔MQTT bridge online. Serving: {', '.join(EXPOSED_KEYS)}")
while True:
time.sleep(1)

if __name__ == "__main__":
main()
42 changes: 42 additions & 0 deletions peers/peer_query_the_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from libby import Libby

PEER_ID = "peer-controller"
BIND = "tcp://*:5558"
ADDRESS_BOOK = {
"peer-bridge": "tcp://127.0.0.1:5557",
}
BRIDGE = "peer-bridge"

def main():
with Libby.zmq(
self_id=PEER_ID,
bind=BIND,
address_book=ADDRESS_BOOK,
discover=True,
discover_interval_s=1.5,
hello_on_start=True,
) as libby:
libby.learn_peer_keys(BRIDGE, [
"device.status.get",
"device.switch.get",
"device.switch.set",
])

print("[ctl] → device.status.get")
r = libby.request(BRIDGE, key="device.status.get", payload={}, ttl_ms=8000)
print("[ctl] ←", r)

print("[ctl] → device.switch.set(on=True)")
r = libby.request(BRIDGE, key="device.switch.set", payload={"on": True}, ttl_ms=8000)
print("[ctl] ←", r)

print("[ctl] → device.switch.get")
r = libby.request(BRIDGE, key="device.switch.get", payload={}, ttl_ms=8000)
print("[ctl] ←", r)

print("[ctl] → device.switch.set(on=False)")
r = libby.request(BRIDGE, key="device.switch.set", payload={"on": False}, ttl_ms=8000)
print("[ctl] ←", r)

if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ authors = [
]
dependencies = [
"pyzmq>=25.0.0",
"paho-mqtt>=2.1.0",
"bamboo @ git+https://github.com/CaltechOpticalObservatories/bamboo.git@main"
]

Expand Down