From 45188fb7f55c0a5cde0ebe590e9b2db730916877 Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Thu, 9 Jan 2025 17:44:32 +0100 Subject: [PATCH 1/4] chore: update gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 3468128..5dbea27 100644 --- a/.gitignore +++ b/.gitignore @@ -67,4 +67,5 @@ bizon/**/config/*.yml profile_default/ ipython_config.py -hubspot_credentials.json \ No newline at end of file +hubspot_credentials.json +test-pipeline.ymlw \ No newline at end of file From d6efe0fb48e9922ee2a5538eafc5bf2422649135 Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Thu, 9 Jan 2025 17:44:47 +0100 Subject: [PATCH 2/4] chore: update gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 5dbea27..4402177 100644 --- a/.gitignore +++ b/.gitignore @@ -68,4 +68,4 @@ profile_default/ ipython_config.py hubspot_credentials.json -test-pipeline.ymlw \ No newline at end of file +test-pipeline.yml \ No newline at end of file From 13e7e8c622c92b7f92adac948a74d57932576415 Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Thu, 9 Jan 2025 17:46:06 +0100 Subject: [PATCH 3/4] chore: lint --- bizon/alerting/__init__.py | 0 bizon/alerting/alerts.py | 30 ++++++++++++++ bizon/alerting/models.py | 19 +++++++++ bizon/alerting/slack/__init__.py | 0 bizon/alerting/slack/config.py | 5 +++ bizon/alerting/slack/handler.py | 38 ++++++++++++++++++ bizon/common/models.py | 6 +++ bizon/engine/runner/runner.py | 19 ++++++++- tests/alerts/test_slack_alert.py | 69 ++++++++++++++++++++++++++++++++ 9 files changed, 185 insertions(+), 1 deletion(-) create mode 100644 bizon/alerting/__init__.py create mode 100644 bizon/alerting/alerts.py create mode 100644 bizon/alerting/models.py create mode 100644 bizon/alerting/slack/__init__.py create mode 100644 bizon/alerting/slack/config.py create mode 100644 bizon/alerting/slack/handler.py create mode 100644 tests/alerts/test_slack_alert.py diff --git a/bizon/alerting/__init__.py b/bizon/alerting/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bizon/alerting/alerts.py b/bizon/alerting/alerts.py new file mode 100644 index 0000000..8d17323 --- /dev/null +++ b/bizon/alerting/alerts.py @@ -0,0 +1,30 @@ +from abc import ABC, abstractmethod +from enum import Enum +from typing import Dict, List + +from loguru import logger + +from bizon.alerting.models import AlertingConfig, AlertMethod + + +class LogLevel(str, Enum): + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + CRITICAL = "CRITICAL" + + +class AbstractAlert(ABC): + + def __init__(self, type: AlertMethod, config: AlertingConfig): + self.type = type + self.config = config + + @abstractmethod + def handler(self, message: Dict) -> None: + pass + + def add_handlers(self, levels: List[LogLevel] = [LogLevel.ERROR]) -> None: + for level in levels: + logger.add(self.handler, level=level, format="{message}") diff --git a/bizon/alerting/models.py b/bizon/alerting/models.py new file mode 100644 index 0000000..dd9cedd --- /dev/null +++ b/bizon/alerting/models.py @@ -0,0 +1,19 @@ +from enum import Enum +from typing import Union + +from pydantic import BaseModel + +from bizon.alerting.slack.config import SlackConfig + + +class AlertMethod(str, Enum): + """Alerting methods""" + + SLACK = "slack" + + +class AlertingConfig(BaseModel): + """Alerting configuration model""" + + type: AlertMethod + config: Union[SlackConfig] diff --git a/bizon/alerting/slack/__init__.py b/bizon/alerting/slack/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bizon/alerting/slack/config.py b/bizon/alerting/slack/config.py new file mode 100644 index 0000000..0c19aa6 --- /dev/null +++ b/bizon/alerting/slack/config.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + + +class SlackConfig(BaseModel): + webhook_url: str diff --git a/bizon/alerting/slack/handler.py b/bizon/alerting/slack/handler.py new file mode 100644 index 0000000..318c30c --- /dev/null +++ b/bizon/alerting/slack/handler.py @@ -0,0 +1,38 @@ +import os +from typing import Dict + +import requests +from loguru import logger + +from bizon.alerting.alerts import AbstractAlert, AlertMethod +from bizon.alerting.slack.config import SlackConfig + + +class SlackHandler(AbstractAlert): + def __init__(self, config: SlackConfig): + super().__init__(type=AlertMethod.SLACK, config=config) + self.webhook_url = config.webhook_url + + def handler(self, message: Dict) -> None: + """ + Custom handler to send error logs to Slack, with additional context. + """ + log_entry = message.record + error_message = ( + f"*Sync*: `{os.environ.get('BIZON_SYNC_NAME', 'N/A')}`\n" + f"*Source*: `{os.environ.get('BIZON_SOURCE_NAME', 'N/A')}` - `{os.environ.get('BIZON_SOURCE_STREAM', 'N/A')}`\n" # noqa + f"*Destination*: `{os.environ.get('BIZON_DESTINATION_NAME', 'N/A')}`\n\n" + f"*Message:*\n```{log_entry['message']}```\n" + f"*File:* `{log_entry['file'].path}:{log_entry['line']}`\n" + f"*Function:* `{log_entry['function']}`\n" + f"*Level:* `{log_entry['level'].name}`\n" + ) + + payload = {"text": f":rotating_light: *Bizon Pipeline Alert* :rotating_light:\n\n{error_message}"} + + try: + response = requests.post(self.webhook_url, json=payload) + response.raise_for_status() + except requests.exceptions.RequestException as e: + logger.error(f"Failed to send log to Slack: {e}") + return None diff --git a/bizon/common/models.py b/bizon/common/models.py index e4047d0..29650fe 100644 --- a/bizon/common/models.py +++ b/bizon/common/models.py @@ -2,6 +2,7 @@ from pydantic import BaseModel, ConfigDict, Field +from bizon.alerting.models import AlertingConfig from bizon.connectors.destinations.bigquery.src.config import BigQueryConfig from bizon.connectors.destinations.bigquery_streaming.src.config import ( BigQueryStreamingConfig, @@ -47,6 +48,11 @@ class BizonConfig(BaseModel): default=EngineConfig(), ) + alerting: Optional[AlertingConfig] = Field( + description="Alerting configuration", + default=None, + ) + class SyncMetadata(BaseModel): """Model which stores general metadata around a sync. diff --git a/bizon/engine/runner/runner.py b/bizon/engine/runner/runner.py index e8bf4f0..39e964b 100644 --- a/bizon/engine/runner/runner.py +++ b/bizon/engine/runner/runner.py @@ -1,5 +1,6 @@ import multiprocessing import multiprocessing.synchronize +import os import sys import threading from abc import ABC, abstractmethod @@ -7,6 +8,8 @@ from loguru import logger +from bizon.alerting.alerts import LogLevel +from bizon.alerting.models import AlertMethod from bizon.cli.utils import parse_from_yaml from bizon.common.models import BizonConfig, SyncMetadata from bizon.destination.destination import AbstractDestination, DestinationFactory @@ -30,11 +33,25 @@ def __init__(self, config: dict): self.config: dict = config self.bizon_config = BizonConfig.model_validate(obj=config) + # Set pipeline information as environment variables + os.environ["BIZON_SYNC_NAME"] = self.bizon_config.name + os.environ["BIZON_SOURCE_NAME"] = self.bizon_config.source.name + os.environ["BIZON_SOURCE_STREAM"] = self.bizon_config.source.stream + os.environ["BIZON_DESTINATION_NAME"] = self.bizon_config.destination.name + # Set log level logger.info(f"Setting log level to {self.bizon_config.engine.runner.log_level.name}") logger.remove() logger.add(sys.stderr, level=self.bizon_config.engine.runner.log_level) + if self.bizon_config.alerting: + logger.info(f"Setting up alerting method {self.bizon_config.alerting.type}") + if self.bizon_config.alerting.type == AlertMethod.SLACK: + from bizon.alerting.slack.handler import SlackHandler + + alert = SlackHandler(webhook_url=self.bizon_config.alerting.config.webhook_url) + alert.add_handlers(levels=[LogLevel.ERROR, LogLevel.WARNING]) + @property def is_running(self) -> bool: """Return True if the pipeline is running""" @@ -119,7 +136,7 @@ def get_or_create_job( if job: # If force_create and a job is already running, we cancel it and create a new one if force_create: - logger.info(f"Found an existing job, cancelling it...") + logger.info("Found an existing job, cancelling it...") backend.update_stream_job_status(job_id=job.id, job_status=JobStatus.CANCELED) logger.info(f"Job {job.id} canceled. Creating a new one...") # Otherwise we return the existing job diff --git a/tests/alerts/test_slack_alert.py b/tests/alerts/test_slack_alert.py new file mode 100644 index 0000000..90b9a2f --- /dev/null +++ b/tests/alerts/test_slack_alert.py @@ -0,0 +1,69 @@ +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer +from queue import Queue + +import pytest +from loguru import logger + +from bizon.alerting.slack.config import SlackConfig +from bizon.alerting.slack.handler import SlackHandler + + +class DummyWebhookHandler(BaseHTTPRequestHandler): + # Shared queue to store payloads + payload_queue = Queue() + + def do_POST(self): + # Read and store the payload + content_length = int(self.headers["Content-Length"]) + post_data = self.rfile.read(content_length) + self.payload_queue.put(post_data.decode("utf-8")) + + # Send a response back + self.send_response(200) + self.end_headers() + self.wfile.write(b"OK") + + +# Function to start the server in a separate thread +def start_dummy_server(host="localhost", port=8123): + server = HTTPServer((host, port), DummyWebhookHandler) + server_thread = threading.Thread(target=server.serve_forever, daemon=True) + server_thread.start() + return server, server_thread + + +@pytest.fixture +def dummy_webhook_server(): + # Start the dummy server + server, server_thread = start_dummy_server() + + # Yield control to the test + yield server + + # Shutdown the server after the test + server.shutdown() + server_thread.join() + + # Clear the payload queue + DummyWebhookHandler.payload_queue.queue.clear() + + +def test_slack_log_handler(dummy_webhook_server): + slack_handler = SlackHandler(SlackConfig(webhook_url="http://localhost:8123")) + + slack_handler.add_handlers(levels=["ERROR", "WARNING"]) + + ERROR_MESSAGE = "This is an error message" + WARNING_MESSAGE = "This is a warning message" + + logger.error(ERROR_MESSAGE) + + error_payload = DummyWebhookHandler.payload_queue.get(timeout=1) + assert ERROR_MESSAGE in error_payload + + DummyWebhookHandler.payload_queue.queue.clear() + + logger.warning(WARNING_MESSAGE) + warning_payload = DummyWebhookHandler.payload_queue.get(timeout=1) + assert WARNING_MESSAGE in warning_payload From d30ed162703726dd4dadc0623e7d76682ae065a4 Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Thu, 9 Jan 2025 18:35:04 +0100 Subject: [PATCH 4/4] chore: add log levels and e2e testing --- bizon/alerting/alerts.py | 17 +++----- bizon/alerting/models.py | 11 ++++- bizon/alerting/slack/handler.py | 7 ++-- bizon/engine/runner/runner.py | 8 ++-- tests/alerts/test_slack_alert.py | 71 +++++++++++++++++++++++++++++++- 5 files changed, 93 insertions(+), 21 deletions(-) diff --git a/bizon/alerting/alerts.py b/bizon/alerting/alerts.py index 8d17323..e18ce05 100644 --- a/bizon/alerting/alerts.py +++ b/bizon/alerting/alerts.py @@ -1,30 +1,23 @@ from abc import ABC, abstractmethod -from enum import Enum from typing import Dict, List from loguru import logger -from bizon.alerting.models import AlertingConfig, AlertMethod - - -class LogLevel(str, Enum): - DEBUG = "DEBUG" - INFO = "INFO" - WARNING = "WARNING" - ERROR = "ERROR" - CRITICAL = "CRITICAL" +from bizon.alerting.models import AlertingConfig, AlertMethod, LogLevel class AbstractAlert(ABC): - def __init__(self, type: AlertMethod, config: AlertingConfig): + def __init__(self, type: AlertMethod, config: AlertingConfig, log_levels: List[LogLevel] = [LogLevel.ERROR]): self.type = type self.config = config + self.log_levels = log_levels @abstractmethod def handler(self, message: Dict) -> None: pass - def add_handlers(self, levels: List[LogLevel] = [LogLevel.ERROR]) -> None: + def add_handlers(self) -> None: + levels = [level.value for level in self.log_levels] for level in levels: logger.add(self.handler, level=level, format="{message}") diff --git a/bizon/alerting/models.py b/bizon/alerting/models.py index dd9cedd..bdb3d52 100644 --- a/bizon/alerting/models.py +++ b/bizon/alerting/models.py @@ -1,11 +1,19 @@ from enum import Enum -from typing import Union +from typing import List, Optional, Union from pydantic import BaseModel from bizon.alerting.slack.config import SlackConfig +class LogLevel(str, Enum): + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + CRITICAL = "CRITICAL" + + class AlertMethod(str, Enum): """Alerting methods""" @@ -16,4 +24,5 @@ class AlertingConfig(BaseModel): """Alerting configuration model""" type: AlertMethod + log_levels: Optional[List[LogLevel]] = [LogLevel.ERROR] config: Union[SlackConfig] diff --git a/bizon/alerting/slack/handler.py b/bizon/alerting/slack/handler.py index 318c30c..9c19390 100644 --- a/bizon/alerting/slack/handler.py +++ b/bizon/alerting/slack/handler.py @@ -1,16 +1,17 @@ import os -from typing import Dict +from typing import Dict, List import requests from loguru import logger from bizon.alerting.alerts import AbstractAlert, AlertMethod +from bizon.alerting.models import LogLevel from bizon.alerting.slack.config import SlackConfig class SlackHandler(AbstractAlert): - def __init__(self, config: SlackConfig): - super().__init__(type=AlertMethod.SLACK, config=config) + def __init__(self, config: SlackConfig, log_levels: List[LogLevel] = [LogLevel.ERROR]): + super().__init__(type=AlertMethod.SLACK, config=config, log_levels=log_levels) self.webhook_url = config.webhook_url def handler(self, message: Dict) -> None: diff --git a/bizon/engine/runner/runner.py b/bizon/engine/runner/runner.py index 39e964b..6f7a9a1 100644 --- a/bizon/engine/runner/runner.py +++ b/bizon/engine/runner/runner.py @@ -8,7 +8,6 @@ from loguru import logger -from bizon.alerting.alerts import LogLevel from bizon.alerting.models import AlertMethod from bizon.cli.utils import parse_from_yaml from bizon.common.models import BizonConfig, SyncMetadata @@ -49,8 +48,11 @@ def __init__(self, config: dict): if self.bizon_config.alerting.type == AlertMethod.SLACK: from bizon.alerting.slack.handler import SlackHandler - alert = SlackHandler(webhook_url=self.bizon_config.alerting.config.webhook_url) - alert.add_handlers(levels=[LogLevel.ERROR, LogLevel.WARNING]) + alert = SlackHandler( + config=self.bizon_config.alerting.config, + log_levels=self.bizon_config.alerting.log_levels, + ) + alert.add_handlers() @property def is_running(self) -> bool: diff --git a/tests/alerts/test_slack_alert.py b/tests/alerts/test_slack_alert.py index 90b9a2f..4f4cec2 100644 --- a/tests/alerts/test_slack_alert.py +++ b/tests/alerts/test_slack_alert.py @@ -1,12 +1,17 @@ +import json +import os +import tempfile import threading from http.server import BaseHTTPRequestHandler, HTTPServer from queue import Queue import pytest +import yaml from loguru import logger from bizon.alerting.slack.config import SlackConfig from bizon.alerting.slack.handler import SlackHandler +from bizon.engine.engine import RunnerFactory class DummyWebhookHandler(BaseHTTPRequestHandler): @@ -49,8 +54,13 @@ def dummy_webhook_server(): DummyWebhookHandler.payload_queue.queue.clear() -def test_slack_log_handler(dummy_webhook_server): - slack_handler = SlackHandler(SlackConfig(webhook_url="http://localhost:8123")) +@pytest.fixture +def webhook_url(): + return "http://localhost:8123" + + +def test_slack_log_handler(dummy_webhook_server, webhook_url): + slack_handler = SlackHandler(SlackConfig(webhook_url=webhook_url)) slack_handler.add_handlers(levels=["ERROR", "WARNING"]) @@ -67,3 +77,60 @@ def test_slack_log_handler(dummy_webhook_server): logger.warning(WARNING_MESSAGE) warning_payload = DummyWebhookHandler.payload_queue.get(timeout=1) assert WARNING_MESSAGE in warning_payload + + +def test_e2e_logger_to_file(dummy_webhook_server, webhook_url): + + with tempfile.NamedTemporaryFile(delete=False) as temp: + + BIZON_CONFIG_DUMMY_TO_FILE = f""" + name: test_job_3 + + source: + name: dummy + stream: creatures + authentication: + type: api_key + params: + token: dummy_key + + destination: + name: file + config: + filepath: {temp.name} + + transforms: + - label: transform_data + python: | + if 'name' in data: + data['name'] = fake_variable # this is purposely wrong to trigger an error + + engine: + backend: + type: postgres + config: + database: bizon_test + schema: public + syncCursorInDBEvery: 2 + host: {os.environ.get("POSTGRES_HOST", "localhost")} + port: 5432 + username: postgres + password: bizon + + alerting: + type: slack + + config: + webhook_url: {webhook_url} + + log_levels: + - ERROR + """ + + runner = RunnerFactory.create_from_config_dict(yaml.safe_load(BIZON_CONFIG_DUMMY_TO_FILE)) + + runner.run() + + error_payload = DummyWebhookHandler.payload_queue.get(timeout=1) + assert "Error applying transformation" in error_payload + assert "fake_variable" in error_payload