diff --git a/examples/mqtt.py b/examples/mqtt.py index db5c20c1..6e395136 100644 --- a/examples/mqtt.py +++ b/examples/mqtt.py @@ -1,145 +1,189 @@ -""" Server metrics upload. -""" +# optimized_metrics_upload.py -# -*- coding: utf-8 -*- +""" +Server metrics upload via MQTT gateway to Aleph.im network. +Uses aiomqtt (Asyncio native) for clean, non-blocking operation. +""" import asyncio -from typing import Dict +import json +from typing import Dict, Union, Any, List import aiomqtt import click +from httpx import HTTPStatusError from aleph.sdk.chains.common import get_fallback_private_key from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.client import AuthenticatedAlephHttpClient from aleph.sdk.conf import settings +# Define the metric container structure +MetricsDict = Dict[str, Any] +# Consistent channel for metric aggregates +METRICS_CHANNEL = "SYSINFO" +# Message topic to subscribe to +MQTT_TOPIC = "/#" + -def get_input_data(value): +def _decode_payload(value: bytes) -> Union[bool, float, str]: + """ + Decodes the MQTT payload (bytes) into the appropriate Python type (bool, float, or string). + """ if value == b"true": return True elif value == b"false": return False try: - v = float(value) - return v + # Try to decode as a float (for numeric metrics) + return float(value) except ValueError: + # Fallback to UTF-8 string (for labels or non-numeric metrics) return value.decode("utf-8") -async def send_metrics(account, metrics): - async with AuthenticatedAlephHttpClient( - account=account, api_server=settings.API_HOST - ) as session: - return session.create_aggregate( - key="metrics", content=metrics, channel="SYSINFO" - ) - - -def on_disconnect(client, userdata, rc): - if rc != 0: - print("Unexpected MQTT disconnection. Will auto-reconnect") - - client.reconnect() - - -# The callback for when the client receives a CONNACK response from the server. -def on_connect(client, userdata, flags, rc): - print("Connected with result code " + str(rc)) - - # Subscribing in on_connect() means that if we lose the connection and - # reconnect then subscriptions will be renewed. - client.subscribe("/#") - - -# The callback for when a PUBLISH message is received from the server. -def on_message(client, userdata, msg): - userdata["received"] = True - state = userdata["state"] - parts = msg.topic.strip("/").split("/") - curp = state +def _update_metrics_state(state: MetricsDict, topic: str, payload: bytes): + """ + Parses the MQTT topic and dynamically updates the nested metrics dictionary. + Example: topic 'host/cpu/usage' sets state['host']['cpu']['usage']. + """ + parts: List[str] = topic.strip("/").split("/") + curp: MetricsDict = state + + # Iterate through topic parts to build nested structure, excluding the final key for part in parts[:-1]: - if not isinstance(curp.get(part, None), dict): + if not isinstance(curp.get(part), dict): curp[part] = {} curp = curp[part] + + # Set the final metric value + curp[parts[-1]] = _decode_payload(payload) + print(f"Received: {parts}, Value: {curp[parts[-1]]}") - curp[parts[-1]] = get_input_data(msg.payload) - print(parts, msg.payload) + +async def _send_metrics(account: ETHAccount, metrics: MetricsDict): + """ + Authenticates and sends the collected metrics as a single aggregate message to Aleph.im. + """ + async with AuthenticatedAlephHttpClient( + account=account, api_server=settings.API_HOST + ) as session: + # Send all collected metrics under the single 'metrics' key + message, _ = await session.create_aggregate( + key="metrics", content=metrics, channel=METRICS_CHANNEL + ) + print(f"Sent aggregate message: {message.item_hash}") async def gateway( - loop, - host="api1.aleph.im", - port=1883, - ca_cert=None, - pkey=None, - keepalive=10, - transport="tcp", - auth=None, + host: str, + port: int, + ca_cert: Optional[str] = None, + pkey: Optional[str] = None, + auth: Optional[Dict[str, str]] = None, + send_interval: int = 10, ): + """ + Connects to the MQTT broker, listens for metrics, and periodically uploads them to Aleph.im. + """ + # 1. Initialize Aleph Account if pkey is None: pkey = get_fallback_private_key() account = ETHAccount(private_key=pkey) - state: Dict = dict() - userdata = {"account": account, "state": state, "received": False} - client = aiomqtt.Client(loop, userdata=userdata, transport=transport) - client.on_connect = on_connect - client.on_disconnect = on_disconnect - client.on_message = on_message - - if ca_cert is not None: - client.tls_set(ca_cert) - if auth is not None: - client.username_pw_set(**auth) - - asyncio.ensure_future(client.loop_forever()) - - await client.connect(host, port, keepalive) + + # Use a single dictionary to hold all collected metrics + state: MetricsDict = dict() + + # Determine the transport type based on TLS usage + transport = "websockets" if port == 443 else "tcp" + + # 2. Connect to MQTT Broker and process messages + try: + # Use aiomqtt's async context manager for robust connection management and auto-reconnect + async with aiomqtt.Client(hostname=host, port=port, transport=transport, **(auth or {})) as client: + + # Subscribe to all topics immediately after connection + await client.subscribe(MQTT_TOPIC) + print(f"Connected to MQTT broker {host}:{port}. Subscribed to {MQTT_TOPIC}") + + # Use a separate task for sending metrics to run alongside message listening + send_task = asyncio.create_task( + _periodic_sender(account, state, send_interval) + ) + + # Process incoming messages asynchronously + async for message in client.messages: + # Update the shared metrics state upon receiving a message + _update_metrics_state(state, message.topic.value, message.payload) + + except aiomqtt.MqttError as e: + print(f"MQTT Error: {e}. Retrying connection in 5 seconds.") + # Introduce a delay before allowing the main loop to attempt reconnection + await asyncio.sleep(5) + except HTTPStatusError as e: + print(f"Aleph API Error during upload: {e}") + # Continue listening, hoping the API recovers + finally: + if 'send_task' in locals() and not send_task.done(): + send_task.cancel() + + +async def _periodic_sender(account: ETHAccount, state: MetricsDict, interval: int): + """ + Periodically checks the state and uploads metrics to Aleph.im. + """ while True: - await asyncio.sleep(10) - if not userdata["received"]: - await client.reconnect() - - async with AuthenticatedAlephHttpClient( - account=account, api_server=settings.API_HOST - ) as session: - for key, value in state.items(): - message, status = await session.create_aggregate( - key=key, content=value, channel="IOT_TEST" - ) - print("sent", message.item_hash) - userdata["received"] = False + try: + # Wait for the next interval + await asyncio.sleep(interval) + + # Only upload if some metrics have been collected + if state: + await _send_metrics(account, state) + else: + print("Waiting for initial metrics...") + + except asyncio.CancelledError: + # Graceful exit upon task cancellation + print("Sender task cancelled.") + break + except Exception as e: + # Catch all exceptions to keep the sender running + print(f"An unexpected error occurred in sender task: {e}") @click.command() -@click.option("--host", default="localhost", help="MQTT Broker host") -@click.option("--port", default=1883, help="MQTT Broker port") +@click.option("--host", default="api1.aleph.im", help="MQTT Broker host") +@click.option("--port", default=8883, type=int, help="MQTT Broker port (Default 8883 for TLS/Websockets)") @click.option("--user", default=None, help="MQTT Auth username") @click.option("--password", default=None, help="MQTT Auth password") -@click.option("--use-tls", is_flag=True) -@click.option("--ca-cert", default=None, help="CA Cert path") -@click.option( - "--pkey", - default=None, - help="Account private key (optionnal, will default to device.key file)", -) -def main(host, port, user, password, use_tls=False, ca_cert=None, pkey=None): - loop = asyncio.get_event_loop() - auth = None - if user is not None: - auth = {"username": user, "password": password} - - if use_tls: - if ca_cert is None: - import certifi - - ca_cert = certifi.where() - print(ca_cert) - - loop.run_until_complete( - gateway(loop, host, port, ca_cert=ca_cert, pkey=pkey, auth=auth) - ) +@click.option("--use-tls", is_flag=True, help="Use TLS/SSL for connection") +@click.option("--ca-cert", default=None, help="Path to CA Cert file") +@click.option("--pkey", default=None, help="Account private key") +@click.option("--interval", default=10, type=int, help="Interval in seconds for metric uploads") +def main(host, port, user, password, use_tls=False, ca_cert=None, pkey=None, interval=10): + """ + Starts the MQTT-to-Aleph.im metric gateway. + """ + # 1. Setup Auth + auth = {"username": user, "password": password} if user is not None else None + + # 2. Handle TLS/Cert setup + if use_tls and ca_cert is None: + import certifi + ca_cert = certifi.where() + print(f"Using default CA bundle from certifi: {ca_cert}") + + # 3. Run the async gateway + try: + asyncio.run( + gateway(host, port, ca_cert=ca_cert, pkey=pkey, auth=auth, send_interval=interval) + ) + except KeyboardInterrupt: + print("\nGateway stopped by user.") + except Exception as e: + print(f"Fatal error in main execution: {e}") if __name__ == "__main__":