Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Slack alerting #12

Merged
merged 4 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ bizon/**/config/*.yml
profile_default/
ipython_config.py

hubspot_credentials.json
hubspot_credentials.json
test-pipeline.yml
Empty file added bizon/alerting/__init__.py
Empty file.
23 changes: 23 additions & 0 deletions bizon/alerting/alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

from loguru import logger

from bizon.alerting.models import AlertingConfig, AlertMethod, LogLevel


class AbstractAlert(ABC):
    """Base class for alerting backends that forward loguru records.

    Subclasses implement ``handler`` to deliver one log message to their
    channel (e.g. Slack); ``add_handlers`` registers that handler with
    loguru for each configured level.
    """

    def __init__(self, type: AlertMethod, config: AlertingConfig, log_levels: Optional[List[LogLevel]] = None):
        # A None default avoids the shared-mutable-default pitfall of
        # `log_levels=[LogLevel.ERROR]`.
        self.type = type
        self.config = config
        self.log_levels = log_levels if log_levels is not None else [LogLevel.ERROR]

    @abstractmethod
    def handler(self, message: Dict) -> None:
        """Deliver a single loguru message to the alerting channel."""
        pass

    def add_handlers(self, levels: Optional[List[LogLevel]] = None) -> None:
        """Register ``self.handler`` as a loguru sink for each level.

        Args:
            levels: optional override of the levels configured at
                construction time. Accepts ``LogLevel`` members or plain
                level-name strings (e.g. ``"ERROR"``).
        """
        selected = levels if levels is not None else self.log_levels
        # Normalize enum members to their string names for loguru.
        level_names = [level.value if isinstance(level, LogLevel) else str(level) for level in selected]
        for level in level_names:
            logger.add(self.handler, level=level, format="{message}")
28 changes: 28 additions & 0 deletions bizon/alerting/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from enum import Enum
from typing import List, Optional, Union

from pydantic import BaseModel

from bizon.alerting.slack.config import SlackConfig


class LogLevel(str, Enum):
    """Log severities (loguru level names) that may trigger an alert."""

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class AlertMethod(str, Enum):
    """Alerting methods"""

    # Slack incoming-webhook based alerting (see bizon.alerting.slack).
    SLACK = "slack"


class AlertingConfig(BaseModel):
    """Alerting configuration model"""

    # Which alerting backend to use (currently only Slack).
    type: AlertMethod
    # Levels that trigger an alert; defaults to errors only.
    # (pydantic deep-copies field defaults, so the shared list is safe here.)
    log_levels: Optional[List[LogLevel]] = [LogLevel.ERROR]
    # Backend-specific settings. A single-member Union[SlackConfig] is a
    # no-op wrapper; widen back to a Union when more backends are added.
    config: SlackConfig
Empty file.
5 changes: 5 additions & 0 deletions bizon/alerting/slack/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from pydantic import BaseModel


class SlackConfig(BaseModel):
    """Configuration for the Slack alerting backend."""

    # Slack incoming-webhook URL that alert payloads are POSTed to.
    webhook_url: str
39 changes: 39 additions & 0 deletions bizon/alerting/slack/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
from typing import Dict, List

import requests
from loguru import logger

from bizon.alerting.alerts import AbstractAlert, AlertMethod
from bizon.alerting.models import LogLevel
from bizon.alerting.slack.config import SlackConfig


class SlackHandler(AbstractAlert):
    """Alert backend that posts log records to a Slack incoming webhook."""

    def __init__(self, config: SlackConfig, log_levels: List[LogLevel] = [LogLevel.ERROR]):
        super().__init__(type=AlertMethod.SLACK, config=config, log_levels=log_levels)
        self.webhook_url = config.webhook_url

    def handler(self, message: Dict) -> None:
        """
        Custom handler to send error logs to Slack, with additional context.

        ``message`` is a loguru Message; its ``record`` mapping carries the
        log metadata (message text, file, line, function, level).
        """
        log_entry = message.record
        # Pipeline context comes from env vars set by the runner at startup.
        error_message = (
            f"*Sync*: `{os.environ.get('BIZON_SYNC_NAME', 'N/A')}`\n"
            f"*Source*: `{os.environ.get('BIZON_SOURCE_NAME', 'N/A')}` - `{os.environ.get('BIZON_SOURCE_STREAM', 'N/A')}`\n"  # noqa
            f"*Destination*: `{os.environ.get('BIZON_DESTINATION_NAME', 'N/A')}`\n\n"
            f"*Message:*\n```{log_entry['message']}```\n"
            f"*File:* `{log_entry['file'].path}:{log_entry['line']}`\n"
            f"*Function:* `{log_entry['function']}`\n"
            f"*Level:* `{log_entry['level'].name}`\n"
        )

        payload = {"text": f":rotating_light: *Bizon Pipeline Alert* :rotating_light:\n\n{error_message}"}

        try:
            # A timeout prevents a slow/unreachable webhook from hanging the
            # pipeline inside a logging call (requests has no default timeout).
            response = requests.post(self.webhook_url, json=payload, timeout=10)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # Best effort: a failed alert must never crash the pipeline.
            logger.error(f"Failed to send log to Slack: {e}")
        return None
6 changes: 6 additions & 0 deletions bizon/common/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pydantic import BaseModel, ConfigDict, Field

from bizon.alerting.models import AlertingConfig
from bizon.connectors.destinations.bigquery.src.config import BigQueryConfig
from bizon.connectors.destinations.bigquery_streaming.src.config import (
BigQueryStreamingConfig,
Expand Down Expand Up @@ -47,6 +48,11 @@ class BizonConfig(BaseModel):
default=EngineConfig(),
)

alerting: Optional[AlertingConfig] = Field(
description="Alerting configuration",
default=None,
)


class SyncMetadata(BaseModel):
"""Model which stores general metadata around a sync.
Expand Down
21 changes: 20 additions & 1 deletion bizon/engine/runner/runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import multiprocessing
import multiprocessing.synchronize
import os
import sys
import threading
from abc import ABC, abstractmethod
from typing import Union

from loguru import logger

from bizon.alerting.models import AlertMethod
from bizon.cli.utils import parse_from_yaml
from bizon.common.models import BizonConfig, SyncMetadata
from bizon.destination.destination import AbstractDestination, DestinationFactory
Expand All @@ -30,11 +32,28 @@ def __init__(self, config: dict):
self.config: dict = config
self.bizon_config = BizonConfig.model_validate(obj=config)

# Set pipeline information as environment variables
os.environ["BIZON_SYNC_NAME"] = self.bizon_config.name
os.environ["BIZON_SOURCE_NAME"] = self.bizon_config.source.name
os.environ["BIZON_SOURCE_STREAM"] = self.bizon_config.source.stream
os.environ["BIZON_DESTINATION_NAME"] = self.bizon_config.destination.name

# Set log level
logger.info(f"Setting log level to {self.bizon_config.engine.runner.log_level.name}")
logger.remove()
logger.add(sys.stderr, level=self.bizon_config.engine.runner.log_level)

if self.bizon_config.alerting:
logger.info(f"Setting up alerting method {self.bizon_config.alerting.type}")
if self.bizon_config.alerting.type == AlertMethod.SLACK:
from bizon.alerting.slack.handler import SlackHandler

alert = SlackHandler(
config=self.bizon_config.alerting.config,
log_levels=self.bizon_config.alerting.log_levels,
)
alert.add_handlers()

@property
def is_running(self) -> bool:
"""Return True if the pipeline is running"""
Expand Down Expand Up @@ -119,7 +138,7 @@ def get_or_create_job(
if job:
# If force_create and a job is already running, we cancel it and create a new one
if force_create:
logger.info(f"Found an existing job, cancelling it...")
logger.info("Found an existing job, cancelling it...")
backend.update_stream_job_status(job_id=job.id, job_status=JobStatus.CANCELED)
logger.info(f"Job {job.id} canceled. Creating a new one...")
# Otherwise we return the existing job
Expand Down
136 changes: 136 additions & 0 deletions tests/alerts/test_slack_alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import json
import os
import tempfile
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from queue import Queue

import pytest
import yaml
from loguru import logger

from bizon.alerting.slack.config import SlackConfig
from bizon.alerting.slack.handler import SlackHandler
from bizon.engine.engine import RunnerFactory


class DummyWebhookHandler(BaseHTTPRequestHandler):
    """Minimal webhook receiver that records every POST body for inspection."""

    # Shared across all request instances so tests can read captured payloads.
    payload_queue = Queue()

    def do_POST(self):
        # Capture the request body and hand it to the test via the queue.
        body_size = int(self.headers["Content-Length"])
        raw_body = self.rfile.read(body_size)
        DummyWebhookHandler.payload_queue.put(raw_body.decode("utf-8"))

        # Acknowledge the request so the client treats the POST as delivered.
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"OK")


# Function to start the server in a separate thread
def start_dummy_server(host="localhost", port=8123):
    """Launch the dummy webhook server on a daemon thread; return (server, thread)."""
    httpd = HTTPServer((host, port), DummyWebhookHandler)
    worker = threading.Thread(target=httpd.serve_forever, daemon=True)
    worker.start()
    return httpd, worker


@pytest.fixture
def dummy_webhook_server():
    """Run the dummy webhook server for the duration of one test."""
    server, thread = start_dummy_server()

    # Hand control to the test while the server keeps serving.
    yield server

    # Tear down: stop serving and wait for the worker thread to exit.
    server.shutdown()
    thread.join()

    # Drop any leftover payloads so the next test starts clean.
    DummyWebhookHandler.payload_queue.queue.clear()


@pytest.fixture
def webhook_url():
    """Address the dummy webhook server listens on (see start_dummy_server)."""
    endpoint = "http://localhost:8123"
    return endpoint


def test_slack_log_handler(dummy_webhook_server, webhook_url):
    """ERROR and WARNING records are forwarded to the Slack webhook.

    Fix: the original called ``add_handlers(levels=["ERROR", "WARNING"])``,
    but ``AbstractAlert.add_handlers`` takes no ``levels`` argument (and does
    ``level.value`` on members, which bare strings lack). Configure the
    levels through the constructor instead.
    """
    from bizon.alerting.models import LogLevel

    slack_handler = SlackHandler(
        SlackConfig(webhook_url=webhook_url),
        log_levels=[LogLevel.ERROR, LogLevel.WARNING],
    )

    slack_handler.add_handlers()

    ERROR_MESSAGE = "This is an error message"
    WARNING_MESSAGE = "This is a warning message"

    logger.error(ERROR_MESSAGE)

    error_payload = DummyWebhookHandler.payload_queue.get(timeout=1)
    assert ERROR_MESSAGE in error_payload

    DummyWebhookHandler.payload_queue.queue.clear()

    logger.warning(WARNING_MESSAGE)
    warning_payload = DummyWebhookHandler.payload_queue.get(timeout=1)
    assert WARNING_MESSAGE in warning_payload


def test_e2e_logger_to_file(dummy_webhook_server, webhook_url):
    """End-to-end: a failing transform produces an ERROR that reaches Slack.

    Runs a dummy->file pipeline whose transform references an undefined
    variable (``fake_variable``), then asserts the resulting error log was
    POSTed to the dummy webhook server.
    NOTE(review): requires a reachable Postgres backend (POSTGRES_HOST or
    localhost) — confirm CI provisions it.
    """

    with tempfile.NamedTemporaryFile(delete=False) as temp:

        # Pipeline config: the destination writes to the temp file, alerting
        # points at the dummy webhook, and only ERROR records are forwarded.
        BIZON_CONFIG_DUMMY_TO_FILE = f"""
        name: test_job_3

        source:
          name: dummy
          stream: creatures
          authentication:
            type: api_key
            params:
              token: dummy_key

        destination:
          name: file
          config:
            filepath: {temp.name}

        transforms:
          - label: transform_data
            python: |
              if 'name' in data:
                data['name'] = fake_variable # this is purposely wrong to trigger an error

        engine:
          backend:
            type: postgres
            config:
              database: bizon_test
              schema: public
              syncCursorInDBEvery: 2
              host: {os.environ.get("POSTGRES_HOST", "localhost")}
              port: 5432
              username: postgres
              password: bizon

        alerting:
          type: slack

          config:
            webhook_url: {webhook_url}

          log_levels:
            - ERROR
        """

    runner = RunnerFactory.create_from_config_dict(yaml.safe_load(BIZON_CONFIG_DUMMY_TO_FILE))

    # The run is expected to hit the transform error and emit an ERROR log.
    runner.run()

    # The Slack handler should have forwarded the transform failure.
    error_payload = DummyWebhookHandler.payload_queue.get(timeout=1)
    assert "Error applying transformation" in error_payload
    assert "fake_variable" in error_payload
Loading