Skip to content

Commit

Permalink
feat: implement source callback
Browse files Browse the repository at this point in the history
  • Loading branch information
aballiet committed Jan 29, 2025
1 parent d9d7fc5 commit b85b497
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 3 deletions.
3 changes: 3 additions & 0 deletions bizon/engine/pipeline/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
QueueMessage,
)
from bizon.monitoring.monitor import AbstractMonitor
from bizon.source.callback import AbstractSourceCallback
from bizon.transform.transform import Transform


Expand All @@ -25,11 +26,13 @@ def __init__(
destination: AbstractDestination,
transform: Transform,
monitor: AbstractMonitor,
source_callback: AbstractSourceCallback,
):
self.config = config
self.destination = destination
self.transform = transform
self.monitor = monitor
self.source_callback = source_callback

@abstractmethod
def run(self, stop_event: Union[multiprocessing.synchronize.Event, threading.Event]) -> PipelineReturnStatus:
Expand Down
3 changes: 3 additions & 0 deletions bizon/engine/queue/adapters/python_queue/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from bizon.engine.queue.config import QueueMessage
from bizon.engine.queue.queue import AbstractQueue
from bizon.monitoring.monitor import AbstractMonitor
from bizon.source.callback import AbstractSourceCallback
from bizon.transform.transform import Transform

from .config import PythonQueueConfig
Expand All @@ -24,12 +25,14 @@ def __init__(
destination: AbstractDestination,
transform: Transform,
monitor: AbstractMonitor,
source_callback: AbstractSourceCallback,
):
super().__init__(
config,
destination=destination,
transform=transform,
monitor=monitor,
source_callback=source_callback,
)
self.queue = queue
self.monitor.track_pipeline_status(PipelineReturnStatus.RUNNING)
Expand Down
14 changes: 12 additions & 2 deletions bizon/engine/queue/adapters/python_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from bizon.engine.queue.config import QUEUE_TERMINATION, QueueMessage
from bizon.engine.queue.queue import AbstractQueue, AbstractQueueConsumer
from bizon.monitoring.monitor import AbstractMonitor
from bizon.source.callback import AbstractSourceCallback
from bizon.source.models import SourceIteration
from bizon.transform.transform import Transform

Expand All @@ -30,10 +31,19 @@ def connect(self):
pass

def get_consumer(
self, destination: AbstractDestination, transform: Transform, monitor: AbstractMonitor
self,
destination: AbstractDestination,
transform: Transform,
monitor: AbstractMonitor,
source_callback: AbstractSourceCallback,
) -> AbstractQueueConsumer:
return PythonQueueConsumer(
config=self.config, queue=self.queue, destination=destination, transform=transform, monitor=monitor
config=self.config,
queue=self.queue,
destination=destination,
transform=transform,
monitor=monitor,
source_callback=source_callback,
)

def put_queue_message(self, queue_message: QueueMessage):
Expand Down
1 change: 1 addition & 0 deletions bizon/engine/runner/adapters/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def run(self):
future_consumer = executor.submit(
AbstractRunner.instanciate_and_run_consumer,
self.bizon_config,
self.config,
job.id,
**extra_kwargs,
)
Expand Down
1 change: 1 addition & 0 deletions bizon/engine/runner/adapters/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def run(self) -> RunnerStatus:
future_consumer = executor.submit(
AbstractRunner.instanciate_and_run_consumer,
self.bizon_config,
self.config,
job.id,
consumer_stop_event,
**extra_kwargs,
Expand Down
22 changes: 22 additions & 0 deletions bizon/engine/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,40 +211,62 @@ def instanciate_and_run_producer(
**kwargs,
):

# Get the source instance
source = AbstractRunner.get_source(bizon_config=bizon_config, config=config)

# Get the queue instance
queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs)

# Get the backend instance
backend = AbstractRunner.get_backend(bizon_config=bizon_config, **kwargs)

# Create the producer instance
producer = AbstractRunner.get_producer(
bizon_config=bizon_config,
source=source,
queue=queue,
backend=backend,
)

# Run the producer
status = producer.run(job_id, stop_event)
return status

@staticmethod
def instanciate_and_run_consumer(
bizon_config: BizonConfig,
config: dict,
job_id: str,
stop_event: Union[multiprocessing.synchronize.Event, threading.Event],
**kwargs,
):
# Get the source callback instance
source_callback = AbstractRunner.get_source(bizon_config=bizon_config, config=config).get_source_callback_instance()

# Get the queue instance
queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs)

# Get the backend instance
backend = AbstractRunner.get_backend(bizon_config=bizon_config, **kwargs)

# Get the destination instance
destination = AbstractRunner.get_destination(bizon_config=bizon_config, backend=backend, job_id=job_id)

# Get the transform instance
transform = AbstractRunner.get_transform(bizon_config=bizon_config)

# Get the monitor instance
monitor = AbstractRunner.get_monitoring_client(bizon_config=bizon_config)

# Create the consumer instance
consumer = queue.get_consumer(
destination=destination,
transform=transform,
monitor=monitor,
source_callback=source_callback,
)

# Run the consumer
status = consumer.run(stop_event)
return status

Expand Down
23 changes: 23 additions & 0 deletions bizon/source/callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
from typing import List

from bizon.source.config import SourceConfig
from bizon.source.models import SourceIteration

class AbstractSourceCallback(ABC):
@abstractmethod
def __init__(self, config: SourceConfig):
pass

@abstractmethod
def on_iterations_written(self, iterations: List[SourceIteration]):
pass


class NoOpSourceCallback(AbstractSourceCallback):
def __init__(self, config: SourceConfig):
pass

def on_iterations_written(self, iterations: List[SourceIteration]):
"""No-op callback"""
pass
6 changes: 5 additions & 1 deletion bizon/source/source.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, List, Optional, Tuple, Type, Union

from requests.auth import AuthBase

from .config import SourceConfig
from .models import SourceIncrementalState, SourceIteration
from .session import Session
from .callback import AbstractSourceCallback, NoOpSourceCallback


class AbstractSource(ABC):
Expand Down Expand Up @@ -36,6 +36,10 @@ def get_config_class() -> Type[SourceConfig]:
"""Return the config class for the source"""
pass

def get_source_callback_instance(self) -> AbstractSourceCallback:
"""Return an instance of the source callback"""
return NoOpSourceCallback(config=self.config)

@abstractmethod
def get_authenticator(self) -> Union[AuthBase, None]:
"""Return an authenticator for the source, it will be set in the session
Expand Down

0 comments on commit b85b497

Please sign in to comment.