Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 146 additions & 102 deletions examples/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,145 +1,189 @@
""" Server metrics upload.
"""
# optimized_metrics_upload.py

# -*- coding: utf-8 -*-
"""
Server metrics upload via MQTT gateway to Aleph.im network.
Uses aiomqtt (Asyncio native) for clean, non-blocking operation.
"""

import asyncio
from typing import Dict
import json
from typing import Dict, Union, Any, List

import aiomqtt
import click
from httpx import HTTPStatusError

from aleph.sdk.chains.common import get_fallback_private_key
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client import AuthenticatedAlephHttpClient
from aleph.sdk.conf import settings

# Define the metric container structure
MetricsDict = Dict[str, Any]
# Consistent channel for metric aggregates
METRICS_CHANNEL = "SYSINFO"
# Message topic to subscribe to
MQTT_TOPIC = "/#"


def get_input_data(value):
def _decode_payload(value: bytes) -> Union[bool, float, str]:
"""
Decodes the MQTT payload (bytes) into the appropriate Python type (bool, float, or string).
"""
if value == b"true":
return True
elif value == b"false":
return False
try:
v = float(value)
return v
# Try to decode as a float (for numeric metrics)
return float(value)
except ValueError:
# Fallback to UTF-8 string (for labels or non-numeric metrics)
return value.decode("utf-8")


async def send_metrics(account, metrics):
async with AuthenticatedAlephHttpClient(
account=account, api_server=settings.API_HOST
) as session:
return session.create_aggregate(
key="metrics", content=metrics, channel="SYSINFO"
)


def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected MQTT disconnection. Will auto-reconnect")

client.reconnect()


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))

# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("/#")


# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
userdata["received"] = True
state = userdata["state"]
parts = msg.topic.strip("/").split("/")
curp = state
def _update_metrics_state(state: MetricsDict, topic: str, payload: bytes):
"""
Parses the MQTT topic and dynamically updates the nested metrics dictionary.
Example: topic 'host/cpu/usage' sets state['host']['cpu']['usage'].
"""
parts: List[str] = topic.strip("/").split("/")
curp: MetricsDict = state

# Iterate through topic parts to build nested structure, excluding the final key
for part in parts[:-1]:
if not isinstance(curp.get(part, None), dict):
if not isinstance(curp.get(part), dict):
curp[part] = {}
curp = curp[part]

# Set the final metric value
curp[parts[-1]] = _decode_payload(payload)
print(f"Received: {parts}, Value: {curp[parts[-1]]}")

curp[parts[-1]] = get_input_data(msg.payload)
print(parts, msg.payload)

async def _send_metrics(account: ETHAccount, metrics: MetricsDict):
"""
Authenticates and sends the collected metrics as a single aggregate message to Aleph.im.
"""
async with AuthenticatedAlephHttpClient(
account=account, api_server=settings.API_HOST
) as session:
# Send all collected metrics under the single 'metrics' key
message, _ = await session.create_aggregate(
key="metrics", content=metrics, channel=METRICS_CHANNEL
)
print(f"Sent aggregate message: {message.item_hash}")


async def gateway(
loop,
host="api1.aleph.im",
port=1883,
ca_cert=None,
pkey=None,
keepalive=10,
transport="tcp",
auth=None,
host: str,
port: int,
ca_cert: Optional[str] = None,
pkey: Optional[str] = None,
auth: Optional[Dict[str, str]] = None,
send_interval: int = 10,
):
"""
Connects to the MQTT broker, listens for metrics, and periodically uploads them to Aleph.im.
"""
# 1. Initialize Aleph Account
if pkey is None:
pkey = get_fallback_private_key()

account = ETHAccount(private_key=pkey)
state: Dict = dict()
userdata = {"account": account, "state": state, "received": False}
client = aiomqtt.Client(loop, userdata=userdata, transport=transport)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message

if ca_cert is not None:
client.tls_set(ca_cert)
if auth is not None:
client.username_pw_set(**auth)

asyncio.ensure_future(client.loop_forever())

await client.connect(host, port, keepalive)

# Use a single dictionary to hold all collected metrics
state: MetricsDict = dict()

# Determine the transport type based on TLS usage
transport = "websockets" if port == 443 else "tcp"

# 2. Connect to MQTT Broker and process messages
try:
# Use aiomqtt's async context manager for robust connection management and auto-reconnect
async with aiomqtt.Client(hostname=host, port=port, transport=transport, **(auth or {})) as client:

# Subscribe to all topics immediately after connection
await client.subscribe(MQTT_TOPIC)
print(f"Connected to MQTT broker {host}:{port}. Subscribed to {MQTT_TOPIC}")

# Use a separate task for sending metrics to run alongside message listening
send_task = asyncio.create_task(
_periodic_sender(account, state, send_interval)
)

# Process incoming messages asynchronously
async for message in client.messages:
# Update the shared metrics state upon receiving a message
_update_metrics_state(state, message.topic.value, message.payload)

except aiomqtt.MqttError as e:
print(f"MQTT Error: {e}. Retrying connection in 5 seconds.")
# Introduce a delay before allowing the main loop to attempt reconnection
await asyncio.sleep(5)
except HTTPStatusError as e:
print(f"Aleph API Error during upload: {e}")
# Continue listening, hoping the API recovers
finally:
if 'send_task' in locals() and not send_task.done():
send_task.cancel()


async def _periodic_sender(account: ETHAccount, state: MetricsDict, interval: int):
"""
Periodically checks the state and uploads metrics to Aleph.im.
"""
while True:
await asyncio.sleep(10)
if not userdata["received"]:
await client.reconnect()

async with AuthenticatedAlephHttpClient(
account=account, api_server=settings.API_HOST
) as session:
for key, value in state.items():
message, status = await session.create_aggregate(
key=key, content=value, channel="IOT_TEST"
)
print("sent", message.item_hash)
userdata["received"] = False
try:
# Wait for the next interval
await asyncio.sleep(interval)

# Only upload if some metrics have been collected
if state:
await _send_metrics(account, state)
else:
print("Waiting for initial metrics...")

except asyncio.CancelledError:
# Graceful exit upon task cancellation
print("Sender task cancelled.")
break
except Exception as e:
# Catch all exceptions to keep the sender running
print(f"An unexpected error occurred in sender task: {e}")


@click.command()
@click.option("--host", default="localhost", help="MQTT Broker host")
@click.option("--port", default=1883, help="MQTT Broker port")
@click.option("--host", default="api1.aleph.im", help="MQTT Broker host")
@click.option("--port", default=8883, type=int, help="MQTT Broker port (Default 8883 for TLS/Websockets)")
@click.option("--user", default=None, help="MQTT Auth username")
@click.option("--password", default=None, help="MQTT Auth password")
@click.option("--use-tls", is_flag=True)
@click.option("--ca-cert", default=None, help="CA Cert path")
@click.option(
"--pkey",
default=None,
help="Account private key (optionnal, will default to device.key file)",
)
def main(host, port, user, password, use_tls=False, ca_cert=None, pkey=None):
loop = asyncio.get_event_loop()
auth = None
if user is not None:
auth = {"username": user, "password": password}

if use_tls:
if ca_cert is None:
import certifi

ca_cert = certifi.where()
print(ca_cert)

loop.run_until_complete(
gateway(loop, host, port, ca_cert=ca_cert, pkey=pkey, auth=auth)
)
@click.option("--use-tls", is_flag=True, help="Use TLS/SSL for connection")
@click.option("--ca-cert", default=None, help="Path to CA Cert file")
@click.option("--pkey", default=None, help="Account private key")
@click.option("--interval", default=10, type=int, help="Interval in seconds for metric uploads")
def main(host, port, user, password, use_tls=False, ca_cert=None, pkey=None, interval=10):
"""
Starts the MQTT-to-Aleph.im metric gateway.
"""
# 1. Setup Auth
auth = {"username": user, "password": password} if user is not None else None

# 2. Handle TLS/Cert setup
if use_tls and ca_cert is None:
import certifi
ca_cert = certifi.where()
print(f"Using default CA bundle from certifi: {ca_cert}")

# 3. Run the async gateway
try:
asyncio.run(
gateway(host, port, ca_cert=ca_cert, pkey=pkey, auth=auth, send_interval=interval)
)
except KeyboardInterrupt:
print("\nGateway stopped by user.")
except Exception as e:
print(f"Fatal error in main execution: {e}")


if __name__ == "__main__":
Expand Down