RFC-006: State Checkpointing for Long-Running Pipelines #8

@pontino

Summary

Add state checkpointing to eggai-clutch so that long-running agent pipelines can save their state and resume from it.

Motivation

Current limitations of in-memory-only state:

  • Failure Recovery: Process crashes lose all pipeline progress
  • Deployment Flexibility: Cannot handle process restarts gracefully
  • Resource Management: Large pipelines hold state indefinitely
  • Debugging: No historical record for post-mortem analysis

Proposed API

Clutch Configuration

from pathlib import Path

from eggai_clutch import Clutch, Strategy
# CheckpointConfig is assumed to live alongside the other checkpoint classes.
from eggai_clutch.checkpoint import (
    CheckpointConfig,
    CheckpointPolicy,
    FileCheckpointStorage,
)

clutch = Clutch(
    "data_enrichment",
    strategy=Strategy.SEQUENTIAL,
    checkpoint=CheckpointConfig(
        storage=FileCheckpointStorage(Path("./checkpoints")),
        policy=CheckpointPolicy(every_n_steps=1, keep_latest=3),
        run_id="pipeline-xyz",
    ),
)

Execution API

# Normal execution with automatic checkpointing
result = await clutch.run(input_data)

# Resume from checkpoint
result = await clutch.resume(checkpoint_id="abc-123")

# Resume latest checkpoint
result = await clutch.resume_latest()

# Manual checkpoint
checkpoint_id = await clutch.checkpoint()
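
The save-and-resume behaviour these calls imply can be illustrated without the library. The following is a minimal, self-contained sketch, not the eggai-clutch implementation: `run_sequential`, `resume_latest`, the step functions, and the in-memory `saved` list (standing in for a storage backend) are all hypothetical names chosen for illustration.

```python
import asyncio
from typing import Awaitable, Callable

# A step takes the current state and returns the next state.
Step = Callable[[dict], Awaitable[dict]]


async def run_sequential(steps: list, state: dict, saved: list, start: int = 0) -> dict:
    """Run steps[start:], recording (next_step_index, state copy) after each step."""
    for index in range(start, len(steps)):
        state = await steps[index](state)
        saved.append((index + 1, dict(state)))  # checkpoint after every step
    return state


async def resume_latest(steps: list, saved: list) -> dict:
    """Continue from the most recent checkpoint instead of starting over."""
    next_index, state = saved[-1]
    return await run_sequential(steps, dict(state), saved, start=next_index)


async def add_one(state: dict) -> dict:
    return {**state, "n": state["n"] + 1}


_crashed = {"done": False}


async def flaky_double(state: dict) -> dict:
    # Fails on its first call only, to simulate a process crash mid-pipeline.
    if not _crashed["done"]:
        _crashed["done"] = True
        raise RuntimeError("simulated crash")
    return {**state, "n": state["n"] * 2}


async def demo() -> dict:
    steps = [add_one, flaky_double, add_one]
    saved: list = []
    try:
        await run_sequential(steps, {"n": 1}, saved)
    except RuntimeError:
        pass  # the first step's checkpoint survives the failure
    return await resume_latest(steps, saved)
```

In this sketch the resumed run skips the first step, because its result was already checkpointed, and re-executes only the step that failed and those after it.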

Storage Backends

class FileCheckpointStorage(CheckpointStorage): ...
class RedisCheckpointStorage(CheckpointStorage): ...
class SQLCheckpointStorage(CheckpointStorage): ...
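
This issue does not spell out the `CheckpointStorage` interface that the three backends share. A minimal sketch of what it could look like, assuming a hypothetical three-method interface (`save`, `load`, `list_ids`) and JSON-serializable state; the method names, the `Checkpoint` record, and the one-file-per-checkpoint layout are illustrative assumptions rather than the specified design:

```python
import asyncio
import json
import os
import tempfile
import uuid
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field
from pathlib import Path


@dataclass
class Checkpoint:
    # Hypothetical record; the RFC's model would build on ClutchContext/ClutchState.
    run_id: str
    step: int
    state: dict
    checkpoint_id: str = field(default_factory=lambda: uuid.uuid4().hex)


class CheckpointStorage(ABC):
    @abstractmethod
    async def save(self, checkpoint: Checkpoint) -> str: ...

    @abstractmethod
    async def load(self, run_id: str, checkpoint_id: str) -> Checkpoint: ...

    @abstractmethod
    async def list_ids(self, run_id: str) -> list: ...


class FileCheckpointStorage(CheckpointStorage):
    """Stores each checkpoint as <root>/<run_id>/<checkpoint_id>.json."""

    def __init__(self, root: Path) -> None:
        self.root = Path(root)

    def _path(self, run_id: str, checkpoint_id: str) -> Path:
        return self.root / run_id / f"{checkpoint_id}.json"

    async def save(self, checkpoint: Checkpoint) -> str:
        # Blocking file I/O is kept inline for brevity.
        path = self._path(checkpoint.run_id, checkpoint.checkpoint_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temp file, then rename, so a crash never leaves a partial file.
        fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            json.dump(asdict(checkpoint), f)
        os.replace(tmp, path)
        return checkpoint.checkpoint_id

    async def load(self, run_id: str, checkpoint_id: str) -> Checkpoint:
        with open(self._path(run_id, checkpoint_id)) as f:
            return Checkpoint(**json.load(f))

    async def list_ids(self, run_id: str) -> list:
        """Return checkpoint ids for a run, ordered from oldest to newest step."""
        run_dir = self.root / run_id
        if not run_dir.is_dir():
            return []
        records = [json.loads(p.read_text()) for p in run_dir.glob("*.json")]
        records.sort(key=lambda r: r["step"])
        return [r["checkpoint_id"] for r in records]
```

Keeping the interface this small would let the Redis and SQL backends differ only in how they persist and order the same records.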

Checkpoint Policy

from dataclasses import dataclass
from datetime import timedelta

@dataclass
class CheckpointPolicy:
    every_n_steps: int | None = 1
    on_agent_complete: list[str] | None = None
    keep_latest: int = 3
    ttl: timedelta | None = None

Implementation Scope

Phase 1: Core Infrastructure

  • Checkpoint data model leveraging ClutchContext/ClutchState
  • CheckpointStorage interface
  • FileCheckpointStorage implementation

Phase 2: Integration

  • Integrate checkpointing into all strategies
  • resume() and resume_latest() methods
  • run_id for persistent identification

Phase 3: Storage Backends

  • RedisCheckpointStorage
  • SQLCheckpointStorage

Phase 4: Advanced Features

  • Automatic cleanup
  • Checkpoint events via on_step hook

Dependencies

Reference

See eggai-clutch-rfcs/RFC-006-state-checkpointing.md for full specification.

Labels: enhancement (New feature or request)