diff --git a/bizon/engine/pipeline/consumer.py b/bizon/engine/pipeline/consumer.py index a0c310c..d578cd0 100644 --- a/bizon/engine/pipeline/consumer.py +++ b/bizon/engine/pipeline/consumer.py @@ -15,6 +15,7 @@ QueueMessage, ) from bizon.monitoring.monitor import AbstractMonitor +from bizon.source.callback import AbstractSourceCallback from bizon.transform.transform import Transform @@ -25,11 +26,13 @@ def __init__( destination: AbstractDestination, transform: Transform, monitor: AbstractMonitor, + source_callback: AbstractSourceCallback, ): self.config = config self.destination = destination self.transform = transform self.monitor = monitor + self.source_callback = source_callback @abstractmethod def run(self, stop_event: Union[multiprocessing.synchronize.Event, threading.Event]) -> PipelineReturnStatus: diff --git a/bizon/engine/queue/adapters/python_queue/consumer.py b/bizon/engine/queue/adapters/python_queue/consumer.py index 9759e4c..4dbf556 100644 --- a/bizon/engine/queue/adapters/python_queue/consumer.py +++ b/bizon/engine/queue/adapters/python_queue/consumer.py @@ -11,6 +11,7 @@ from bizon.engine.queue.config import QueueMessage from bizon.engine.queue.queue import AbstractQueue from bizon.monitoring.monitor import AbstractMonitor +from bizon.source.callback import AbstractSourceCallback from bizon.transform.transform import Transform from .config import PythonQueueConfig @@ -24,12 +25,14 @@ def __init__( destination: AbstractDestination, transform: Transform, monitor: AbstractMonitor, + source_callback: AbstractSourceCallback, ): super().__init__( config, destination=destination, transform=transform, monitor=monitor, + source_callback=source_callback, ) self.queue = queue self.monitor.track_pipeline_status(PipelineReturnStatus.RUNNING) diff --git a/bizon/engine/queue/adapters/python_queue/queue.py b/bizon/engine/queue/adapters/python_queue/queue.py index d605af0..5d4460f 100644 --- a/bizon/engine/queue/adapters/python_queue/queue.py +++ b/bizon/engine/queue/adapters/python_queue/queue.py @@ -9,6 +9,7 @@ from bizon.engine.queue.config import QUEUE_TERMINATION, QueueMessage from bizon.engine.queue.queue import AbstractQueue, AbstractQueueConsumer from bizon.monitoring.monitor import AbstractMonitor +from bizon.source.callback import AbstractSourceCallback from bizon.source.models import SourceIteration from bizon.transform.transform import Transform @@ -30,10 +31,19 @@ def connect(self): pass def get_consumer( - self, destination: AbstractDestination, transform: Transform, monitor: AbstractMonitor + self, + destination: AbstractDestination, + transform: Transform, + monitor: AbstractMonitor, + source_callback: AbstractSourceCallback, ) -> AbstractQueueConsumer: return PythonQueueConsumer( - config=self.config, queue=self.queue, destination=destination, transform=transform, monitor=monitor + config=self.config, + queue=self.queue, + destination=destination, + transform=transform, + monitor=monitor, + source_callback=source_callback, ) def put_queue_message(self, queue_message: QueueMessage): diff --git a/bizon/engine/runner/adapters/process.py b/bizon/engine/runner/adapters/process.py index e2ad774..b3a6429 100644 --- a/bizon/engine/runner/adapters/process.py +++ b/bizon/engine/runner/adapters/process.py @@ -51,6 +51,7 @@ def run(self): future_consumer = executor.submit( AbstractRunner.instanciate_and_run_consumer, self.bizon_config, + self.config, job.id, **extra_kwargs, ) diff --git a/bizon/engine/runner/adapters/thread.py b/bizon/engine/runner/adapters/thread.py index a7330c8..41f1521 100644 --- a/bizon/engine/runner/adapters/thread.py +++ b/bizon/engine/runner/adapters/thread.py @@ -62,6 +62,7 @@ def run(self) -> RunnerStatus: future_consumer = executor.submit( AbstractRunner.instanciate_and_run_consumer, self.bizon_config, + self.config, job.id, consumer_stop_event, **extra_kwargs, diff --git a/bizon/engine/runner/runner.py b/bizon/engine/runner/runner.py index 92c62d5..cf24185 100644 --- a/bizon/engine/runner/runner.py +++ b/bizon/engine/runner/runner.py @@ -211,10 +211,16 @@ def instanciate_and_run_producer( **kwargs, ): + # Get the source instance source = AbstractRunner.get_source(bizon_config=bizon_config, config=config) + + # Get the queue instance queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs) + + # Get the backend instance backend = AbstractRunner.get_backend(bizon_config=bizon_config, **kwargs) + # Create the producer instance producer = AbstractRunner.get_producer( bizon_config=bizon_config, source=source, @@ -222,29 +228,45 @@ def instanciate_and_run_producer( backend=backend, ) + # Run the producer status = producer.run(job_id, stop_event) return status @staticmethod def instanciate_and_run_consumer( bizon_config: BizonConfig, + config: dict, job_id: str, stop_event: Union[multiprocessing.synchronize.Event, threading.Event], **kwargs, ): + # Get the source callback instance + source_callback = AbstractRunner.get_source(bizon_config=bizon_config, config=config).get_source_callback_instance() + # Get the queue instance queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs) + + # Get the backend instance backend = AbstractRunner.get_backend(bizon_config=bizon_config, **kwargs) + + # Get the destination instance destination = AbstractRunner.get_destination(bizon_config=bizon_config, backend=backend, job_id=job_id) + + # Get the transform instance transform = AbstractRunner.get_transform(bizon_config=bizon_config) + + # Get the monitor instance monitor = AbstractRunner.get_monitoring_client(bizon_config=bizon_config) + # Create the consumer instance consumer = queue.get_consumer( destination=destination, transform=transform, monitor=monitor, + source_callback=source_callback, ) + # Run the consumer status = consumer.run(stop_event) return status diff --git a/bizon/source/callback.py b/bizon/source/callback.py new file mode 100644 index 0000000..c2cb729 --- /dev/null +++ b/bizon/source/callback.py @@ -0,0 +1,23 @@ +from abc import ABC, abstractmethod +from typing import List + +from bizon.source.config import SourceConfig +from bizon.source.models import SourceIteration + +class AbstractSourceCallback(ABC): + @abstractmethod + def __init__(self, config: SourceConfig): + pass + + @abstractmethod + def on_iterations_written(self, iterations: List[SourceIteration]): + pass + + +class NoOpSourceCallback(AbstractSourceCallback): + def __init__(self, config: SourceConfig): + pass + + def on_iterations_written(self, iterations: List[SourceIteration]): + """No-op callback""" + pass diff --git a/bizon/source/source.py b/bizon/source/source.py index 8c44e7f..fe0d5d5 100644 --- a/bizon/source/source.py +++ b/bizon/source/source.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod -from datetime import datetime from typing import Any, List, Optional, Tuple, Type, Union from requests.auth import AuthBase @@ -7,6 +6,7 @@ from .config import SourceConfig from .models import SourceIncrementalState, SourceIteration from .session import Session +from .callback import AbstractSourceCallback, NoOpSourceCallback class AbstractSource(ABC): @@ -36,6 +36,10 @@ def get_config_class() -> Type[SourceConfig]: """Return the config class for the source""" pass + def get_source_callback_instance(self) -> AbstractSourceCallback: + """Return an instance of the source callback""" + return NoOpSourceCallback(config=self.config) + @abstractmethod def get_authenticator(self) -> Union[AuthBase, None]: """Return an authenticator for the source, it will be set in the session