Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tomtoolkit = "^2.10"
psycopg2-binary = "^2.9"
gcn-kafka = "^0.2"
hop-client = "^0.8"
fastavro = "^1.0"

[tool.poetry.dev-dependencies]
coverage = "^6.3.2"
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ Django~=4.1
psycopg2-binary~=2.9
gcn-kafka~=0.2
hop-client~=0.8
fastavro~=1.0
39 changes: 38 additions & 1 deletion tom_alertstreams/alertstreams/hopskotch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
from django.utils import timezone as tz
from django.core.exceptions import ImproperlyConfigured

import fastavro
import fastavro.schema

from hop import Stream
from hop.auth import Auth
from hop.models import JSONBlob
from hop.models import JSONBlob, AvroBlob
from hop.io import Metadata, StartPosition, list_topics

from tom_alertstreams.alertstreams.alertstream import AlertStream
Expand Down Expand Up @@ -91,6 +94,7 @@ def get_stream(self, start_position=StartPosition.LATEST) -> Stream:
hop_auth = Auth(self.username, self.password)

# TODO: allow StartPosition to be set from OPTIONS configuration dictionary
logger.info(f'Instanciating Stream for {hop_auth.username} with start {start_position}')
stream = Stream(auth=hop_auth, start_at=start_position)
return stream

Expand Down Expand Up @@ -161,3 +165,36 @@ def alert_logger(alert: JSONBlob, metadata: Metadata):
# in this case the alert was probably published with hop-client<0.8.0
alert_uuid = None
logger.info(f'Alert (uuid={alert_uuid}) received on topic {metadata.topic}: {alert}; metatdata: {metadata}')


def igwn_alert_logger(alert: JSONBlob, metadata: Metadata):
"""Example alert handler. The method signsture is specific to Hopskotch alerts.
"""
# search the header (list of tuples) for a UUID-tuple (keyed by '_id')
# eg. ('_id', b'$\xd6oGmVM\xed\x97\xe7|\x1c\x8f\x11V\xe9')
alert_uuid_tuple = next((item for item in metadata.headers if item[0] == '_id'), None)
if alert_uuid_tuple:
alert_uuid = uuid.UUID(bytes=alert_uuid_tuple[1])
else:
# in this case the alert was probably published with hop-client<0.8.0
alert_uuid = None

logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) received on topic {metadata.topic}: metatdata: {metadata}')
logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) received a {type(alert)}')

if isinstance(alert, AvroBlob):
logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) alert.schema: {alert.schema}')
logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) alert.format_name: {alert.format_name}')
logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) len(alert.content): {len(alert.content)}')
#alert_content_dict = alert.content[0]
#logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) alert_content_dict: {alert_content_dict.keys()}')
print()
parsed_schema = fastavro.parse_schema(alert.schema)
logger.info(f'igwn_alert_logger:: schema.fullname: {fastavro.schema.fullname(parsed_schema)}')

with open('/tmp/asdfasdf', 'wb') as out:
fastavro.writer(out, fastavro.schema.expand_schema(parsed_schema), alert.content)
print()
else:
logger.info(f'igwn_alert_logger:: Alert (uuid={alert_uuid}) *********************** not AveroBlob ***********************')

10 changes: 7 additions & 3 deletions tom_alertstreams_base/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging.config
import os
from pathlib import Path
import datetime

# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
Expand Down Expand Up @@ -158,15 +159,18 @@
'NAME': 'tom_alertstreams.alertstreams.hopskotch.HopskotchAlertStream',
'OPTIONS': {
# The hop-client requires that the GROUP_ID prefix match the SCIMMA_AUTH_USERNAME
'GROUP_ID': os.getenv('SCIMMA_AUTH_USERNAME', "") + '-' + os.getenv('HOPSKOTCH_GROUP_ID', 'tom-alertstreams-dev'),
'GROUP_ID': os.getenv('SCIMMA_AUTH_USERNAME', "") + '-' +
os.getenv('HOPSKOTCH_GROUP_ID', 'tom-alertstreams-dev') + '-' +
f'{datetime.datetime.now().microsecond}',
'URL': 'kafka://kafka.scimma.org/',
'USERNAME': os.getenv('SCIMMA_AUTH_USERNAME', None),
'PASSWORD': os.getenv('SCIMMA_AUTH_PASSWORD', None),
'TOPIC_HANDLERS': {
'sys.heartbeat': 'tom_alertstreams.alertstreams.hopskotch.heartbeat_handler',
'sys.heartbeatnew': 'tom_alertstreams.alertstreams.hopskotch.heartbeat_handler',
#'sys.heartbeat': 'tom_alertstreams.alertstreams.hopskotch.heartbeat_handler',
#'sys.heartbeatnew': 'tom_alertstreams.alertstreams.hopskotch.heartbeat_handler',
'tomtoolkit.test': 'tom_alertstreams.alertstreams.hopskotch.alert_logger',
'hermes.test': 'tom_alertstreams.alertstreams.hopskotch.alert_logger',
#'igwn.gwalert': 'tom_alertstreams.alertstreams.hopskotch.igwn_alert_logger',
},
},
},
Expand Down