diff --git a/.gitignore b/.gitignore
index 4402177..1431a07 100644
--- a/.gitignore
+++ b/.gitignore
@@ -68,4 +68,4 @@ profile_default/
 ipython_config.py
 
 hubspot_credentials.json
-test-pipeline.yml
\ No newline at end of file
+test-pipeline*.yml
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
index b57b1b4..3b43dcf 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -12,7 +12,7 @@
             "name": "Kafka pipeline",
             "type": "debugpy",
             "request": "launch",
-            "program": "${workspaceFolder}/bizon/sources/kafka/tests/kafka_pipeline.py",
+            "program": "${workspaceFolder}/bizon/connectors/sources/kafka/tests/kafka_pipeline.py",
             "console": "integratedTerminal"
         }
     ]
diff --git a/bizon/common/models.py b/bizon/common/models.py
index 29650fe..00821ad 100644
--- a/bizon/common/models.py
+++ b/bizon/common/models.py
@@ -10,6 +10,7 @@
 from bizon.connectors.destinations.file.src.config import FileDestinationConfig
 from bizon.connectors.destinations.logger.src.config import LoggerConfig
 from bizon.engine.config import EngineConfig
+from bizon.monitoring.config import MonitoringConfig
 from bizon.source.config import SourceConfig, SourceSyncModes
 from bizon.transform.config import TransformModel
 
@@ -53,6 +54,11 @@ class BizonConfig(BaseModel):
         default=None,
     )
 
+    monitoring: Optional[MonitoringConfig] = Field(
+        description="Monitoring configuration",
+        default=None,
+    )
+
 
 class SyncMetadata(BaseModel):
     """Model which stores general metadata around a sync.
diff --git a/bizon/connectors/sources/kafka/tests/kafka_pipeline.py b/bizon/connectors/sources/kafka/tests/kafka_pipeline.py
index d112577..c808616 100644
--- a/bizon/connectors/sources/kafka/tests/kafka_pipeline.py
+++ b/bizon/connectors/sources/kafka/tests/kafka_pipeline.py
@@ -3,7 +3,5 @@
 from bizon.engine.engine import RunnerFactory
 
 if __name__ == "__main__":
-    runner = RunnerFactory.create_from_yaml(
-        filepath=os.path.abspath("bizon/connectors/sources/kafka/config/kafka.example.yml")
-    )
+    runner = RunnerFactory.create_from_yaml(filepath=os.path.abspath("test-pipeline-monitoring.yml"))
     runner.run()
diff --git a/bizon/engine/pipeline/consumer.py b/bizon/engine/pipeline/consumer.py
index 94817a5..a7a5f15 100644
--- a/bizon/engine/pipeline/consumer.py
+++ b/bizon/engine/pipeline/consumer.py
@@ -14,14 +14,22 @@
     AbstractQueueConfig,
     QueueMessage,
 )
+from bizon.monitoring.client import Monitor
 from bizon.transform.transform import Transform
 
 
 class AbstractQueueConsumer(ABC):
-    def __init__(self, config: AbstractQueueConfig, destination: AbstractDestination, transform: Transform):
+    def __init__(
+        self,
+        config: AbstractQueueConfig,
+        destination: AbstractDestination,
+        transform: Transform,
+        monitor: Monitor,
+    ):
         self.config = config
         self.destination = destination
         self.transform = transform
+        self.monitor = monitor
 
     @abstractmethod
     def run(self, stop_event: Union[multiprocessing.synchronize.Event, threading.Event]) -> PipelineReturnStatus:
@@ -35,6 +43,7 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt
         except Exception as e:
             logger.error(f"Error applying transformation: {e}")
             logger.error(traceback.format_exc())
+            self.monitor.report_pipeline_status(PipelineReturnStatus.TRANSFORM_ERROR)
             return PipelineReturnStatus.TRANSFORM_ERROR
 
         # Handle last iteration
@@ -48,13 +57,16 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt
                     pagination=queue_message.pagination,
                     last_iteration=True,
                 )
+                self.monitor.report_pipeline_status(PipelineReturnStatus.SUCCESS)
                 return PipelineReturnStatus.SUCCESS
             except Exception as e:
                 logger.error(f"Error writing records to destination: {e}")
+                self.monitor.report_pipeline_status(PipelineReturnStatus.DESTINATION_ERROR)
                 return PipelineReturnStatus.DESTINATION_ERROR
 
         # Write the records to the destination
+        self.monitor.report_pipeline_status(PipelineReturnStatus.RUNNING)
         try:
             self.destination.write_records_and_update_cursor(
                 df_source_records=df_source_records,
                 pagination=queue_message.pagination,
@@ -66,6 +78,7 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt
 
         except Exception as e:
             logger.error(f"Error writing records to destination: {e}")
+            self.monitor.report_pipeline_status(PipelineReturnStatus.DESTINATION_ERROR)
             return PipelineReturnStatus.DESTINATION_ERROR
 
         raise RuntimeError("Should not reach this point")
diff --git a/bizon/engine/queue/adapters/python_queue/consumer.py b/bizon/engine/queue/adapters/python_queue/consumer.py
index d550708..114eb47 100644
--- a/bizon/engine/queue/adapters/python_queue/consumer.py
+++ b/bizon/engine/queue/adapters/python_queue/consumer.py
@@ -10,6 +10,7 @@
 from bizon.engine.pipeline.models import PipelineReturnStatus
 from bizon.engine.queue.config import QueueMessage
 from bizon.engine.queue.queue import AbstractQueue
+from bizon.monitoring.client import Monitor
 from bizon.transform.transform import Transform
 
 from .config import PythonQueueConfig
@@ -17,9 +18,19 @@
 
 class PythonQueueConsumer(AbstractQueueConsumer):
     def __init__(
-        self, config: PythonQueueConfig, queue: AbstractQueue, destination: AbstractDestination, transform: Transform
+        self,
+        config: PythonQueueConfig,
+        queue: AbstractQueue,
+        destination: AbstractDestination,
+        transform: Transform,
+        monitor: Monitor,
     ):
-        super().__init__(config, destination=destination, transform=transform)
+        super().__init__(
+            config,
+            destination=destination,
+            transform=transform,
+            monitor=monitor,
+        )
         self.queue = queue
 
     def run(self, stop_event: Union[threading.Event, multiprocessing.synchronize.Event]) -> PipelineReturnStatus:
@@ -29,6 +40,7 @@ def run(self, stop_event: Union[threading.Event, multiprocessing.synchronize.Eve
             # Handle kill signal from the runner
             if stop_event.is_set():
                 logger.info("Stop event is set, closing consumer ...")
+                self.monitor.report_pipeline_status(PipelineReturnStatus.KILLED_BY_RUNNER)
                 return PipelineReturnStatus.KILLED_BY_RUNNER
 
             # Retrieve the message from the queue
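With these changes every consumer owns a `Monitor` and reports a status on each code path: `TRANSFORM_ERROR` on failed transforms, `DESTINATION_ERROR` on failed writes, `RUNNING` for each batch written, `SUCCESS` on the last iteration, and `KILLED_BY_RUNNER` when the stop event fires. A minimal sketch of how the new constructor signature can be exercised in a test, with every collaborator stubbed (this only checks the wiring, not real queue behaviour):

```python
# Sketch only: all objects here are stubs; PythonQueueConsumer and
# PipelineReturnStatus are the real classes touched by this diff.
from unittest.mock import MagicMock

from bizon.engine.pipeline.models import PipelineReturnStatus
from bizon.engine.queue.adapters.python_queue.consumer import PythonQueueConsumer

monitor = MagicMock()  # stands in for bizon.monitoring.client.Monitor
consumer = PythonQueueConsumer(
    config=MagicMock(),
    queue=MagicMock(),
    destination=MagicMock(),
    transform=MagicMock(),
    monitor=monitor,
)

# After consumer.run(...) returns, the stub has recorded every reported status:
# monitor.report_pipeline_status.assert_any_call(PipelineReturnStatus.KILLED_BY_RUNNER)
```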
diff --git a/bizon/engine/queue/adapters/python_queue/queue.py b/bizon/engine/queue/adapters/python_queue/queue.py
index 81e131a..6356c05 100644
--- a/bizon/engine/queue/adapters/python_queue/queue.py
+++ b/bizon/engine/queue/adapters/python_queue/queue.py
@@ -8,6 +8,7 @@
 from bizon.destination.destination import AbstractDestination
 from bizon.engine.queue.config import QUEUE_TERMINATION, QueueMessage
 from bizon.engine.queue.queue import AbstractQueue, AbstractQueueConsumer
+from bizon.monitoring.client import Monitor
 from bizon.source.models import SourceIteration
 from bizon.transform.transform import Transform
 
@@ -28,8 +29,12 @@ def connect(self):
         # No connection to establish for PythonQueue
         pass
 
-    def get_consumer(self, destination: AbstractDestination, transform: Transform) -> AbstractQueueConsumer:
-        return PythonQueueConsumer(config=self.config, queue=self.queue, destination=destination, transform=transform)
+    def get_consumer(
+        self, destination: AbstractDestination, transform: Transform, monitor: Monitor
+    ) -> AbstractQueueConsumer:
+        return PythonQueueConsumer(
+            config=self.config, queue=self.queue, destination=destination, transform=transform, monitor=monitor
+        )
 
     def put_queue_message(self, queue_message: QueueMessage):
         if not self.queue.full():
diff --git a/bizon/engine/queue/queue.py b/bizon/engine/queue/queue.py
index c1f7e59..f0ac419 100644
--- a/bizon/engine/queue/queue.py
+++ b/bizon/engine/queue/queue.py
@@ -1,14 +1,14 @@
 import json
 from abc import ABC, abstractmethod
-from dataclasses import dataclass
 from datetime import datetime
-from typing import Optional, Union
+from typing import Union
 
 import polars as pl
 from pytz import UTC
 
 from bizon.destination.destination import AbstractDestination
 from bizon.engine.pipeline.consumer import AbstractQueueConsumer
+from bizon.monitoring.client import Monitor
 from bizon.source.models import SourceIteration, source_record_schema
 from bizon.transform.transform import Transform
 
@@ -30,7 +30,9 @@ def connect(self):
         pass
 
     @abstractmethod
-    def get_consumer(self, destination: AbstractDestination, transform: Transform) -> AbstractQueueConsumer:
+    def get_consumer(
+        self, destination: AbstractDestination, transform: Transform, monitor: Monitor
+    ) -> AbstractQueueConsumer:
         pass
 
     @abstractmethod
diff --git a/bizon/engine/runner/runner.py b/bizon/engine/runner/runner.py
index 6f7a9a1..ed45767 100644
--- a/bizon/engine/runner/runner.py
+++ b/bizon/engine/runner/runner.py
@@ -17,6 +17,7 @@
 from bizon.engine.pipeline.producer import Producer
 from bizon.engine.queue.queue import AbstractQueue, QueueFactory
 from bizon.engine.runner.config import RunnerStatus
+from bizon.monitoring.client import Monitor
 from bizon.source.discover import get_source_instance_by_source_and_stream
 from bizon.source.source import AbstractSource
 from bizon.transform.transform import Transform
@@ -118,6 +119,11 @@ def get_transform(bizon_config: BizonConfig) -> Transform:
         """Return the transform instance to apply to the source records"""
         return Transform(transforms=bizon_config.transforms)
 
+    @staticmethod
+    def get_monitoring_client(bizon_config: BizonConfig) -> Monitor:
+        """Return the monitoring client instance"""
+        return Monitor(bizon_config)
+
     @staticmethod
     def get_or_create_job(
         bizon_config: BizonConfig,
@@ -231,8 +237,13 @@ def instanciate_and_run_consumer(
         backend = AbstractRunner.get_backend(bizon_config=bizon_config, **kwargs)
         destination = AbstractRunner.get_destination(bizon_config=bizon_config, backend=backend, job_id=job_id)
         transform = AbstractRunner.get_transform(bizon_config=bizon_config)
+        monitor = AbstractRunner.get_monitoring_client(bizon_config=bizon_config)
 
-        consumer = queue.get_consumer(destination=destination, transform=transform)
+        consumer = queue.get_consumer(
+            destination=destination,
+            transform=transform,
+            monitor=monitor,
+        )
 
         status = consumer.run(stop_event)
         return status
diff --git a/bizon/monitoring/__init__.py b/bizon/monitoring/__init__.py
new file mode 100644
index 0000000..e69de29
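Since `AbstractQueue.get_consumer` is abstract, this signature change is breaking for every queue adapter, and the diff only updates `PythonQueue`; any other adapter (the lockfile's `rabbitmq = ["pika"]` extra suggests at least one) would need the same update. A hedged sketch, with `MyQueue`/`MyQueueConsumer` as hypothetical names — only the base classes, `Monitor`, and the signature come from this diff:

```python
from bizon.destination.destination import AbstractDestination
from bizon.engine.pipeline.consumer import AbstractQueueConsumer
from bizon.engine.queue.queue import AbstractQueue
from bizon.monitoring.client import Monitor
from bizon.transform.transform import Transform


class MyQueueConsumer(AbstractQueueConsumer):
    def run(self, stop_event):
        # A real adapter polls its broker here and reports statuses
        # through self.monitor, as PythonQueueConsumer does above.
        raise NotImplementedError


class MyQueue(AbstractQueue):
    def get_consumer(
        self, destination: AbstractDestination, transform: Transform, monitor: Monitor
    ) -> AbstractQueueConsumer:
        # Thread the monitor through to the concrete consumer,
        # mirroring PythonQueue.get_consumer.
        return MyQueueConsumer(
            config=self.config,  # assumes the adapter stores its config, as PythonQueue does
            destination=destination,
            transform=transform,
            monitor=monitor,
        )
```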
diff --git a/bizon/monitoring/client.py b/bizon/monitoring/client.py
new file mode 100644
index 0000000..55f236d
--- /dev/null
+++ b/bizon/monitoring/client.py
@@ -0,0 +1,38 @@
+from datadog import initialize, statsd
+
+from bizon.common.models import BizonConfig
+from bizon.engine.pipeline.models import PipelineReturnStatus
+
+
+class Monitor:
+    def __init__(self, pipeline_config: BizonConfig):
+        self.pipeline_config = pipeline_config
+
+        # In Kubernetes, set the host dynamically
+        initialize(
+            statsd_host=pipeline_config.monitoring.datadog_agent_host,
+            statsd_port=pipeline_config.monitoring.datadog_agent_port,
+        )
+
+        self.pipeline_monitor_status = "bizon_pipeline.status"
+
+        self.tags = [
+            f"name:{self.pipeline_config.name}",
+            f"stream:{self.pipeline_config.source.stream}",
+            f"source:{self.pipeline_config.source.name}",
+            f"destination:{self.pipeline_config.destination.name}",
+        ]
+
+        self.pipeline_active_pipelines = "bizon_pipeline.active_pipelines"
+
+    def report_pipeline_status(self, pipeline_status: PipelineReturnStatus) -> None:
+        """
+        Track the status of the pipeline.
+
+        Args:
+            pipeline_status (PipelineReturnStatus): The current status of the pipeline (e.g. RUNNING, SUCCESS, DESTINATION_ERROR).
+        """
+
+        statsd.increment(
+            self.pipeline_monitor_status,
+            tags=self.tags + [f"status:{pipeline_status}"],
+        )
diff --git a/bizon/monitoring/config.py b/bizon/monitoring/config.py
new file mode 100644
index 0000000..6571437
--- /dev/null
+++ b/bizon/monitoring/config.py
@@ -0,0 +1,6 @@
+from pydantic import BaseModel
+
+
+class MonitoringConfig(BaseModel):
+    datadog_agent_host: str
+    datadog_agent_port: int = 8125
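One caveat worth flagging: `BizonConfig.monitoring` is declared `Optional` with `default=None`, yet `Monitor.__init__` dereferences `pipeline_config.monitoring` unconditionally, and the runner now instantiates a `Monitor` for every pipeline. As written, any pipeline whose YAML has no `monitoring` block would raise an `AttributeError`. A minimal sketch of a None-safe variant, keeping the same public surface but degrading to a no-op when monitoring is unconfigured:

```python
# Sketch only: mirrors the Monitor above, plus a guard for the
# Optional monitoring block. Not the implementation in this diff.
from datadog import initialize, statsd

from bizon.common.models import BizonConfig
from bizon.engine.pipeline.models import PipelineReturnStatus


class Monitor:
    def __init__(self, pipeline_config: BizonConfig):
        self.pipeline_config = pipeline_config
        # Monitoring stays opt-in: no `monitoring` block, no StatsD traffic.
        self.enabled = pipeline_config.monitoring is not None
        self.pipeline_monitor_status = "bizon_pipeline.status"
        self.tags = [
            f"name:{pipeline_config.name}",
            f"stream:{pipeline_config.source.stream}",
            f"source:{pipeline_config.source.name}",
            f"destination:{pipeline_config.destination.name}",
        ]
        if self.enabled:
            initialize(
                statsd_host=pipeline_config.monitoring.datadog_agent_host,
                statsd_port=pipeline_config.monitoring.datadog_agent_port,
            )

    def report_pipeline_status(self, pipeline_status: PipelineReturnStatus) -> None:
        if not self.enabled:
            return  # monitoring not configured: silently skip
        statsd.increment(
            self.pipeline_monitor_status,
            tags=self.tags + [f"status:{pipeline_status}"],
        )
```

`MonitoringConfig` only requires the host; the port defaults to 8125, the standard DogStatsD port.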
">=3.9,<3.13" -content-hash = "bc20338717597721f3b4715827a0507eb5b6bc583de17680b4fb6dc826658268" +content-hash = "edbc033f4387c141abda55db33095562a50400d05003d8714334628f8b7c5fd2" diff --git a/pyproject.toml b/pyproject.toml index ab450c0..5fc1875 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,10 @@ snakeviz = "^2.1.2" yappi = "^1.3.2" pre-commit = "^3.8.0" + +[tool.poetry.group.monitoring.dependencies] +datadog = "^0.50.2" + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index c9e77d0..6fd5e0e 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.9' - services: db: image: postgres @@ -24,3 +22,23 @@ services: restart: always ports: - 8080:8080 + + otel-collector: + image: otel/opentelemetry-collector-contrib:0.117.0 + volumes: + - ../bizon/monitoring/otel-collector-config.yml:/config.yml + command: --config /config.yml + ports: + - "1888:1888" # pprof extension + - "8888:8888" # Prometheus metrics exposed by the collector + - "8889:8889" # Prometheus exporter metrics + - "13133:13133" # health_check extension + - "4317:4317" # OTLP gRPC receiver + + prometheus: + image: prom/prometheus + ports: + - 9090:9090 + volumes: + - ../bizon/monitoring/prometheus.yml:/etc/prometheus.yml + command: --config.file=/etc/prometheus.yml