AI-Graph


A powerful and flexible AI Graph framework for building processing pipelines using the Chain of Responsibility pattern.
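
The Chain of Responsibility pattern behind the framework can be sketched in a few lines of plain Python. This is a simplified illustration of the idea, not the library's actual implementation: each handler transforms a shared dict, then forwards it to the next handler in the chain.

```python
from typing import Any, Callable, Dict, Optional


class Handler:
    """One link in a Chain of Responsibility: do your work, then forward."""

    def __init__(self, fn: Callable[[Dict[str, Any]], Dict[str, Any]]):
        self.fn = fn
        self.next: Optional["Handler"] = None

    def set_next(self, handler: "Handler") -> "Handler":
        self.next = handler
        return handler  # returning the new link allows fluent chaining

    def handle(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data = self.fn(data)          # this link's work
        if self.next is not None:     # pass the result down the chain
            return self.next.handle(data)
        return data


# Build a two-link chain and run a dict through it
first = Handler(lambda d: {**d, "status": "processing"})
first.set_next(Handler(lambda d: {**d, "done": True}))

result = first.handle({"input": "some data"})
print(result)  # {'input': 'some data', 'status': 'processing', 'done': True}
```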

🚀 Features

  • 🔗 Pipeline Architecture: Build complex processing pipelines using chained steps
  • 🔄 ForEach Processing: Iterate over collections or run fixed iterations with sub-pipelines
  • 🏗️ Modular Design: Easily extensible with custom pipeline steps
  • 📊 Progress Tracking: Built-in progress bars with tqdm integration
  • 🧪 100% Test Coverage: Comprehensive test suite with pytest
  • 🎯 Type Safe: Full type hints support with mypy
  • 📦 Modern Python: Built with modern Python packaging standards


💾 Installation

Using pip

pip install ai-graph

Using uv (recommended)

uv add ai-graph

Development Installation

git clone https://github.com/ai-graph/ai-graph.git
cd ai-graph
uv sync --group dev

⚡ Quick Start

Here's a simple example to get you started:

from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import AddKeyStep, DelKeyStep
from ai_graph.step.foreach import ForEachStep

# Create a simple pipeline
pipeline = Pipeline("DataProcessor")

# Add processing steps
pipeline.add_step(AddKeyStep("status", "processing"))
pipeline.add_step(AddKeyStep("timestamp", "2024-01-01"))

# Process data
data = {"input": "some data"}
result = pipeline.process(data)

print(result)
# Output: {'input': 'some data', 'status': 'processing', 'timestamp': '2024-01-01'}

🧩 Core Concepts

Pipeline Steps

Pipeline steps are the building blocks of your processing pipeline. Each step implements the BasePipelineStep interface:

from ai_graph.step.base import BasePipelineStep

class CustomStep(BasePipelineStep):
    def _process_step(self, data):
        # Your processing logic here
        data["custom_field"] = "processed"
        return data

# Use in pipeline
pipeline = Pipeline("CustomPipeline")
pipeline.add_step(CustomStep("MyCustomStep"))

Built-in Steps

  • AddKeyStep: Adds a key-value pair to the data
  • DelKeyStep: Removes a key from the data
  • ForEachStep: Processes collections or runs iterations
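
As a rough sketch of what the first two steps do, here are stand-in implementations based purely on the descriptions above (not the library's source; in particular, the single-argument `DelKeyStep` constructor is an assumption):

```python
from typing import Any, Dict


class AddKeyStep:
    """Stand-in: adds a key-value pair to the data dict."""

    def __init__(self, key: str, value: Any):
        self.key, self.value = key, value

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data[self.key] = self.value
        return data


class DelKeyStep:
    """Stand-in: removes a key from the data dict (assumed signature)."""

    def __init__(self, key: str):
        self.key = key

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data.pop(self.key, None)  # tolerate a missing key
        return data


data = {"input": "x"}
data = AddKeyStep("status", "processing").process(data)
data = DelKeyStep("input").process(data)
print(data)  # {'status': 'processing'}
```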

Pipelines

Pipelines manage the execution flow of your processing steps:

from ai_graph.pipeline.base import Pipeline

# Create pipeline
pipeline = Pipeline("MyPipeline")

# Add steps (method chaining supported)
pipeline.add_step(step1).add_step(step2).add_step(step3)

# Process data
result = pipeline.process(input_data)

ForEach Processing

Process collections or run fixed iterations with sub-pipelines:

from ai_graph.step.foreach import ForEachStep

# Process a list of items
foreach_step = ForEachStep(
    items_key="items",
    results_key="processed_items"
)

# Add sub-processing steps
foreach_step.add_sub_step(AddKeyStep("processed", True))
foreach_step.add_sub_step(AddKeyStep("batch_id", "batch_001"))

# Use in pipeline
pipeline = Pipeline("BatchProcessor")
pipeline.add_step(foreach_step)

data = {"items": [{"id": 1}, {"id": 2}, {"id": 3}]}
result = pipeline.process(data)

📚 Examples

Example 1: Data Validation Pipeline

from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import AddKeyStep, BasePipelineStep

class ValidateDataStep(BasePipelineStep):
    def _process_step(self, data):
        if "required_field" not in data:
            data["validation_error"] = "Missing required field"
        else:
            data["validation_status"] = "valid"
        return data

# Create validation pipeline
pipeline = Pipeline("DataValidator")
pipeline.add_step(ValidateDataStep("Validator"))
pipeline.add_step(AddKeyStep("validated_at", "2024-01-01"))

# Process data
data = {"required_field": "value"}
result = pipeline.process(data)

Example 2: Batch Processing with ForEach

from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep
from ai_graph.step.foreach import ForEachStep

class ProcessItemStep(BasePipelineStep):
    def _process_step(self, data):
        # Access current item and iteration index
        current_item = data["_current_item"]
        iteration_index = data["_iteration_index"]

        # Process the item
        data["processed_value"] = current_item * 2
        data["position"] = iteration_index

        return data

# Create batch processing pipeline
batch_processor = ForEachStep(
    items_key="numbers",
    results_key="processed_numbers"
)
batch_processor.add_sub_step(ProcessItemStep("ItemProcessor"))

pipeline = Pipeline("BatchProcessor")
pipeline.add_step(batch_processor)

# Process batch
data = {"numbers": [1, 2, 3, 4, 5]}
result = pipeline.process(data)
# result["processed_numbers"] will contain processed items

Example 3: Fixed Iterations

from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep
from ai_graph.step.foreach import ForEachStep

# Run a fixed number of iterations
iteration_step = ForEachStep(
    iterations=5,
    results_key="iteration_results"
)

class IterationStep(BasePipelineStep):
    def _process_step(self, data):
        iteration = data["_iteration_index"]
        data["step"] = f"Step {iteration + 1}"
        return data

iteration_step.add_sub_step(IterationStep("Iterator"))

pipeline = Pipeline("IterationPipeline")
pipeline.add_step(iteration_step)

result = pipeline.process({"start": True})

📖 API Reference

BasePipelineStep

class BasePipelineStep:
    def __init__(self, name: Optional[str] = None)
    def set_next(self, step: 'BasePipelineStep') -> None
    def process(self, data: Dict[str, Any]) -> Dict[str, Any]
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]  # Override this
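
The contract implied by this interface: `process` runs the step's own `_process_step` and then hands the result to the next step in the chain, so subclasses only override `_process_step`. A minimal self-contained sketch of that behavior (illustrative, not the library's code):

```python
from typing import Any, Dict, Optional


class Step:
    """Illustrative base: process() delegates to _process_step(), then forwards."""

    def __init__(self, name: Optional[str] = None):
        self.name = name or self.__class__.__name__
        self._next: Optional["Step"] = None

    def set_next(self, step: "Step") -> None:
        self._next = step

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data = self._process_step(data)
        return self._next.process(data) if self._next else data

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        return data  # subclasses override this hook


class Tag(Step):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data.setdefault("seen_by", []).append(self.name)
        return data


a, b = Tag("A"), Tag("B")
a.set_next(b)
print(a.process({})["seen_by"])  # ['A', 'B']
```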

Pipeline

class Pipeline:
    def __init__(self, name: str, first_step: Optional[BasePipelineStep] = None)
    def add_step(self, step: BasePipelineStep) -> "Pipeline"
    def process(self, data: Dict[str, Any]) -> Dict[str, Any]

ForEachStep

class ForEachStep(BasePipelineStep):
    def __init__(
        self,
        items_key: Optional[str] = None,
        iterations: Optional[int] = None,
        results_key: str = "foreach_results",
        name: Optional[str] = None
    )
    def add_sub_step(self, step: BasePipelineStep) -> "ForEachStep"
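
Based on the examples above, a rough model of ForEachStep's behavior (a stand-in sketch, not the library's implementation): for each item in `data[items_key]` (or for each of `iterations` fixed rounds), it runs the sub-steps on a copy of the data with `_current_item` and `_iteration_index` injected, then collects the per-iteration results under `results_key`.

```python
from typing import Any, Callable, Dict, List, Optional

SubStep = Callable[[Dict[str, Any]], Dict[str, Any]]


class ForEachSketch:
    """Stand-in for ForEachStep: iterate over items or fixed rounds."""

    def __init__(self, items_key: Optional[str] = None,
                 iterations: Optional[int] = None,
                 results_key: str = "foreach_results"):
        self.items_key, self.iterations = items_key, iterations
        self.results_key = results_key
        self.sub_steps: List[SubStep] = []

    def add_sub_step(self, step: SubStep) -> "ForEachSketch":
        self.sub_steps.append(step)
        return self

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Either iterate a collection, or run a fixed number of rounds
        items = data[self.items_key] if self.items_key else range(self.iterations or 0)
        results = []
        for i, item in enumerate(items):
            scoped = {**data, "_current_item": item, "_iteration_index": i}
            for step in self.sub_steps:   # run the sub-pipeline on this item
                scoped = step(scoped)
            results.append(scoped)
        data[self.results_key] = results  # collect per-iteration results
        return data


fe = ForEachSketch(items_key="numbers", results_key="doubled")
fe.add_sub_step(lambda d: {**d, "value": d["_current_item"] * 2})
out = fe.process({"numbers": [1, 2, 3]})
print([r["value"] for r in out["doubled"]])  # [2, 4, 6]
```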

🛠️ Development

Prerequisites

  • Python 3.8+
  • uv (recommended) or pip

Setup Development Environment

# Clone repository
git clone https://github.com/ai-graph/ai-graph.git
cd ai-graph

# Install dependencies
uv sync --group dev

# Install pre-commit hooks (optional)
uv run pre-commit install

Running Tests

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov

# Run specific test file
uv run pytest tests/step/test_base.py

Code Quality

# Format code
uv run black .
uv run isort .

# Lint code
uv run flake8 .

# Type checking
uv run mypy .

Making Releases

This project uses conventional commits and automated versioning:

# Make changes and commit using conventional commits
uv run cz commit

# Bump version automatically
uv run cz bump

# Push changes and tags
git push origin main --tags

🤝 Contributing

We love contributions! Please see our Contributing Guidelines for details.

Quick Contribution Guide

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/amazing-feature
  3. Make your changes and add tests
  4. Ensure tests pass: uv run pytest
  5. Format code: uv run black . && uv run isort .
  6. Commit using conventional commits: uv run cz commit
  7. Push and create a Pull Request

📄 License

This project is licensed under the GNU General Public License v3.0 - see the LICENSE file for details.

🙋‍♂️ Support

🎉 Acknowledgments

  • Built with modern Python packaging standards
  • Inspired by the Chain of Responsibility design pattern
  • Uses tqdm for progress bars
  • Tested with pytest

⭐ Star this repository if you find it helpful!

Made with ❤️ by Mohammad Sina Allahkaram