Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions contrib/bub-schedule-sqlalchemy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# bub-schedule-sqlalchemy

`bub-schedule-sqlalchemy` is a Bub plugin that persists APScheduler jobs in a SQLAlchemy-backed store.

It targets the Bub deployment shape where:

- scheduled jobs must survive process restarts
- scheduling must work in `bub chat`
- the same scheduler may also be started by a gateway channel at runtime

The package exposes the Bub plugin entry point `schedule` and uses `BackgroundScheduler`, so scheduler startup is not tied to a specific async event loop or to the `schedule` channel being enabled.

## Installation

Install from the monorepo package directory during local development:

```bash
uv add ./contrib/bub-schedule-sqlalchemy
```

Install directly from GitHub:

```bash
uv pip install "git+https://github.com/ob-labs/bubseek.git#subdirectory=contrib/bub-schedule-sqlalchemy"
```

## Configuration

The plugin reads settings from environment variables:

- `BUB_SCHEDULE_SQLALCHEMY_URL`: primary SQLAlchemy database URL for APScheduler jobs
- `BUB_TAPESTORE_SQLALCHEMY_URL`: fallback URL when the schedule-specific URL is unset
- table name defaults to `apscheduler_jobs`

Resolution order:

1. use `BUB_SCHEDULE_SQLALCHEMY_URL` when set
2. otherwise fall back to `BUB_TAPESTORE_SQLALCHEMY_URL`
3. otherwise scheduler creation may fail and the plugin will log a warning and stay disabled

Example:

```bash
export BUB_SCHEDULE_SQLALCHEMY_URL=sqlite:////tmp/bub-schedule.sqlite
```

Or reuse the shared Bub tapestore:

```bash
export BUB_TAPESTORE_SQLALCHEMY_URL=mysql+oceanbase://root:@127.0.0.1:2881/bub
```

## Runtime Behavior

`ScheduleImpl` starts the scheduler lazily from `load_state()`, which runs before tools on every inbound message. This is the key behavior that keeps scheduling usable in `bub chat`: even when only the `cli` channel is active, the scheduler is started and jobs are persisted instead of being left in APScheduler's in-memory pending queue.

When the `schedule` channel is enabled in a gateway runtime, `ScheduleChannel` also starts the same scheduler on channel startup and shuts it down cleanly on channel stop.

If scheduler construction fails, the plugin does not crash the framework:

- `load_state()` logs `Schedule plugin disabled: ...` and returns an empty state
- `provide_channels()` logs the same warning and returns no `schedule` channel

## Test-Covered Behavior

Current tests cover these behaviors:

- SQLAlchemy job store round-trip persistence with SQLite
- `ScheduleImpl.load_state()` starts the injected scheduler
- settings resolution from `BUB_SCHEDULE_SQLALCHEMY_URL`
- fallback from `BUB_TAPESTORE_SQLALCHEMY_URL`
- `schedule.trigger` executes both sync and async jobs
- `schedule.trigger` does not shift an interval job's `next_run_time`

## Limitations

- this package only provides scheduling infrastructure; actual reminder delivery still depends on the surrounding Bub runtime and enabled channels
- session scoping is based on the `session_id` stored in job kwargs
- persistence quality and locking semantics depend on the configured SQLAlchemy backend
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
[project]
name = "bubseek-schedule"
name = "bub-schedule-sqlalchemy"
version = "0.1.0"
description = "Scheduling tools for Bubseek (OceanBase/seekdb job store)"
description = "A standard Bub scheduling plugin backed by APScheduler SQLAlchemy job stores"
authors = [{ name = "Chojan Shang", email = "psiace@apache.org" }]
requires-python = ">=3.12"
dependencies = [
"apscheduler>=3.11.2",
"bub",
"bubseek",
"loguru",
"pydantic-settings>=2.0.0",
"sqlalchemy>=2.0",
]

[tool.uv.sources]
bubseek = { workspace = true }

[project.entry-points."bub"]
schedule = "bubseek_schedule.plugin:main"
[project.entry-points.bub]
schedule = "bub_schedule_sqlalchemy.plugin:main"

[build-system]
requires = ["pdm-backend"]
build-backend = "pdm.backend"

[tool.pdm.build]
package-dir = "src"
includes = ["src/bubseek_schedule"]
includes = ["src/bub_schedule_sqlalchemy"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Reusable Bub scheduling components built on Bub and APScheduler."""

from bub_schedule_sqlalchemy.plugin import ScheduleImpl, build_scheduler

__all__ = ["ScheduleImpl", "build_scheduler"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

from collections.abc import Mapping
from typing import Any

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from pydantic import Field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


class ScheduleSQLAlchemySettings(BaseSettings):
"""Configuration for the APScheduler SQLAlchemy job store."""

model_config = SettingsConfigDict(
env_file=".env",
extra="ignore",
populate_by_name=True,
)

url: str | None = Field(default=None, validation_alias="BUB_SCHEDULE_SQLALCHEMY_URL")
tapestore_url: str | None = Field(default=None, validation_alias="BUB_TAPESTORE_SQLALCHEMY_URL", exclude=True)
tablename: str = "apscheduler_jobs"

@model_validator(mode="after")
def inherit_tapestore_url(self) -> ScheduleSQLAlchemySettings:
if self.url is None and self.tapestore_url:
self.url = self.tapestore_url
return self


def build_sqlalchemy_jobstore(
*,
settings: ScheduleSQLAlchemySettings,
engine_options: Mapping[str, Any] | None = None,
) -> SQLAlchemyJobStore:
return SQLAlchemyJobStore(
url=settings.url,
tablename=settings.tablename,
engine_options=dict(engine_options or {}),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import contextlib
from collections.abc import Callable, Mapping
from typing import Any

from apscheduler.jobstores.base import BaseJobStore
from apscheduler.schedulers import SchedulerAlreadyRunningError
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.base import BaseScheduler
from bub import hookimpl
from bub.types import Envelope, MessageHandler, State
from loguru import logger

from bub_schedule_sqlalchemy.job_store import ScheduleSQLAlchemySettings, build_sqlalchemy_jobstore

SchedulerFactory = Callable[[], BaseScheduler]


def build_scheduler(*, jobstore: BaseJobStore, jobstore_alias: str = "default") -> BaseScheduler:
"""Build a background scheduler with an injected job store."""
return BackgroundScheduler(jobstores={jobstore_alias: jobstore})


def build_sqlalchemy_scheduler(
*,
settings: ScheduleSQLAlchemySettings,
engine_options: Mapping[str, Any] | None = None,
jobstore_alias: str = "default",
) -> BaseScheduler:
jobstore = build_sqlalchemy_jobstore(settings=settings, engine_options=engine_options)
return build_scheduler(jobstore=jobstore, jobstore_alias=jobstore_alias)


def _default_scheduler() -> BaseScheduler:
return build_sqlalchemy_scheduler(settings=ScheduleSQLAlchemySettings())


class ScheduleImpl:
"""Schedule plugin backed by an injected APScheduler scheduler.

Uses BackgroundScheduler so the scheduler can start without the ``schedule`` channel.
``bub chat`` only enables the ``cli`` channel; previously AsyncIOScheduler never started,
so APScheduler kept jobs in memory-only ``_pending_jobs`` and nothing reached the DB.
"""

def __init__(self, scheduler_factory: SchedulerFactory) -> None:
from bub_schedule_sqlalchemy import tools # noqa: F401

self._scheduler_factory = scheduler_factory
self._scheduler: BaseScheduler | None = None

@classmethod
def from_scheduler(cls, scheduler: BaseScheduler) -> "ScheduleImpl":
return cls(lambda: scheduler)

@property
def scheduler(self) -> BaseScheduler:
if self._scheduler is None:
self._scheduler = self._scheduler_factory()
return self._scheduler

def _ensure_scheduler_started(self) -> BaseScheduler:
scheduler = self.scheduler
if scheduler.running:
return scheduler
with contextlib.suppress(SchedulerAlreadyRunningError):
scheduler.start()
return scheduler

@hookimpl
def load_state(self, message: Envelope, session_id: str) -> State:
# Runs before tools on every inbound message — covers CLI-only ``bub chat``.
try:
scheduler = self._ensure_scheduler_started()
except Exception as exc:
logger.warning(f"Schedule plugin disabled: {exc}")
return {}
return {"scheduler": scheduler}

@hookimpl
def provide_channels(self, message_handler: MessageHandler) -> list:
from bub_schedule_sqlalchemy.channel import ScheduleChannel

try:
scheduler = self.scheduler
except Exception as exc:
logger.warning(f"Schedule plugin disabled: {exc}")
return []
return [ScheduleChannel(scheduler)]


main = ScheduleImpl(_default_scheduler)
Empty file.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import inspect
import uuid
from datetime import UTC, datetime, timedelta
from typing import cast

from apscheduler.job import Job
from apscheduler.jobstores.base import ConflictingIdError, JobLookupError
from apscheduler.schedulers.base import BaseScheduler
from apscheduler.triggers.cron import CronTrigger
Expand All @@ -11,15 +13,37 @@
from pydantic import BaseModel, Field
from republic import ToolContext

from bubseek_schedule.jobs import run_scheduled_reminder
from bub_schedule_sqlalchemy.jobs import run_scheduled_reminder

MISSING_SCHEDULER_MESSAGE = "scheduler not found in state, is ScheduleImpl plugin loaded?"
MISSING_TRIGGER_ARGUMENTS_MESSAGE = "One of after_seconds, interval_seconds, or cron must be set"


def _ensure_scheduler(state: dict) -> BaseScheduler:
if "scheduler" not in state:
raise RuntimeError("scheduler not found in state, is ScheduleImpl plugin loaded?")
raise RuntimeError(MISSING_SCHEDULER_MESSAGE)
return cast(BaseScheduler, state["scheduler"])


def _format_next_run(next_run_time: object) -> str:
if isinstance(next_run_time, datetime):
return next_run_time.isoformat()
return "-"


def _get_job_or_raise(scheduler: BaseScheduler, job_id: str) -> Job:
job = scheduler.get_job(job_id)
if job is None:
raise RuntimeError(f"job not found: {job_id}")
return job


async def _run_job_now(job: Job) -> None:
result = job.func(*(job.args or ()), **(job.kwargs or {}))
if inspect.isawaitable(result):
await result


class ScheduleAddInput(BaseModel):
after_seconds: int | None = Field(None, description="If set, schedule to run after this many seconds from now")
interval_seconds: int | None = Field(None, description="If set, repeat at this interval")
Expand Down Expand Up @@ -47,7 +71,7 @@ def schedule_add(params: ScheduleAddInput, context: ToolContext) -> str:
except ValueError as exc:
raise RuntimeError(f"invalid cron expression: {params.cron}") from exc
else:
raise RuntimeError("One of after_seconds, interval_seconds, or cron must be set")
raise RuntimeError(MISSING_TRIGGER_ARGUMENTS_MESSAGE)
scheduler = _ensure_scheduler(context.state)
workspace = context.state.get("_runtime_workspace")
try:
Expand All @@ -66,11 +90,7 @@ def schedule_add(params: ScheduleAddInput, context: ToolContext) -> str:
except ConflictingIdError as exc:
raise RuntimeError(f"job id already exists: {job_id}") from exc

next_run = "-"
nrt = getattr(job, "next_run_time", None)
if isinstance(nrt, datetime):
next_run = nrt.isoformat()
return f"scheduled: {job.id} next={next_run}"
return f"scheduled: {job.id} next={_format_next_run(getattr(job, 'next_run_time', None))}"


@tool(name="schedule.remove", context=True)
Expand All @@ -91,17 +111,23 @@ def schedule_list(context: ToolContext) -> str:
jobs = scheduler.get_jobs()
rows: list[str] = []
for job in jobs:
next_run = "-"
nrt = getattr(job, "next_run_time", None)
if isinstance(nrt, datetime):
next_run = nrt.isoformat()
message = str(job.kwargs.get("message", ""))
job_session = job.kwargs.get("session_id")
if job_session and job_session != context.state.get("session_id", ""):
continue
rows.append(f"{job.id} next={next_run} msg={message}")
rows.append(f"{job.id} next={_format_next_run(getattr(job, 'next_run_time', None))} msg={message}")

if not rows:
return "(no scheduled jobs)"

return "\n".join(rows)


@tool(name="schedule.trigger", context=True)
async def schedule_trigger(job_id: str, context: ToolContext) -> str:
"""Run an existing scheduled job immediately without changing its schedule."""
scheduler = _ensure_scheduler(context.state)
job = _get_job_or_raise(scheduler, job_id)
await _run_job_now(job)

return f"triggered: {job_id} (next scheduled run: {_format_next_run(job.next_run_time)})"
Loading
Loading