diff --git a/.gitignore b/.gitignore
index 3468128..4402177 100644
--- a/.gitignore
+++ b/.gitignore
@@ -67,4 +67,5 @@ bizon/**/config/*.yml
 
 profile_default/
 ipython_config.py
-hubspot_credentials.json
\ No newline at end of file
+hubspot_credentials.json
+test-pipeline.yml
\ No newline at end of file
diff --git a/bizon/alerting/__init__.py b/bizon/alerting/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/bizon/alerting/alerts.py b/bizon/alerting/alerts.py
new file mode 100644
index 0000000..e18ce05
--- /dev/null
+++ b/bizon/alerting/alerts.py
@@ -0,0 +1,23 @@
+from abc import ABC, abstractmethod
+from typing import Dict, List, Optional, Union
+
+from loguru import logger
+
+from bizon.alerting.models import AlertingConfig, AlertMethod, LogLevel
+
+
+class AbstractAlert(ABC):
+
+    def __init__(self, type: AlertMethod, config: AlertingConfig, log_levels: Optional[List[LogLevel]] = None):
+        self.type = type
+        self.config = config
+        self.log_levels = log_levels or [LogLevel.ERROR]
+
+    @abstractmethod
+    def handler(self, message: Dict) -> None:
+        pass
+
+    def add_handlers(self, levels: Optional[List[Union[LogLevel, str]]] = None) -> None:
+        level_names = [getattr(level, "value", level) for level in (levels or self.log_levels)]
+        for level in level_names:
+            logger.add(self.handler, level=level, format="{message}")
diff --git a/bizon/alerting/models.py b/bizon/alerting/models.py
new file mode 100644
index 0000000..bdb3d52
--- /dev/null
+++ b/bizon/alerting/models.py
@@ -0,0 +1,28 @@
+from enum import Enum
+from typing import List, Optional, Union
+
+from pydantic import BaseModel
+
+from bizon.alerting.slack.config import SlackConfig
+
+
+class LogLevel(str, Enum):
+    DEBUG = "DEBUG"
+    INFO = "INFO"
+    WARNING = "WARNING"
+    ERROR = "ERROR"
+    CRITICAL = "CRITICAL"
+
+
+class AlertMethod(str, Enum):
+    """Alerting methods"""
+
+    SLACK = "slack"
+
+
+class AlertingConfig(BaseModel):
+    """Alerting configuration model"""
+
+    type: AlertMethod
+    log_levels: Optional[List[LogLevel]] = [LogLevel.ERROR]
+    config: Union[SlackConfig]
diff --git a/bizon/alerting/slack/__init__.py
b/bizon/alerting/slack/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/bizon/alerting/slack/config.py b/bizon/alerting/slack/config.py
new file mode 100644
index 0000000..0c19aa6
--- /dev/null
+++ b/bizon/alerting/slack/config.py
@@ -0,0 +1,5 @@
+from pydantic import BaseModel
+
+
+class SlackConfig(BaseModel):
+    webhook_url: str
diff --git a/bizon/alerting/slack/handler.py b/bizon/alerting/slack/handler.py
new file mode 100644
index 0000000..9c19390
--- /dev/null
+++ b/bizon/alerting/slack/handler.py
@@ -0,0 +1,40 @@
+import os
+import sys
+from typing import Dict, List, Optional
+
+import requests
+
+from bizon.alerting.alerts import AbstractAlert, AlertMethod
+from bizon.alerting.models import LogLevel
+from bizon.alerting.slack.config import SlackConfig
+
+
+class SlackHandler(AbstractAlert):
+    def __init__(self, config: SlackConfig, log_levels: Optional[List[LogLevel]] = None):
+        super().__init__(type=AlertMethod.SLACK, config=config, log_levels=log_levels or [LogLevel.ERROR])
+        self.webhook_url = config.webhook_url
+
+    def handler(self, message: Dict) -> None:
+        """
+        Custom handler to send error logs to Slack, with additional context.
+        """
+        log_entry = message.record
+        error_message = (
+            f"*Sync*: `{os.environ.get('BIZON_SYNC_NAME', 'N/A')}`\n"
+            f"*Source*: `{os.environ.get('BIZON_SOURCE_NAME', 'N/A')}` - `{os.environ.get('BIZON_SOURCE_STREAM', 'N/A')}`\n"  # noqa
+            f"*Destination*: `{os.environ.get('BIZON_DESTINATION_NAME', 'N/A')}`\n\n"
+            f"*Message:*\n```{log_entry['message']}```\n"
+            f"*File:* `{log_entry['file'].path}:{log_entry['line']}`\n"
+            f"*Function:* `{log_entry['function']}`\n"
+            f"*Level:* `{log_entry['level'].name}`\n"
+        )
+
+        payload = {"text": f":rotating_light: *Bizon Pipeline Alert* :rotating_light:\n\n{error_message}"}
+
+        try:
+            response = requests.post(self.webhook_url, json=payload)
+            response.raise_for_status()
+        except requests.exceptions.RequestException as e:
+            # stderr, not logger: a log emitted here re-enters this sink and recurses
+            print(f"Failed to send log to Slack: {e}", file=sys.stderr)
+        return None
diff --git a/bizon/common/models.py b/bizon/common/models.py
index e4047d0..29650fe 100644
--- a/bizon/common/models.py
+++ b/bizon/common/models.py
@@ -2,6 +2,7 @@
 
 from pydantic import BaseModel, ConfigDict, Field
 
+from bizon.alerting.models import AlertingConfig
 from bizon.connectors.destinations.bigquery.src.config import BigQueryConfig
 from bizon.connectors.destinations.bigquery_streaming.src.config import (
     BigQueryStreamingConfig,
@@ -47,6 +48,11 @@ class BizonConfig(BaseModel):
         default=EngineConfig(),
     )
 
+    alerting: Optional[AlertingConfig] = Field(
+        description="Alerting configuration",
+        default=None,
+    )
+
 
 class SyncMetadata(BaseModel):
     """Model which stores general metadata around a sync.
diff --git a/bizon/engine/runner/runner.py b/bizon/engine/runner/runner.py index e8bf4f0..6f7a9a1 100644 --- a/bizon/engine/runner/runner.py +++ b/bizon/engine/runner/runner.py @@ -1,5 +1,6 @@ import multiprocessing import multiprocessing.synchronize +import os import sys import threading from abc import ABC, abstractmethod @@ -7,6 +8,7 @@ from loguru import logger +from bizon.alerting.models import AlertMethod from bizon.cli.utils import parse_from_yaml from bizon.common.models import BizonConfig, SyncMetadata from bizon.destination.destination import AbstractDestination, DestinationFactory @@ -30,11 +32,28 @@ def __init__(self, config: dict): self.config: dict = config self.bizon_config = BizonConfig.model_validate(obj=config) + # Set pipeline information as environment variables + os.environ["BIZON_SYNC_NAME"] = self.bizon_config.name + os.environ["BIZON_SOURCE_NAME"] = self.bizon_config.source.name + os.environ["BIZON_SOURCE_STREAM"] = self.bizon_config.source.stream + os.environ["BIZON_DESTINATION_NAME"] = self.bizon_config.destination.name + # Set log level logger.info(f"Setting log level to {self.bizon_config.engine.runner.log_level.name}") logger.remove() logger.add(sys.stderr, level=self.bizon_config.engine.runner.log_level) + if self.bizon_config.alerting: + logger.info(f"Setting up alerting method {self.bizon_config.alerting.type}") + if self.bizon_config.alerting.type == AlertMethod.SLACK: + from bizon.alerting.slack.handler import SlackHandler + + alert = SlackHandler( + config=self.bizon_config.alerting.config, + log_levels=self.bizon_config.alerting.log_levels, + ) + alert.add_handlers() + @property def is_running(self) -> bool: """Return True if the pipeline is running""" @@ -119,7 +138,7 @@ def get_or_create_job( if job: # If force_create and a job is already running, we cancel it and create a new one if force_create: - logger.info(f"Found an existing job, cancelling it...") + logger.info("Found an existing job, cancelling it...") 
backend.update_stream_job_status(job_id=job.id, job_status=JobStatus.CANCELED) logger.info(f"Job {job.id} canceled. Creating a new one...") # Otherwise we return the existing job diff --git a/tests/alerts/test_slack_alert.py b/tests/alerts/test_slack_alert.py new file mode 100644 index 0000000..4f4cec2 --- /dev/null +++ b/tests/alerts/test_slack_alert.py @@ -0,0 +1,136 @@ +import json +import os +import tempfile +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer +from queue import Queue + +import pytest +import yaml +from loguru import logger + +from bizon.alerting.slack.config import SlackConfig +from bizon.alerting.slack.handler import SlackHandler +from bizon.engine.engine import RunnerFactory + + +class DummyWebhookHandler(BaseHTTPRequestHandler): + # Shared queue to store payloads + payload_queue = Queue() + + def do_POST(self): + # Read and store the payload + content_length = int(self.headers["Content-Length"]) + post_data = self.rfile.read(content_length) + self.payload_queue.put(post_data.decode("utf-8")) + + # Send a response back + self.send_response(200) + self.end_headers() + self.wfile.write(b"OK") + + +# Function to start the server in a separate thread +def start_dummy_server(host="localhost", port=8123): + server = HTTPServer((host, port), DummyWebhookHandler) + server_thread = threading.Thread(target=server.serve_forever, daemon=True) + server_thread.start() + return server, server_thread + + +@pytest.fixture +def dummy_webhook_server(): + # Start the dummy server + server, server_thread = start_dummy_server() + + # Yield control to the test + yield server + + # Shutdown the server after the test + server.shutdown() + server_thread.join() + + # Clear the payload queue + DummyWebhookHandler.payload_queue.queue.clear() + + +@pytest.fixture +def webhook_url(): + return "http://localhost:8123" + + +def test_slack_log_handler(dummy_webhook_server, webhook_url): + slack_handler = 
SlackHandler(SlackConfig(webhook_url=webhook_url)) + + slack_handler.add_handlers(levels=["ERROR", "WARNING"]) + + ERROR_MESSAGE = "This is an error message" + WARNING_MESSAGE = "This is a warning message" + + logger.error(ERROR_MESSAGE) + + error_payload = DummyWebhookHandler.payload_queue.get(timeout=1) + assert ERROR_MESSAGE in error_payload + + DummyWebhookHandler.payload_queue.queue.clear() + + logger.warning(WARNING_MESSAGE) + warning_payload = DummyWebhookHandler.payload_queue.get(timeout=1) + assert WARNING_MESSAGE in warning_payload + + +def test_e2e_logger_to_file(dummy_webhook_server, webhook_url): + + with tempfile.NamedTemporaryFile(delete=False) as temp: + + BIZON_CONFIG_DUMMY_TO_FILE = f""" + name: test_job_3 + + source: + name: dummy + stream: creatures + authentication: + type: api_key + params: + token: dummy_key + + destination: + name: file + config: + filepath: {temp.name} + + transforms: + - label: transform_data + python: | + if 'name' in data: + data['name'] = fake_variable # this is purposely wrong to trigger an error + + engine: + backend: + type: postgres + config: + database: bizon_test + schema: public + syncCursorInDBEvery: 2 + host: {os.environ.get("POSTGRES_HOST", "localhost")} + port: 5432 + username: postgres + password: bizon + + alerting: + type: slack + + config: + webhook_url: {webhook_url} + + log_levels: + - ERROR + """ + + runner = RunnerFactory.create_from_config_dict(yaml.safe_load(BIZON_CONFIG_DUMMY_TO_FILE)) + + runner.run() + + error_payload = DummyWebhookHandler.payload_queue.get(timeout=1) + assert "Error applying transformation" in error_payload + assert "fake_variable" in error_payload