Skip to content

Commit

Permalink
chore: lint
Browse files Browse the repository at this point in the history
  • Loading branch information
anaselmhamdi committed Jan 23, 2025
1 parent 8a6198e commit 0001f85
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 27 deletions.
13 changes: 6 additions & 7 deletions bizon/engine/pipeline/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
AbstractQueueConfig,
QueueMessage,
)
from bizon.monitoring.client import Monitor
from bizon.monitoring.monitor import AbstractMonitor
from bizon.transform.transform import Transform


Expand All @@ -24,7 +24,7 @@ def __init__(
config: AbstractQueueConfig,
destination: AbstractDestination,
transform: Transform,
monitor: Monitor,
monitor: AbstractMonitor,
):
self.config = config
self.destination = destination
Expand All @@ -43,7 +43,7 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt
except Exception as e:
logger.error(f"Error applying transformation: {e}")
logger.error(traceback.format_exc())
self.monitor.report_pipeline_status(PipelineReturnStatus.TRANSFORM_ERROR)
self.monitor.track_pipeline_status(PipelineReturnStatus.TRANSFORM_ERROR)
return PipelineReturnStatus.TRANSFORM_ERROR

# Handle last iteration
Expand All @@ -57,16 +57,15 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt
pagination=queue_message.pagination,
last_iteration=True,
)
self.monitor.report_pipeline_status(PipelineReturnStatus.SUCCESS)
self.monitor.track_pipeline_status(PipelineReturnStatus.SUCCESS)
return PipelineReturnStatus.SUCCESS

except Exception as e:
logger.error(f"Error writing records to destination: {e}")
self.monitor.report_pipeline_status(PipelineReturnStatus.DESTINATION_ERROR)
self.monitor.track_pipeline_status(PipelineReturnStatus.DESTINATION_ERROR)
return PipelineReturnStatus.DESTINATION_ERROR

# Write the records to the destination
self.monitor.report_pipeline_status(PipelineReturnStatus.RUNNING)
try:
self.destination.write_records_and_update_cursor(
df_source_records=df_source_records,
Expand All @@ -78,7 +77,7 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt

except Exception as e:
logger.error(f"Error writing records to destination: {e}")
self.monitor.report_pipeline_status(PipelineReturnStatus.DESTINATION_ERROR)
self.monitor.track_pipeline_status(PipelineReturnStatus.DESTINATION_ERROR)
return PipelineReturnStatus.DESTINATION_ERROR

raise RuntimeError("Should not reach this point")
8 changes: 4 additions & 4 deletions bizon/engine/queue/adapters/python_queue/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from bizon.engine.pipeline.models import PipelineReturnStatus
from bizon.engine.queue.config import QueueMessage
from bizon.engine.queue.queue import AbstractQueue
from bizon.monitoring.client import Monitor
from bizon.monitoring.monitor import AbstractMonitor
from bizon.transform.transform import Transform

from .config import PythonQueueConfig
Expand All @@ -23,7 +23,7 @@ def __init__(
queue: AbstractQueue,
destination: AbstractDestination,
transform: Transform,
monitor: Monitor,
monitor: AbstractMonitor,
):
super().__init__(
config,
Expand All @@ -32,15 +32,15 @@ def __init__(
monitor=monitor,
)
self.queue = queue
self.monitor.track_pipeline_status(PipelineReturnStatus.RUNNING)

def run(self, stop_event: Union[threading.Event, multiprocessing.synchronize.Event]) -> PipelineReturnStatus:

while True:

# Handle kill signal from the runner
if stop_event.is_set():
logger.info("Stop event is set, closing consumer ...")
self.monitor.report_pipeline_status(PipelineReturnStatus.KILLED_BY_RUNNER)
self.monitor.track_pipeline_status(PipelineReturnStatus.KILLED_BY_RUNNER)
return PipelineReturnStatus.KILLED_BY_RUNNER

# Retrieve the message from the queue
Expand Down
4 changes: 2 additions & 2 deletions bizon/engine/queue/adapters/python_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from bizon.destination.destination import AbstractDestination
from bizon.engine.queue.config import QUEUE_TERMINATION, QueueMessage
from bizon.engine.queue.queue import AbstractQueue, AbstractQueueConsumer
from bizon.monitoring.client import Monitor
from bizon.monitoring.monitor import AbstractMonitor
from bizon.source.models import SourceIteration
from bizon.transform.transform import Transform

Expand All @@ -30,7 +30,7 @@ def connect(self):
pass

def get_consumer(
self, destination: AbstractDestination, transform: Transform, monitor: Monitor
self, destination: AbstractDestination, transform: Transform, monitor: AbstractMonitor
) -> AbstractQueueConsumer:
return PythonQueueConsumer(
config=self.config, queue=self.queue, destination=destination, transform=transform, monitor=monitor
Expand Down
7 changes: 5 additions & 2 deletions bizon/engine/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from bizon.destination.destination import AbstractDestination
from bizon.engine.pipeline.consumer import AbstractQueueConsumer
from bizon.monitoring.client import Monitor
from bizon.monitoring.monitor import AbstractMonitor
from bizon.source.models import SourceIteration, source_record_schema
from bizon.transform.transform import Transform

Expand All @@ -31,7 +31,10 @@ def connect(self):

@abstractmethod
def get_consumer(
self, destination: AbstractDestination, transform: Transform, monitor: Monitor
self,
destination: AbstractDestination,
transform: Transform,
monitor: AbstractMonitor,
) -> AbstractQueueConsumer:
pass

Expand Down
6 changes: 3 additions & 3 deletions bizon/engine/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from bizon.engine.pipeline.producer import Producer
from bizon.engine.queue.queue import AbstractQueue, QueueFactory
from bizon.engine.runner.config import RunnerStatus
from bizon.monitoring.client import Monitor
from bizon.monitoring.monitor import AbstractMonitor, MonitorFactory
from bizon.source.discover import get_source_instance_by_source_and_stream
from bizon.source.source import AbstractSource
from bizon.transform.transform import Transform
Expand Down Expand Up @@ -120,9 +120,9 @@ def get_transform(bizon_config: BizonConfig) -> Transform:
return Transform(transforms=bizon_config.transforms)

@staticmethod
def get_monitoring_client(bizon_config: BizonConfig) -> Monitor:
def get_monitoring_client(bizon_config: BizonConfig) -> AbstractMonitor:
"""Return the monitoring client instance"""
return Monitor(bizon_config)
return MonitorFactory.get_monitor(bizon_config)

@staticmethod
def get_or_create_job(
Expand Down
14 changes: 13 additions & 1 deletion bizon/monitoring/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
from enum import Enum
from typing import Optional

from pydantic import BaseModel


class MonitoringConfig(BaseModel):
class MonitorType(str, Enum):
DATADOG = "datadog"


class DatadogConfig(BaseModel):
datadog_agent_host: str
datadog_agent_port: int = 8125


class MonitoringConfig(BaseModel):
type: MonitorType
config: Optional[DatadogConfig] = None
Empty file.
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import logging

from datadog import initialize, statsd
from loguru import logger

from bizon.common.models import BizonConfig
from bizon.engine.pipeline.models import PipelineReturnStatus
from bizon.monitoring.monitor import AbstractMonitor


class Monitor:
class DatadogMonitor(AbstractMonitor):
def __init__(self, pipeline_config: BizonConfig):
self.pipeline_config = pipeline_config
super().__init__(pipeline_config)

# In Kubernetes, set the host dynamically
try:
initialize(
statsd_host=pipeline_config.monitoring.datadog_agent_host,
statsd_port=pipeline_config.monitoring.datadog_agent_port,
statsd_host=pipeline_config.monitoring.config.datadog_agent_host,
statsd_port=pipeline_config.monitoring.config.datadog_agent_port,
)
except Exception as e:
logging.info(f"Failed to initialize Datadog agent: {e}")
logger.info(f"Failed to initialize Datadog agent: {e}")

self.pipeline_monitor_status = "bizon_pipeline.status"
self.tags = [
Expand All @@ -29,7 +29,7 @@ def __init__(self, pipeline_config: BizonConfig):

self.pipeline_active_pipelines = "bizon_pipeline.active_pipelines"

def report_pipeline_status(self, pipeline_status: PipelineReturnStatus) -> None:
def track_pipeline_status(self, pipeline_status: PipelineReturnStatus) -> None:
"""
Track the status of the pipeline.
Expand Down
35 changes: 35 additions & 0 deletions bizon/monitoring/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from abc import ABC, abstractmethod

from bizon.common.models import BizonConfig
from bizon.engine.pipeline.models import PipelineReturnStatus
from bizon.monitoring.config import MonitorType


class AbstractMonitor(ABC):
def __init__(self, pipeline_config: BizonConfig):
self.pipeline_config = pipeline_config
# Initialize the monitor

@abstractmethod
def track_pipeline_status(self, pipeline_status: PipelineReturnStatus) -> None:
"""
Track the status of the pipeline.
Args:
status (str): The current status of the pipeline (e.g., 'running', 'failed', 'completed').
"""
pass


class MonitorFactory:
@staticmethod
def get_monitor(pipeline_config: BizonConfig) -> AbstractMonitor:
if pipeline_config.monitoring is None:
from bizon.monitoring.noop.monitor import NoOpMonitor

return NoOpMonitor(pipeline_config)

if pipeline_config.monitoring.type == MonitorType.DATADOG:
from bizon.monitoring.datadog.monitor import DatadogMonitor

return DatadogMonitor(pipeline_config)
Empty file.
11 changes: 11 additions & 0 deletions bizon/monitoring/noop/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from bizon.common.models import BizonConfig
from bizon.engine.pipeline.models import PipelineReturnStatus
from bizon.monitoring.monitor import AbstractMonitor


class NoOpMonitor(AbstractMonitor):
def __init__(self, pipeline_config: BizonConfig):
super().__init__(pipeline_config)

def track_pipeline_status(self, pipeline_status: PipelineReturnStatus) -> None:
pass

0 comments on commit 0001f85

Please sign in to comment.