diff --git a/pyproject.toml b/pyproject.toml
index 6478c1e..c8e0403 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,6 +37,7 @@ tomtoolkit = "^2.10"
 psycopg2-binary = "^2.9"
 gcn-kafka = "^0.2"
 hop-client = "^0.8"
+fastavro = "^1.0"
 
 [tool.poetry.dev-dependencies]
 coverage = "^6.3.2"
diff --git a/requirements.txt b/requirements.txt
index 9c7321c..a39351d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@ Django~=4.1
 psycopg2-binary~=2.9
 gcn-kafka~=0.2
 hop-client~=0.8
+fastavro~=1.0
diff --git a/tom_alertstreams/alertstreams/hopskotch.py b/tom_alertstreams/alertstreams/hopskotch.py
index 75c3450..88c3c52 100644
--- a/tom_alertstreams/alertstreams/hopskotch.py
+++ b/tom_alertstreams/alertstreams/hopskotch.py
@@ -6,9 +6,12 @@
 from django.utils import timezone as tz
 from django.core.exceptions import ImproperlyConfigured
 
+import fastavro
+import fastavro.schema
+
 from hop import Stream
 from hop.auth import Auth
-from hop.models import JSONBlob
+from hop.models import JSONBlob, AvroBlob
 from hop.io import Metadata, StartPosition, list_topics
 
 from tom_alertstreams.alertstreams.alertstream import AlertStream
@@ -91,6 +94,7 @@ def get_stream(self, start_position=StartPosition.LATEST) -> Stream:
         hop_auth = Auth(self.username, self.password)
 
         # TODO: allow StartPosition to be set from OPTIONS configuration dictionary
+        logger.info(f'Instanciating Stream for {hop_auth.username} with start {start_position}')
         stream = Stream(auth=hop_auth, start_at=start_position)
         return stream
 
@@ -161,3 +165,46 @@ def alert_logger(alert: JSONBlob, metadata: Metadata):
         # in this case the alert was probably published with hop-client<0.8.0
         alert_uuid = None
     logger.info(f'Alert (uuid={alert_uuid}) received on topic {metadata.topic}: {alert}; metatdata: {metadata}')
+
+
+def igwn_alert_logger(alert: JSONBlob, metadata: Metadata):
+    """Example alert handler: log an alert and dump AvroBlob content to a temporary file.
+
+    The method signature is specific to Hopskotch alerts. The alert parameter is
+    annotated as JSONBlob, but only AvroBlob alerts are inspected; an alert of any
+    other type is just logged.
+
+    :param alert: the alert delivered by the stream
+    :param metadata: metadata delivered with the alert (its topic and headers are read)
+    """
+    import tempfile  # local import: only this example handler needs it
+
+    # search the header (list of tuples) for a UUID-tuple (keyed by '_id')
+    # eg. ('_id', b'$\xd6oGmVM\xed\x97\xe7|\x1c\x8f\x11V\xe9')
+    # NOTE(review): `or ()` assumes headers can be None for header-less messages - confirm
+    alert_uuid_tuple = next((item for item in (metadata.headers or ()) if item[0] == '_id'), None)
+    if alert_uuid_tuple:
+        alert_uuid = uuid.UUID(bytes=alert_uuid_tuple[1])
+    else:
+        # in this case the alert was probably published with hop-client<0.8.0
+        alert_uuid = None
+
+    logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) received on topic {metadata.topic}: metatdata: {metadata}')
+    logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) received a {type(alert)}')
+
+    if not isinstance(alert, AvroBlob):
+        logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) *********************** not AveroBlob ***********************')
+        return
+
+    logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) alert.schema: {alert.schema}')
+    logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) alert.format_name: {alert.format_name}')
+    logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) len(alert.content): {len(alert.content)}')
+
+    parsed_schema = fastavro.parse_schema(alert.schema)
+    logger.info(f'igwn_alert_logger:: schema.fullname: {fastavro.schema.fullname(parsed_schema)}')
+
+    # Write to a uniquely named temporary file instead of one fixed path in /tmp, so that
+    # successive alerts do not overwrite each other; delete=False keeps it for inspection.
+    with tempfile.NamedTemporaryFile(prefix='igwn_alert_', suffix='.avro', delete=False) as out:
+        fastavro.writer(out, fastavro.schema.expand_schema(parsed_schema), alert.content)
+    logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) content written to {out.name}')
diff --git a/tom_alertstreams_base/settings.py b/tom_alertstreams_base/settings.py
index 35ca020..e7212e4 100644
--- a/tom_alertstreams_base/settings.py
+++ b/tom_alertstreams_base/settings.py
@@ -12,6 +12,7 @@
 import logging.config
 import os
 from pathlib import Path
+import datetime
 
 # Build paths inside the project like this: BASE_DIR / 'subdir'.
 BASE_DIR = Path(__file__).resolve().parent.parent
@@ -158,15 +159,19 @@
         'NAME': 'tom_alertstreams.alertstreams.hopskotch.HopskotchAlertStream',
         'OPTIONS': {
             # The hop-client requires that the GROUP_ID prefix match the SCIMMA_AUTH_USERNAME
-            'GROUP_ID': os.getenv('SCIMMA_AUTH_USERNAME', "") + '-' + os.getenv('HOPSKOTCH_GROUP_ID', 'tom-alertstreams-dev'),
+            # NOTE(review): the microsecond suffix changes the GROUP_ID on every start-up - confirm this is intended
+            'GROUP_ID': os.getenv('SCIMMA_AUTH_USERNAME', "") + '-'
+            + os.getenv('HOPSKOTCH_GROUP_ID', 'tom-alertstreams-dev') + '-'
+            + f'{datetime.datetime.now().microsecond}',
             'URL': 'kafka://kafka.scimma.org/',
             'USERNAME': os.getenv('SCIMMA_AUTH_USERNAME', None),
             'PASSWORD': os.getenv('SCIMMA_AUTH_PASSWORD', None),
             'TOPIC_HANDLERS': {
-                'sys.heartbeat': 'tom_alertstreams.alertstreams.hopskotch.heartbeat_handler',
-                'sys.heartbeatnew': 'tom_alertstreams.alertstreams.hopskotch.heartbeat_handler',
+                #'sys.heartbeat': 'tom_alertstreams.alertstreams.hopskotch.heartbeat_handler',
+                #'sys.heartbeatnew': 'tom_alertstreams.alertstreams.hopskotch.heartbeat_handler',
                 'tomtoolkit.test': 'tom_alertstreams.alertstreams.hopskotch.alert_logger',
                 'hermes.test': 'tom_alertstreams.alertstreams.hopskotch.alert_logger',
+                #'igwn.gwalert': 'tom_alertstreams.alertstreams.hopskotch.igwn_alert_logger',
             },
         },
     },