Skip to content

feat(extensions): add event-driven task scheduler#1

Open
pandasdroid wants to merge 1 commit into
openJiuwen-ai:mainfrom
pandasdroid:feat/event-driven-scheduler
Open

feat(extensions): add event-driven task scheduler#1
pandasdroid wants to merge 1 commit into
openJiuwen-ai:mainfrom
pandasdroid:feat/event-driven-scheduler

Conversation

@pandasdroid
Copy link
Copy Markdown

Summary

Adds a new event-driven task scheduler extension that provides three capabilities currently missing from the controller task system:

  • Delayed task execution — Schedule tasks for future execution with configurable delays via TimerDispatcher
  • Automatic task chaining — Define declarative rules that automatically create downstream tasks when a source task completes via EventChainHandler
  • Retry with exponential backoff — Automatically retry failed tasks with configurable backoff policy via RetryHandler

All scheduling metadata is stored in Task.extensions['event_scheduler'], making this fully non-invasive — no modifications to the core Task schema.

Architecture

extensions/event_scheduler/
├── schema/scheduler_schema.py    # ScheduleType, RetryPolicy, EventChainRule, ScheduledTaskMixin, Config
├── core/
│   ├── timer_dispatcher.py       # Delayed task execution with background polling
│   ├── event_chain.py            # Completion-triggered task chaining
│   └── retry_handler.py          # Failed task retry with exponential backoff
├── service/
│   └── event_driven_scheduler.py # Main orchestrator integrating all components
└── __init__.py                   # Public API exports

Usage Example

from openjiuwen.extensions.event_scheduler import (
    EventDrivenScheduler,
    EventSchedulerConfig,
    EventChainRule,
    ScheduledTaskMixin,
    ScheduleType,
    RetryPolicy,
)

# Configure chain rules and retry policy
config = EventSchedulerConfig(
    chain_rules=[
        EventChainRule(
            rule_id="extract-then-validate",
            source_task_type="data_extraction",
            target_task_type="data_validation",
        )
    ],
    default_retry_policy=RetryPolicy(max_retries=3, base_delay=2.0),
)

scheduler = EventDrivenScheduler(config, task_manager, event_queue)
await scheduler.start()

# Submit a delayed task
await scheduler.submit_scheduled_task(
    session_id="session-1",
    task_type="data_extraction",
    mixin=ScheduledTaskMixin(schedule_type=ScheduleType.DELAYED, delay_seconds=30.0),
)

Test Plan

  • 52 unit tests covering all components (all passing)
  • Schema validation tests (RetryPolicy, EventChainRule, ScheduledTaskMixin, Config)
  • TimerDispatcher tests (scheduling, cancellation, limits, lifecycle)
  • EventChainHandler tests (chaining, conditions, dynamic rules)
  • RetryHandler tests (retry, backoff, exhaustion, fallback policy)
  • EventDrivenScheduler integration tests (submission, events, lifecycle)

Add a non-invasive extension that brings event-driven scheduling
capabilities to the controller task system:

- Time-based delayed task execution via TimerDispatcher
- Automatic task chaining on completion via EventChainHandler
- Failed task retry with exponential backoff via RetryHandler
- EventDrivenScheduler service orchestrating all components

Stores scheduling metadata in Task.extensions['event_scheduler']
without modifying the core Task schema. Includes 52 unit tests.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant