Pipeline Forge API Reference

Core Components

Pipeline Class

from pipeline_forge.core import Pipeline

# Create a new pipeline
pipeline = Pipeline(name="my_pipeline", config_path="pipeline.yaml")

# Execute the pipeline
result = pipeline.run()

Methods

  • __init__(name: str, config_path: str) - Initialize a new pipeline
  • run() -> PipelineResult - Execute the pipeline
  • validate() -> bool - Validate pipeline configuration
  • get_status() -> PipelineStatus - Get current execution status
  • stop() - Stop pipeline execution

Task Class

from pipeline_forge.core import Task

# Create a custom task
task = Task(
    name="process_data",
    command="python process.py",
    dependencies=["load_data"],
    retry_config={
        "max_attempts": 3,
        "backoff_factor": 2.0
    }
)

Task Configuration

  • name: str - Unique task identifier
  • command: str - Shell command to execute
  • dependencies: List[str] - List of dependency task names
  • timeout: int - Task timeout in seconds
  • retry_config: Dict - Retry configuration
  • environment: Dict - Environment variables

Dependency Resolver

from pipeline_forge.core import DependencyResolver

resolver = DependencyResolver()
execution_order = resolver.resolve(tasks)
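Under the hood, dependency resolution amounts to a topological sort of the task graph. As an illustration (not the library's actual implementation), here is a minimal sketch using Kahn's algorithm in plain Python, taking tasks as a `{name: [dependency names]}` mapping:

```python
from collections import deque

def resolve(tasks):
    """Topologically sort tasks given as {name: [dependency names]}.

    Raises ValueError if the dependency graph contains a cycle.
    """
    indegree = {name: len(deps) for name, deps in tasks.items()}
    dependents = {name: [] for name in tasks}
    for name, deps in tasks.items():
        for dep in deps:
            dependents[dep].append(name)

    # Start from tasks with no unmet dependencies.
    ready = deque(name for name, d in indegree.items() if d == 0)
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for child in dependents[name]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    if len(order) != len(tasks):
        raise ValueError("cycle detected in task dependencies")
    return order
```

A cycle (e.g. two tasks depending on each other) leaves some tasks with a nonzero in-degree, which is why the final length check catches it.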

Dashboard Components

Terminal Dashboard

from pipeline_forge.dashboard import TerminalDashboard

dashboard = TerminalDashboard()
dashboard.start(pipeline)

Features

  • Real-time pipeline status visualization
  • Task execution progress bars
  • Resource utilization metrics
  • Error and warning displays

Metrics Collector

from pipeline_forge.dashboard import MetricsCollector

collector = MetricsCollector()
collector.record_task_start(task_name)
collector.record_task_completion(task_name, duration)
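To clarify the contract these two calls imply, here is a hypothetical in-memory collector (the class name and fallback behavior are assumptions, not the library's implementation):

```python
import time

class SimpleMetricsCollector:
    """In-memory sketch mirroring the record_* interface shown above."""

    def __init__(self):
        self._starts = {}      # task_name -> start timestamp
        self.durations = {}    # task_name -> recorded duration (seconds)

    def record_task_start(self, task_name):
        self._starts[task_name] = time.monotonic()

    def record_task_completion(self, task_name, duration=None):
        # Fall back to measured wall time when no duration is supplied.
        if duration is None:
            duration = time.monotonic() - self._starts.pop(task_name)
        self.durations[task_name] = duration
```

Using `time.monotonic()` rather than `time.time()` keeps durations immune to system clock adjustments.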

Plugin System

Creating Custom Plugins

from pipeline_forge.plugins import BasePlugin

class CustomTransformPlugin(BasePlugin):
    def __init__(self):
        super().__init__("custom_transform")

    def execute(self, data):
        # Replace this placeholder with the actual transformation logic
        transformed_data = [record for record in data]
        return transformed_data

    def validate_config(self, config):
        # Validate plugin configuration; return False to reject it
        return True


Plugin Registry

from pipeline_forge.plugins import PluginRegistry

registry = PluginRegistry()
registry.register(CustomTransformPlugin())
plugin = registry.get_plugin("custom_transform")
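A registry of this shape is essentially a name-keyed dictionary of plugin instances. A self-contained sketch of the pattern (assuming each plugin exposes a unique `name` attribute; not the library's code):

```python
class SimplePluginRegistry:
    """Name-keyed plugin lookup; a sketch of the registry pattern above."""

    def __init__(self):
        self._plugins = {}

    def register(self, plugin):
        # Reject duplicate registrations early to surface naming clashes.
        if plugin.name in self._plugins:
            raise ValueError(f"plugin {plugin.name!r} already registered")
        self._plugins[plugin.name] = plugin

    def get_plugin(self, name):
        # Raises KeyError if no plugin was registered under this name.
        return self._plugins[name]
```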

Configuration

YAML Schema

name: example_pipeline
version: "1.0"

tasks:
  - name: extract_data
    command: "python extract.py"
    timeout: 300
    
  - name: transform_data
    command: "python transform.py"
    dependencies: ["extract_data"]
    retry:
      max_attempts: 3
      backoff_factor: 1.5
    
  - name: load_data
    command: "python load.py"
    dependencies: ["transform_data"]

settings:
  parallel_tasks: 4
  log_level: INFO
  dashboard_enabled: true

Distributed Execution

Node Configuration

from pipeline_forge.distributed import Node

node = Node(
    node_id="worker-1",
    coordinator_host="localhost",
    coordinator_port=8080
)
node.start()

Scheduler

from pipeline_forge.distributed import Scheduler

scheduler = Scheduler()
scheduler.distribute_tasks(tasks, available_nodes)
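The source does not specify the scheduling strategy, but the simplest plausible baseline is round-robin assignment. A naive sketch (a real scheduler would also weigh node load, locality, and task dependencies):

```python
def distribute_round_robin(tasks, nodes):
    """Assign tasks to nodes in round-robin order; returns {node: [tasks]}."""
    if not nodes:
        raise ValueError("no nodes available")
    assignments = {node: [] for node in nodes}
    for i, task in enumerate(tasks):
        # Cycle through the node list so work spreads evenly.
        assignments[nodes[i % len(nodes)]].append(task)
    return assignments
```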

Error Handling

Retry Mechanisms

from pipeline_forge.core import RetryPolicy

retry_policy = RetryPolicy(
    max_attempts=5,
    backoff_factor=2.0,
    max_delay=300,
    retry_exceptions=[ConnectionError, TimeoutError]
)
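The parameters above describe capped exponential backoff: the delay between attempts grows by `backoff_factor` each time, up to `max_delay`, and only the listed exception types trigger a retry. A plain-Python sketch of that behavior (the `initial_delay` parameter is an addition for illustration):

```python
import time

def run_with_retry(fn, max_attempts=5, backoff_factor=2.0, max_delay=300,
                   retry_exceptions=(ConnectionError, TimeoutError),
                   initial_delay=1.0):
    """Call fn(), retrying listed exceptions with capped exponential backoff."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_exceptions:
            if attempt == max_attempts:
                raise  # out of attempts: propagate the last failure
            time.sleep(min(delay, max_delay))
            delay *= backoff_factor
```

Any exception type not in `retry_exceptions` propagates immediately, which keeps programming errors from being masked by retries.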

Circuit Breaker

from pipeline_forge.core import CircuitBreaker

circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=60,
    expected_exception=ConnectionError
)
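The circuit-breaker pattern behind these parameters: after `failure_threshold` consecutive failures the breaker "opens" and rejects calls outright, then "half-opens" after `recovery_timeout` to allow a trial call. A minimal sketch of the pattern, not the library's implementation:

```python
import time

class SimpleCircuitBreaker:
    """Opens after N consecutive failures, rejects calls while open,
    and half-opens after recovery_timeout to allow one trial call."""

    def __init__(self, failure_threshold=5, recovery_timeout=60,
                 expected_exception=ConnectionError):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open; call rejected")
            self.opened_at = None  # half-open: let one trial call through
        try:
            result = fn(*args, **kwargs)
        except self.expected_exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # success resets the failure count
        return result
```

Fast-failing while open spares a struggling downstream service the extra load that blind retries would add.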