diff --git a/contrib/bub-schedule-sqlalchemy/README.md b/contrib/bub-schedule-sqlalchemy/README.md new file mode 100644 index 0000000..2d39cd0 --- /dev/null +++ b/contrib/bub-schedule-sqlalchemy/README.md @@ -0,0 +1,79 @@ +# bub-schedule-sqlalchemy + +`bub-schedule-sqlalchemy` is a Bub plugin that persists APScheduler jobs in a SQLAlchemy-backed store. + +It targets the Bub deployment shape where: + +- scheduled jobs must survive process restarts +- scheduling must work in `bub chat` +- the same scheduler may also be started by a gateway channel at runtime + +The package exposes the Bub plugin entry point `schedule` and uses `BackgroundScheduler`, so scheduler startup is not tied to a specific async event loop or to the `schedule` channel being enabled. + +## Installation + +Install from the monorepo package directory during local development: + +```bash +uv add ./contrib/bub-schedule-sqlalchemy +``` + +Install directly from GitHub: + +```bash +uv pip install "git+https://github.com/ob-labs/bubseek.git#subdirectory=contrib/bub-schedule-sqlalchemy" +``` + +## Configuration + +The plugin reads settings from environment variables: + +- `BUB_SCHEDULE_SQLALCHEMY_URL`: primary SQLAlchemy database URL for APScheduler jobs +- `BUB_TAPESTORE_SQLALCHEMY_URL`: fallback URL when the schedule-specific URL is unset +- table name defaults to `apscheduler_jobs` + +Resolution order: + +1. use `BUB_SCHEDULE_SQLALCHEMY_URL` when set +2. otherwise fall back to `BUB_TAPESTORE_SQLALCHEMY_URL` +3. otherwise scheduler creation may fail and the plugin will log a warning and stay disabled + +Example: + +```bash +export BUB_SCHEDULE_SQLALCHEMY_URL=sqlite:////tmp/bub-schedule.sqlite +``` + +Or reuse the shared Bub tapestore: + +```bash +export BUB_TAPESTORE_SQLALCHEMY_URL=mysql+oceanbase://root:@127.0.0.1:2881/bub +``` + +## Runtime Behavior + +`ScheduleImpl` starts the scheduler lazily from `load_state()`, which runs before tools on every inbound message. This is the key behavior that keeps scheduling usable in `bub chat`: even when only the `cli` channel is active, the scheduler is started and jobs are persisted instead of being left in APScheduler's in-memory pending queue. + +When the `schedule` channel is enabled in a gateway runtime, `ScheduleChannel` also starts the same scheduler on channel startup and shuts it down cleanly on channel stop. + +If scheduler construction fails, the plugin does not crash the framework: + +- `load_state()` logs `Schedule plugin disabled: ...` and returns an empty state +- `provide_channels()` logs the same warning and returns no `schedule` channel + +## Test-Covered Behavior + +Current tests cover these behaviors: + +- SQLAlchemy job store round-trip persistence with SQLite +- `ScheduleImpl.load_state()` starts the injected scheduler +- settings resolution from `BUB_SCHEDULE_SQLALCHEMY_URL` +- fallback from `BUB_TAPESTORE_SQLALCHEMY_URL` +- `schedule.trigger` executes both sync and async jobs +- `schedule.trigger` does not shift an interval job's `next_run_time` + +## Limitations + +- this package only provides scheduling infrastructure; actual reminder delivery still depends on the surrounding Bub runtime and enabled channels +- session scoping is based on the `session_id` stored in job kwargs +- persistence quality and locking semantics depend on the configured SQLAlchemy backend diff --git a/contrib/bubseek-schedule/pyproject.toml b/contrib/bub-schedule-sqlalchemy/pyproject.toml similarity index 50% rename from contrib/bubseek-schedule/pyproject.toml rename to contrib/bub-schedule-sqlalchemy/pyproject.toml index 73c9df3..abe483b 100644 --- a/contrib/bubseek-schedule/pyproject.toml +++ b/contrib/bub-schedule-sqlalchemy/pyproject.toml @@ -1,21 +1,19 @@ [project] -name = "bubseek-schedule" +name = "bub-schedule-sqlalchemy" version = "0.1.0" -description = "Scheduling tools for Bubseek (OceanBase/seekdb job store)" +description = "A standard Bub scheduling plugin backed by APScheduler SQLAlchemy job stores" authors = [{ name = "Chojan Shang", email = "psiace@apache.org" }] requires-python = ">=3.12" dependencies = [ "apscheduler>=3.11.2", "bub", - "bubseek", "loguru", + "pydantic-settings>=2.0.0", + "sqlalchemy>=2.0", ] -[tool.uv.sources] -bubseek = { workspace = true } - -[project.entry-points."bub"] -schedule = "bubseek_schedule.plugin:main" +[project.entry-points.bub] +schedule = "bub_schedule_sqlalchemy.plugin:main" [build-system] requires = ["pdm-backend"] @@ -23,4 +21,4 @@ build-backend = "pdm.backend" [tool.pdm.build] package-dir = "src" -includes = ["src/bubseek_schedule"] +includes = ["src/bub_schedule_sqlalchemy"] diff --git a/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/__init__.py b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/__init__.py new file mode 100644 index 0000000..de521f7 --- /dev/null +++ b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/__init__.py @@ -0,0 +1,5 @@ +"""Reusable Bub scheduling components built on Bub and APScheduler.""" + +from bub_schedule_sqlalchemy.plugin import ScheduleImpl, build_scheduler + +__all__ = ["ScheduleImpl", "build_scheduler"] diff --git a/contrib/bubseek-schedule/src/bubseek_schedule/channel.py b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/channel.py similarity index 100% rename from contrib/bubseek-schedule/src/bubseek_schedule/channel.py rename to contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/channel.py diff --git a/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/job_store.py b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/job_store.py new file mode 100644 index 0000000..fa8162b --- /dev/null +++ b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/job_store.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + +from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore +from pydantic import Field, model_validator +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class ScheduleSQLAlchemySettings(BaseSettings): + """Configuration for the APScheduler SQLAlchemy job store.""" + + model_config = SettingsConfigDict( + env_file=".env", + extra="ignore", + populate_by_name=True, + ) + + url: str | None = Field(default=None, validation_alias="BUB_SCHEDULE_SQLALCHEMY_URL") + tapestore_url: str | None = Field(default=None, validation_alias="BUB_TAPESTORE_SQLALCHEMY_URL", exclude=True) + tablename: str = "apscheduler_jobs" + + @model_validator(mode="after") + def inherit_tapestore_url(self) -> ScheduleSQLAlchemySettings: + if self.url is None and self.tapestore_url: + self.url = self.tapestore_url + return self + + +def build_sqlalchemy_jobstore( + *, + settings: ScheduleSQLAlchemySettings, + engine_options: Mapping[str, Any] | None = None, +) -> SQLAlchemyJobStore: + return SQLAlchemyJobStore( + url=settings.url, + tablename=settings.tablename, + engine_options=dict(engine_options or {}), + ) diff --git a/contrib/bubseek-schedule/src/bubseek_schedule/jobs.py b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/jobs.py similarity index 100% rename from contrib/bubseek-schedule/src/bubseek_schedule/jobs.py rename to contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/jobs.py diff --git a/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/plugin.py b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/plugin.py new file mode 100644 index 0000000..66d4cda --- /dev/null +++ b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/plugin.py @@ -0,0 +1,91 @@ +import contextlib +from collections.abc import Callable, Mapping +from typing import Any + +from apscheduler.jobstores.base import BaseJobStore +from apscheduler.schedulers import SchedulerAlreadyRunningError +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.schedulers.base import BaseScheduler +from bub import hookimpl +from bub.types import Envelope, MessageHandler, State +from loguru import logger + +from bub_schedule_sqlalchemy.job_store import ScheduleSQLAlchemySettings, build_sqlalchemy_jobstore + +SchedulerFactory = Callable[[], BaseScheduler] + + +def build_scheduler(*, jobstore: BaseJobStore, jobstore_alias: str = "default") -> BaseScheduler: + """Build a background scheduler with an injected job store.""" + return BackgroundScheduler(jobstores={jobstore_alias: jobstore}) + + +def build_sqlalchemy_scheduler( + *, + settings: ScheduleSQLAlchemySettings, + engine_options: Mapping[str, Any] | None = None, + jobstore_alias: str = "default", +) -> BaseScheduler: + jobstore = build_sqlalchemy_jobstore(settings=settings, engine_options=engine_options) + return build_scheduler(jobstore=jobstore, jobstore_alias=jobstore_alias) + + +def _default_scheduler() -> BaseScheduler: + return build_sqlalchemy_scheduler(settings=ScheduleSQLAlchemySettings()) + + +class ScheduleImpl: + """Schedule plugin backed by an injected APScheduler scheduler. + + Uses BackgroundScheduler so the scheduler can start without the ``schedule`` channel. + ``bub chat`` only enables the ``cli`` channel; previously AsyncIOScheduler never started, + so APScheduler kept jobs in memory-only ``_pending_jobs`` and nothing reached the DB. + """ + + def __init__(self, scheduler_factory: SchedulerFactory) -> None: + from bub_schedule_sqlalchemy import tools # noqa: F401 + + self._scheduler_factory = scheduler_factory + self._scheduler: BaseScheduler | None = None + + @classmethod + def from_scheduler(cls, scheduler: BaseScheduler) -> "ScheduleImpl": + return cls(lambda: scheduler) + + @property + def scheduler(self) -> BaseScheduler: + if self._scheduler is None: + self._scheduler = self._scheduler_factory() + return self._scheduler + + def _ensure_scheduler_started(self) -> BaseScheduler: + scheduler = self.scheduler + if scheduler.running: + return scheduler + with contextlib.suppress(SchedulerAlreadyRunningError): + scheduler.start() + return scheduler + + @hookimpl + def load_state(self, message: Envelope, session_id: str) -> State: + # Runs before tools on every inbound message — covers CLI-only ``bub chat``. + try: + scheduler = self._ensure_scheduler_started() + except Exception as exc: + logger.warning(f"Schedule plugin disabled: {exc}") + return {} + return {"scheduler": scheduler} + + @hookimpl + def provide_channels(self, message_handler: MessageHandler) -> list: + from bub_schedule_sqlalchemy.channel import ScheduleChannel + + try: + scheduler = self.scheduler + except Exception as exc: + logger.warning(f"Schedule plugin disabled: {exc}") + return [] + return [ScheduleChannel(scheduler)] + + +main = ScheduleImpl(_default_scheduler) diff --git a/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/py.typed b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/contrib/bubseek-schedule/src/bubseek_schedule/tools.py b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/tools.py similarity index 68% rename from contrib/bubseek-schedule/src/bubseek_schedule/tools.py rename to contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/tools.py index 221587a..f160990 100644 --- a/contrib/bubseek-schedule/src/bubseek_schedule/tools.py +++ b/contrib/bub-schedule-sqlalchemy/src/bub_schedule_sqlalchemy/tools.py @@ -1,7 +1,9 @@ +import inspect import uuid from datetime import UTC, datetime, timedelta from typing import cast +from apscheduler.job import Job from apscheduler.jobstores.base import ConflictingIdError, JobLookupError from apscheduler.schedulers.base import BaseScheduler from apscheduler.triggers.cron import CronTrigger @@ -11,15 +13,37 @@ from pydantic import BaseModel, Field from republic import ToolContext -from bubseek_schedule.jobs import run_scheduled_reminder +from bub_schedule_sqlalchemy.jobs import run_scheduled_reminder + +MISSING_SCHEDULER_MESSAGE = "scheduler not found in state, is ScheduleImpl plugin loaded?" +MISSING_TRIGGER_ARGUMENTS_MESSAGE = "One of after_seconds, interval_seconds, or cron must be set" def _ensure_scheduler(state: dict) -> BaseScheduler: if "scheduler" not in state: - raise RuntimeError("scheduler not found in state, is ScheduleImpl plugin loaded?") + raise RuntimeError(MISSING_SCHEDULER_MESSAGE) return cast(BaseScheduler, state["scheduler"]) +def _format_next_run(next_run_time: object) -> str: + if isinstance(next_run_time, datetime): + return next_run_time.isoformat() + return "-" + + +def _get_job_or_raise(scheduler: BaseScheduler, job_id: str) -> Job: + job = scheduler.get_job(job_id) + if job is None: + raise RuntimeError(f"job not found: {job_id}") + return job + + +async def _run_job_now(job: Job) -> None: + result = job.func(*(job.args or ()), **(job.kwargs or {})) + if inspect.isawaitable(result): + await result + + class ScheduleAddInput(BaseModel): after_seconds: int | None = Field(None, description="If set, schedule to run after this many seconds from now") interval_seconds: int | None = Field(None, description="If set, repeat at this interval") @@ -47,7 +71,7 @@ def schedule_add(params: ScheduleAddInput, context: ToolContext) -> str: except ValueError as exc: raise RuntimeError(f"invalid cron expression: {params.cron}") from exc else: - raise RuntimeError("One of after_seconds, interval_seconds, or cron must be set") + raise RuntimeError(MISSING_TRIGGER_ARGUMENTS_MESSAGE) scheduler = _ensure_scheduler(context.state) workspace = context.state.get("_runtime_workspace") try: @@ -66,11 +90,7 @@ def schedule_add(params: ScheduleAddInput, context: ToolContext) -> str: except ConflictingIdError as exc: raise RuntimeError(f"job id already exists: {job_id}") from exc - next_run = "-" - nrt = getattr(job, "next_run_time", None) - if isinstance(nrt, datetime): - next_run = nrt.isoformat() - return f"scheduled: {job.id} next={next_run}" + return f"scheduled: {job.id} next={_format_next_run(getattr(job, 'next_run_time', None))}" @tool(name="schedule.remove", context=True) @@ -91,17 +111,23 @@ def schedule_list(context: ToolContext) -> str: jobs = scheduler.get_jobs() rows: list[str] = [] for job in jobs: - next_run = "-" - nrt = getattr(job, "next_run_time", None) - if isinstance(nrt, datetime): - next_run = nrt.isoformat() message = str(job.kwargs.get("message", "")) job_session = job.kwargs.get("session_id") if job_session and job_session != context.state.get("session_id", ""): continue - rows.append(f"{job.id} next={next_run} msg={message}") + rows.append(f"{job.id} next={_format_next_run(getattr(job, 'next_run_time', None))} msg={message}") if not rows: return "(no scheduled jobs)" return "\n".join(rows) + + +@tool(name="schedule.trigger", context=True) +async def schedule_trigger(job_id: str, context: ToolContext) -> str: + """Run an existing scheduled job immediately without changing its schedule.""" + scheduler = _ensure_scheduler(context.state) + job = _get_job_or_raise(scheduler, job_id) + await _run_job_now(job) + + return f"triggered: {job_id} (next scheduled run: {_format_next_run(job.next_run_time)})" diff --git a/contrib/bub-schedule-sqlalchemy/src/tests/test_bub_schedule_sqlalchemy.py b/contrib/bub-schedule-sqlalchemy/src/tests/test_bub_schedule_sqlalchemy.py new file mode 100644 index 0000000..823d9bf --- /dev/null +++ b/contrib/bub-schedule-sqlalchemy/src/tests/test_bub_schedule_sqlalchemy.py @@ -0,0 +1,188 @@ +"""Tests for bub-schedule-sqlalchemy.""" + +import asyncio +import uuid +from collections.abc import Iterator +from datetime import UTC, datetime, timedelta +from typing import Any + +import pytest +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger +from bub_schedule_sqlalchemy import tools +from bub_schedule_sqlalchemy.job_store import ( + ScheduleSQLAlchemySettings, + build_sqlalchemy_jobstore, +) +from bub_schedule_sqlalchemy.plugin import ScheduleImpl, build_scheduler +from republic import ToolContext + + +def _test_table_name(prefix: str) -> str: + return f"{prefix}_{uuid.uuid4().hex[:8]}" + + +def _sqlite_url(tmp_path, filename: str) -> str: + return f"sqlite:///{tmp_path / filename}" + + +def _trigger_result(value: object) -> str: + result = asyncio.run(value) if asyncio.iscoroutine(value) else value + assert isinstance(result, str) + return result + + +def _trigger(job_id: str, context: ToolContext) -> str: + handler = tools.schedule_trigger.handler + if handler is None: + raise RuntimeError("schedule.trigger handler is not registered") + + return _trigger_result(handler(job_id, context=context)) + + +def test_jobstore_roundtrip_with_sqlite(tmp_path) -> None: + """Built-in SQLAlchemyJobStore should persist jobs without bubseek helpers.""" + settings = ScheduleSQLAlchemySettings( + url=_sqlite_url(tmp_path, "roundtrip.sqlite"), + tablename=_test_table_name("apscheduler_jobs_test_roundtrip"), + ) + store = build_sqlalchemy_jobstore(settings=settings) + scheduler = build_scheduler(jobstore=store) + scheduler.start() + + scheduler.add_job( + "bub_schedule_sqlalchemy.jobs:_noop", + "date", + run_date=datetime.now(UTC) + timedelta(minutes=1), + id="test-1", + ) + assert store.lookup_job("test-1") is not None + jobs = store.get_all_jobs() + assert len(jobs) == 1 + assert jobs[0].id == "test-1" + + scheduler.remove_job("test-1") + assert store.lookup_job("test-1") is None + scheduler.shutdown() + + +def test_schedule_impl_uses_injected_scheduler(tmp_path) -> None: + settings = ScheduleSQLAlchemySettings( + url=_sqlite_url(tmp_path, "plugin.sqlite"), + tablename=_test_table_name("apscheduler_jobs_test_plugin"), + ) + store = build_sqlalchemy_jobstore(settings=settings) + scheduler = build_scheduler(jobstore=store) + plugin = ScheduleImpl.from_scheduler(scheduler) + + async def _message_handler(_message: object) -> None: + return None + + state = plugin.load_state(message=None, session_id="schedule:test") + + assert state["scheduler"] is scheduler + assert scheduler.running + assert [channel.name for channel in plugin.provide_channels(message_handler=_message_handler)] == ["schedule"] + + +def test_sqlalchemy_settings_support_env(monkeypatch, tmp_path) -> None: + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("BUB_SCHEDULE_SQLALCHEMY_URL", _sqlite_url(tmp_path, "env.sqlite")) + monkeypatch.delenv("BUB_TAPESTORE_SQLALCHEMY_URL", raising=False) + + settings = ScheduleSQLAlchemySettings() + + assert settings.url == _sqlite_url(tmp_path, "env.sqlite") + assert settings.tablename == "apscheduler_jobs" + + +def test_sqlalchemy_settings_fallback_to_tapestore_env(monkeypatch, tmp_path) -> None: + monkeypatch.chdir(tmp_path) + monkeypatch.delenv("BUB_SCHEDULE_SQLALCHEMY_URL", raising=False) + monkeypatch.setenv("BUB_TAPESTORE_SQLALCHEMY_URL", _sqlite_url(tmp_path, "tapestore.sqlite")) + + settings = ScheduleSQLAlchemySettings() + + assert settings.url == _sqlite_url(tmp_path, "tapestore.sqlite") + assert settings.tablename == "apscheduler_jobs" + + +def test_sqlalchemy_settings_allow_missing_url(monkeypatch, tmp_path) -> None: + monkeypatch.chdir(tmp_path) + monkeypatch.delenv("BUB_SCHEDULE_SQLALCHEMY_URL", raising=False) + monkeypatch.delenv("BUB_TAPESTORE_SQLALCHEMY_URL", raising=False) + + settings = ScheduleSQLAlchemySettings() + + assert settings.url is None + assert settings.tablename == "apscheduler_jobs" + + +@pytest.fixture +def scheduler() -> Iterator[BackgroundScheduler]: + scheduler = BackgroundScheduler() + scheduler.start() + yield scheduler + scheduler.shutdown(wait=False) + + +@pytest.fixture +def tool_context(scheduler: BackgroundScheduler) -> ToolContext: + return ToolContext( + tape=None, + run_id="test-run", + state={"scheduler": scheduler, "session_id": "test-session"}, + ) + + +def test_schedule_trigger_executes_sync_job_without_shifting_next_run( + scheduler: BackgroundScheduler, tool_context: ToolContext +) -> None: + execution_log: list[dict[str, Any]] = [] + + def sync_job(value: str) -> None: + execution_log.append({"value": value, "timestamp": datetime.now(UTC)}) + + next_run = datetime.now(UTC) + timedelta(hours=1) + scheduler.add_job( + sync_job, + trigger=IntervalTrigger(minutes=5), + id="sync-job", + kwargs={"value": "payload"}, + next_run_time=next_run, + ) + + result = _trigger("sync-job", tool_context) + + assert len(execution_log) == 1 + assert execution_log[0]["value"] == "payload" + assert scheduler.get_job("sync-job") is not None + assert scheduler.get_job("sync-job").next_run_time == next_run + assert "triggered: sync-job" in result + assert next_run.isoformat() in result + + +def test_schedule_trigger_executes_async_job(scheduler: BackgroundScheduler, tool_context: ToolContext) -> None: + execution_log: list[str] = [] + + async def async_job(value: str) -> None: + await asyncio.sleep(0.01) + execution_log.append(value) + + scheduler.add_job( + async_job, + trigger=IntervalTrigger(minutes=5), + id="async-job", + args=["payload"], + next_run_time=datetime.now(UTC) + timedelta(hours=1), + ) + + result = _trigger("async-job", tool_context) + + assert execution_log == ["payload"] + assert "triggered: async-job" in result + + +def test_schedule_trigger_raises_for_missing_job(tool_context: ToolContext) -> None: + with pytest.raises(RuntimeError, match="job not found: missing-job"): + _trigger("missing-job", tool_context) diff --git a/contrib/bubseek-schedule/README.md b/contrib/bubseek-schedule/README.md deleted file mode 100644 index 6d57a21..0000000 --- a/contrib/bubseek-schedule/README.md +++ /dev/null @@ -1,56 +0,0 @@ -# bubseek-schedule - -Scheduling plugin for bubseek with OceanBase/seekdb job store. - -## What It Provides - -- Bub plugin entry point: `schedule` -- A scheduler channel backed by APScheduler -- OceanBase/seekdb job store (pyobvector dialect) -- Built-in tools: - - `schedule.add` - - `schedule.remove` - - `schedule.list` - -## Installation - -bubseek ships `bubseek-schedule` by default. No extra install needed for normal use. - -From bubseek repo (development): - -```bash -uv add ./contrib/bubseek-schedule -``` - -Or as dependency (when not using bubseek default): - -```toml -[project] -dependencies = [ - "bubseek-schedule @ path:./contrib/bubseek-schedule", -] -``` - -## Runtime Behavior - -- The plugin uses **APScheduler `BackgroundScheduler`** (see also upstream [bub-schedule](https://github.com/bubbuild/bub-contrib) JSON store pattern: persistence must not depend on a specific channel being enabled). -- **`load_state` starts the scheduler** on the first inbound message. That way `bub chat` (CLI-only: only the `cli` channel is enabled) still persists jobs to seekdb. Previously, `AsyncIOScheduler` was only started by the `schedule` channel, so CLI chat left jobs in memory-only `_pending_jobs` and **nothing was written to `apscheduler_jobs`**. -- The channel name is `schedule`. Enabling it in `bub gateway` is optional for persistence; it still starts/stops the scheduler cleanly when you use gateway with that channel. -- Jobs are persisted to: - - **OceanBase/seekdb**: Same URL as the tape store (`BUB_TAPESTORE_SQLALCHEMY_URL`), table `apscheduler_jobs`. - -## Provided Tools - -- `schedule.add`: Add a scheduled job with cron, interval, or one-shot. -- `schedule.remove`: Remove a scheduled job by ID. -- `schedule.list`: List all scheduled jobs. - -## Debug: job in chat but not in Marimo kanban / DB - -The gateway resolves the job store URL from `BUB_TAPESTORE_SQLALCHEMY_URL` in the process environment. Marimo must use the **same** URL. - -From the bubseek repo root: - -```bash -uv run python scripts/query_apscheduler_jobs.py --job-id -``` diff --git a/contrib/bubseek-schedule/src/bubseek_schedule/__init__.py b/contrib/bubseek-schedule/src/bubseek_schedule/__init__.py deleted file mode 100644 index 9893bc3..0000000 --- a/contrib/bubseek-schedule/src/bubseek_schedule/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Bubseek schedule plugin with OceanBase/seekdb job store.""" diff --git a/contrib/bubseek-schedule/src/bubseek_schedule/jobstore.py b/contrib/bubseek-schedule/src/bubseek_schedule/jobstore.py deleted file mode 100644 index 1ab9bdb..0000000 --- a/contrib/bubseek-schedule/src/bubseek_schedule/jobstore.py +++ /dev/null @@ -1,220 +0,0 @@ -"""OceanBase/seekdb job store for APScheduler using pyobvector dialect.""" - -from __future__ import annotations - -import pickle -import threading -from datetime import datetime - -from apscheduler.job import Job -from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError -from loguru import logger -from sqlalchemy import Table, case, create_engine, select -from sqlalchemy.engine import Engine -from sqlalchemy.orm import sessionmaker -from sqlalchemy.orm.session import sessionmaker as sessionmaker_type - -import bubseek.oceanbase # noqa: F401 - register mysql+oceanbase dialect - - -def _get_jobstore_url() -> str: - """Resolve tapestore URL from the shared runtime environment.""" - from bubseek.oceanbase import resolve_tapestore_url - - return resolve_tapestore_url() - - -def _normalize_url(url: str) -> str: - """Use mysql+oceanbase for pyobvector dialect when mysql is configured.""" - from bubseek.oceanbase import normalize_oceanbase_url - - return normalize_oceanbase_url(url) - - -class OceanBaseJobStore(BaseJobStore): - """ - A SQL-based job store for APScheduler using OceanBase/seekdb (pyobvector). - - Jobs are serialized with pickle and stored in a database table. Uses the same - database as BUB_TAPESTORE_SQLALCHEMY_URL - one DB, multiple tables - (tapestore + apscheduler_jobs). - """ - - def __init__(self, url: str | None = None, tablename: str = "apscheduler_jobs"): - super().__init__() - self._url_explicit = url - self._tablename = tablename - self._engine: Engine | None = None - self._session_factory: sessionmaker_type | None = None - self._table: Table | None = None - self._lock = threading.RLock() - - def _connection_url(self) -> str: - if self._url_explicit is not None: - return _normalize_url(self._url_explicit) - return _normalize_url(_get_jobstore_url()) - - def _ensure_initialized(self) -> None: - if self._engine is not None and self._session_factory is not None and self._table is not None: - return - self._engine = create_engine(self._connection_url(), pool_pre_ping=True) - self._session_factory = sessionmaker(bind=self._engine, expire_on_commit=False) - self._init_table() - - def _init_table(self) -> None: - """Create apscheduler_jobs table if not exists.""" - from sqlalchemy import Column, DateTime, LargeBinary, MetaData, String - - if self._engine is None: - raise RuntimeError("jobstore engine not initialized") - - metadata = MetaData() - self._table = Table( - self._tablename, - metadata, - Column("id", String(191), primary_key=True), - Column("next_run_time", DateTime(timezone=True), nullable=True), - Column("job_state", LargeBinary, nullable=False), - ) - metadata.create_all(self._engine) - - def _session_factory_or_raise(self) -> sessionmaker_type: - self._ensure_initialized() - if self._session_factory is None: - raise RuntimeError("jobstore session factory not initialized") - return self._session_factory - - def _table_or_raise(self) -> Table: - self._ensure_initialized() - if self._table is None: - raise RuntimeError("jobstore table not initialized") - return self._table - - def _serialize_job(self, job: Job) -> bytes: - return pickle.dumps(job, protocol=pickle.HIGHEST_PROTOCOL) - - def _deserialize_job(self, data: bytes) -> Job | None: - try: - job = pickle.loads(data) # noqa: S301 - job._scheduler = self._scheduler - job._jobstore_alias = self._alias - except Exception as e: - logger.error(f"Error deserializing job: {e}") - return None - return job - - def start(self, scheduler, alias: str) -> None: - super().start(scheduler, alias) - self._ensure_initialized() - - def shutdown(self) -> None: - with self._lock: - if self._engine is not None: - self._engine.dispose() - - def lookup_job(self, job_id: str) -> Job | None: - session_factory = self._session_factory_or_raise() - table = self._table_or_raise() - with self._lock, session_factory() as session: - row = session.execute(select(table).where(table.c.id == job_id)).first() - if row: - return self._deserialize_job(row.job_state) - return None - - def get_due_jobs(self, now: datetime) -> list[Job]: - session_factory = self._session_factory_or_raise() - table = self._table_or_raise() - with self._lock: - due_jobs = [] - with session_factory() as session: - stmt = ( - select(table) - .where(table.c.next_run_time <= now) - .where(table.c.next_run_time.isnot(None)) - .order_by(table.c.next_run_time) - ) - rows = session.execute(stmt).all() - for row in rows: - job = self._deserialize_job(row.job_state) - if job: - due_jobs.append(job) - return due_jobs - - def get_next_run_time(self) -> datetime | None: - session_factory = self._session_factory_or_raise() - table = self._table_or_raise() - with self._lock, session_factory() as session: - stmt = ( - select(table.c.next_run_time) - .where(table.c.next_run_time.isnot(None)) - .order_by(table.c.next_run_time) - .limit(1) - ) - row = session.execute(stmt).first() - return row[0] if row else None - - def get_all_jobs(self) -> list[Job]: - session_factory = self._session_factory_or_raise() - table = self._table_or_raise() - with self._lock: - jobs = [] - with session_factory() as session: - # MySQL/OceanBase don't support NULLS LAST; use CASE for compatibility - next_run = table.c.next_run_time - nulls_last_expr = case((next_run.is_(None), 1), else_=0) - stmt = select(table).order_by(nulls_last_expr.asc(), next_run.asc()) - rows = session.execute(stmt).all() - for row in rows: - job = self._deserialize_job(row.job_state) - if job: - jobs.append(job) - return jobs - - def add_job(self, job: Job) -> None: - session_factory = self._session_factory_or_raise() - table = self._table_or_raise() - with self._lock, session_factory() as session: - existing = session.execute(select(table).where(table.c.id == job.id)).first() - if existing: - raise ConflictingIdError(job.id) - session.execute( - table.insert().values( - id=job.id, - next_run_time=job.next_run_time, - job_state=self._serialize_job(job), - ) - ) - session.commit() - - def update_job(self, job: Job) -> None: - session_factory = self._session_factory_or_raise() - table = self._table_or_raise() - with self._lock, session_factory() as session: - result = session.execute( - table - .update() - .where(table.c.id == job.id) - .values( - next_run_time=job.next_run_time, - job_state=self._serialize_job(job), - ) - ) - if getattr(result, "rowcount", 0) == 0: - raise JobLookupError(job.id) - session.commit() - - def remove_job(self, job_id: str) -> None: - session_factory = self._session_factory_or_raise() - table = self._table_or_raise() - with self._lock, session_factory() as session: - result = session.execute(table.delete().where(table.c.id == job_id)) - if getattr(result, "rowcount", 0) == 0: - raise JobLookupError(job_id) - session.commit() - - def remove_all_jobs(self) -> None: - session_factory = self._session_factory_or_raise() - table = self._table_or_raise() - with self._lock, session_factory() as session: - session.execute(table.delete()) - session.commit() diff --git a/contrib/bubseek-schedule/src/bubseek_schedule/plugin.py b/contrib/bubseek-schedule/src/bubseek_schedule/plugin.py deleted file mode 100644 index b75757e..0000000 --- a/contrib/bubseek-schedule/src/bubseek_schedule/plugin.py +++ /dev/null @@ -1,49 +0,0 @@ -import contextlib - -from apscheduler.schedulers import SchedulerAlreadyRunningError -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.schedulers.base import BaseScheduler -from bub import hookimpl -from bub.types import Envelope, MessageHandler, State - -from bubseek_schedule.jobstore import OceanBaseJobStore - - -def default_scheduler() -> BaseScheduler: - job_store = OceanBaseJobStore() - return BackgroundScheduler(jobstores={"default": job_store}) - - -class ScheduleImpl: - """Schedule plugin: persist jobs to seekdb via OceanBaseJobStore. - - Uses BackgroundScheduler so the scheduler can start without the ``schedule`` channel. - ``bub chat`` only enables the ``cli`` channel; previously AsyncIOScheduler never started, - so APScheduler kept jobs in memory-only ``_pending_jobs`` and nothing reached the DB. - """ - - def __init__(self) -> None: - from bubseek_schedule import tools # noqa: F401 - - self.scheduler = default_scheduler() - - def _ensure_scheduler_started(self) -> None: - if self.scheduler.running: - return - with contextlib.suppress(SchedulerAlreadyRunningError): - self.scheduler.start() - - @hookimpl - def load_state(self, message: Envelope, session_id: str) -> State: - # Runs before tools on every inbound message — covers CLI-only ``bub chat``. - self._ensure_scheduler_started() - return {"scheduler": self.scheduler} - - @hookimpl - def provide_channels(self, message_handler: MessageHandler) -> list: - from bubseek_schedule.channel import ScheduleChannel - - return [ScheduleChannel(self.scheduler)] - - -main = ScheduleImpl() diff --git a/contrib/bubseek-schedule/src/tests/test_bubseek_schedule.py b/contrib/bubseek-schedule/src/tests/test_bubseek_schedule.py deleted file mode 100644 index a7d88a7..0000000 --- a/contrib/bubseek-schedule/src/tests/test_bubseek_schedule.py +++ /dev/null @@ -1,73 +0,0 @@ -"""Tests for bubseek-schedule (OceanBaseJobStore).""" - -import os -import uuid -from datetime import datetime, timedelta - -import pytest - - -def _seekdb_url() -> str: - url = (os.environ.get("BUB_TAPESTORE_SQLALCHEMY_URL") or "").strip() - if not url: - pytest.skip("BUB_TAPESTORE_SQLALCHEMY_URL is required for schedule tests") - if "mysql" not in url and "oceanbase" not in url: - pytest.skip("schedule tests require a seekdb/OceanBase URL") - return url - - -def _test_table_name(prefix: str) -> str: - return f"{prefix}_{uuid.uuid4().hex[:8]}" - - -def test_jobstore_roundtrip(): - """Test jobstore roundtrip via APScheduler on seekdb/OceanBase.""" - from apscheduler.schedulers.background import BackgroundScheduler - - url = _seekdb_url() - from bubseek_schedule.jobstore import OceanBaseJobStore - - store = OceanBaseJobStore(url=url, tablename=_test_table_name("apscheduler_jobs_test_roundtrip")) - scheduler = BackgroundScheduler(jobstores={"default": store}) - scheduler.start() - - scheduler.add_job( - "bubseek_schedule.jobs:_noop", "date", run_date=datetime.now() + timedelta(minutes=1), id="test-1" - ) - assert store.lookup_job("test-1") is not None - jobs = store.get_all_jobs() - assert len(jobs) == 1 - assert jobs[0].id == "test-1" - - scheduler.remove_job("test-1") - assert store.lookup_job("test-1") is None - scheduler.shutdown() - - -def test_jobstore_get_due_jobs(): - """Test get_due_jobs and get_next_run_time.""" - from apscheduler.schedulers.background import BackgroundScheduler - - url = _seekdb_url() - from bubseek_schedule.jobstore import OceanBaseJobStore - - store = OceanBaseJobStore(url=url, tablename=_test_table_name("apscheduler_jobs_test_due")) - scheduler = BackgroundScheduler(jobstores={"default": store}) - scheduler.start() - - now = datetime.now() - past = now - timedelta(minutes=1) - future = now + timedelta(hours=1) - - scheduler.add_job("bubseek_schedule.jobs:_noop", "date", run_date=past, id="past") - scheduler.add_job("bubseek_schedule.jobs:_noop", "date", run_date=future, id="future") - - due = store.get_due_jobs(now) - assert len(due) == 1 - assert due[0].id == "past" - - next_run = store.get_next_run_time() - assert next_run is not None - assert next_run <= now - - scheduler.shutdown() diff --git a/pyproject.toml b/pyproject.toml index 006e590..0d92b05 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ "bub-tapestore-sqlalchemy", "bub-mcp", "pyobvector>=0.2.26", - "bubseek-schedule", + "bub-schedule-sqlalchemy", "bubseek-marimo", ] @@ -44,8 +44,8 @@ langchain = [ [project.urls] Repository = "https://github.com/ob-labs/bubseek" -[project.entry-points."bub"] -oceanbase-dialect = "bubseek.oceanbase:register" +[project.entry-points."sqlalchemy.dialects"] +"mysql.oceanbase" = "bubseek.oceanbase:OceanBaseDialect" [dependency-groups] dev = [ @@ -96,13 +96,13 @@ bub-web-search = { git = "https://github.com/bubbuild/bub-contrib.git", branch = bub-tapestore-sqlalchemy = { git = "https://github.com/bubbuild/bub-contrib.git", branch = "main", subdirectory = "packages/bub-tapestore-sqlalchemy" } bub-mcp = { git = "https://github.com/bubbuild/bub-contrib.git", branch = "main", subdirectory = "packages/bub-mcp" } bubseek-langchain = { workspace = true } -bubseek-schedule = { workspace = true } +bub-schedule-sqlalchemy = { workspace = true } bubseek-marimo = { workspace = true } [tool.uv.workspace] members = [ "contrib/bubseek-langchain", - "contrib/bubseek-schedule", + "contrib/bub-schedule-sqlalchemy", "contrib/bubseek-marimo", ] @@ -116,7 +116,7 @@ python-version = "3.12" exclude = ["references", "insights", "scripts/sitecustomize.py"] [tool.pytest.ini_options] -testpaths = ["tests", "contrib/bubseek-langchain/tests", "contrib/bubseek-schedule/src/tests", "contrib/bubseek-marimo/tests"] +testpaths = ["tests", "contrib/bubseek-langchain/tests", "contrib/bub-schedule-sqlalchemy/src/tests", "contrib/bubseek-marimo/tests"] [tool.ruff] target-version = "py312" diff --git a/src/bubseek/oceanbase.py b/src/bubseek/oceanbase.py index e3ba940..0cf46f7 100644 --- a/src/bubseek/oceanbase.py +++ b/src/bubseek/oceanbase.py @@ -2,7 +2,6 @@ from __future__ import annotations -import os import sys from collections.abc import Callable from typing import Any, cast @@ -11,7 +10,7 @@ import pymysql import pyobvector # noqa: F401 import typer -from bub import hookimpl +from pydantic import ValidationError from pyobvector.schema.dialect import OceanBaseDialect as _OceanBaseDialect from sqlalchemy.dialects import registry @@ -45,8 +44,15 @@ def normalize_oceanbase_url(url: str) -> str: def resolve_tapestore_url(url: str | None = None) -> str: """Resolve the tapestore URL from an explicit value or the process environment.""" - raw_url = os.environ.get("BUB_TAPESTORE_SQLALCHEMY_URL", "") if url is None else url - return normalize_oceanbase_url(raw_url) + if url is not None: + return normalize_oceanbase_url(url) + + from bubseek.settings import load_bubseek_settings + + try: + return load_bubseek_settings().tapestore_url + except ValidationError: + return "" def mysql_connection_params( @@ -203,16 +209,3 @@ def _validate_schema_tolerant(self: Any) -> None: _patch_tape_store_validate_schema() - - -def register(framework: object) -> object: - """Bub plugin entry point. Registers the OceanBase dialect plugin.""" - return _OceanBaseDialectPlugin() - - -class _OceanBaseDialectPlugin: - """Minimal plugin to satisfy the Bub loader.""" - - @hookimpl - def provide_tape_store(self) -> None: - return None diff --git a/src/bubseek/settings.py b/src/bubseek/settings.py new file mode 100644 index 0000000..ec9c1a9 --- /dev/null +++ b/src/bubseek/settings.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from pydantic import Field, field_validator +from pydantic_settings import BaseSettings, SettingsConfigDict + +from bubseek.oceanbase import normalize_oceanbase_url + + +class BubseekSettings(BaseSettings): + """Shared runtime settings for Bubseek integrations.""" + + model_config = SettingsConfigDict( + env_file=".env", + extra="ignore", + populate_by_name=True, + ) + + tapestore_url: str = Field(validation_alias="BUB_TAPESTORE_SQLALCHEMY_URL") + + @field_validator("tapestore_url") + @classmethod + def normalize_tapestore_url(cls, value: str) -> str: + return normalize_oceanbase_url(value) + + +def load_bubseek_settings() -> BubseekSettings: + return BubseekSettings() # ty: ignore[missing-argument] diff --git a/tests/test_bubseek.py b/tests/test_bubseek.py index e76171e..fef078d 100644 --- a/tests/test_bubseek.py +++ b/tests/test_bubseek.py @@ -11,21 +11,21 @@ import pytest from bub.skills import _read_skill +from pydantic import ValidationError REPO_ROOT = Path(__file__).resolve().parents[1] -BUBSEEK_SRC = REPO_ROOT / "src" @contextmanager def imported_bubseek_modules(*module_names: str) -> Iterator[list[ModuleType]]: - sys.path.insert(0, str(BUBSEEK_SRC)) try: yield [importlib.import_module(name) for name in module_names] finally: - sys.path.remove(str(BUBSEEK_SRC)) for module_name in list(sys.modules): if module_name == "bubseek" or module_name.startswith("bubseek."): sys.modules.pop(module_name, None) + if module_name == "bub_schedule_sqlalchemy" or module_name.startswith("bub_schedule_sqlalchemy."): + sys.modules.pop(module_name, None) def _load_pyproject() -> dict[str, object]: @@ -37,14 +37,14 @@ def _as_dict(value: object) -> dict[str, object]: return cast(dict[str, object], value) -def test_distribution_metadata_exposes_bub_plugin_without_console_script() -> None: +def test_distribution_metadata_exposes_sqlalchemy_dialect_without_console_script() -> None: data = _load_pyproject() project = _as_dict(data["project"]) assert "scripts" not in project assert project["entry-points"] == { - "bub": { - "oceanbase-dialect": "bubseek.oceanbase:register", + "sqlalchemy.dialects": { + "mysql.oceanbase": "bubseek.oceanbase:OceanBaseDialect", }, } @@ -74,14 +74,6 @@ def test_bundled_skills_have_valid_frontmatter() -> None: assert "github-repo-cards" in skill_names -def test_resolve_tapestore_url_requires_explicit_url(monkeypatch) -> None: - with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: - monkeypatch.setenv("BUB_TAPESTORE_SQLALCHEMY_URL", "") - - assert oceanbase_mod.resolve_tapestore_url() == "" - assert oceanbase_mod.mysql_connection_params() is None - - def test_mysql_connection_params_extract_mysql_values(monkeypatch) -> None: with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: monkeypatch.setenv( @@ -99,18 +91,6 @@ def test_mysql_connection_params_extract_mysql_values(monkeypatch) -> None: ) -def test_resolve_tapestore_url_prefers_explicit_argument_over_env(monkeypatch) -> None: - with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: - monkeypatch.setenv( - "BUB_TAPESTORE_SQLALCHEMY_URL", - "mysql+oceanbase://env:secret@seekdb.example:2881/env_db", - ) - - url = oceanbase_mod.resolve_tapestore_url("mysql://cli:secret@seekdb.example:2881/cli_db") - - assert url == "mysql+oceanbase://cli:secret@seekdb.example:2881/cli_db" - - def test_oceanbase_registers_mysql_pymysql_alias() -> None: with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: from sqlalchemy.dialects import registry @@ -120,6 +100,26 @@ def test_oceanbase_registers_mysql_pymysql_alias() -> None: assert dialect_cls is oceanbase_mod.OceanBaseDialect +def test_bubseek_settings_require_tapestore_url(monkeypatch) -> None: + with imported_bubseek_modules("bubseek.settings") as [settings_mod]: + monkeypatch.delenv("BUB_TAPESTORE_SQLALCHEMY_URL", raising=False) + + with pytest.raises(ValidationError): + settings_mod.BubseekSettings(_env_file=None) + + +def test_bubseek_settings_normalize_tapestore_url(monkeypatch) -> None: + with imported_bubseek_modules("bubseek.settings") as [settings_mod]: + monkeypatch.setenv( + "BUB_TAPESTORE_SQLALCHEMY_URL", + "mysql+pymysql://seek:secret@seekdb.example:2881/analytics", + ) + + settings = settings_mod.load_bubseek_settings() + + assert settings.tapestore_url == "mysql+oceanbase://seek:secret@seekdb.example:2881/analytics" + + def test_ensure_database_skips_non_mysql_backends(monkeypatch) -> None: with imported_bubseek_modules("bubseek.oceanbase") as [oceanbase_mod]: monkeypatch.setattr(oceanbase_mod, "mysql_connection_params", lambda *_: None) diff --git a/uv.lock b/uv.lock index c08752c..b96ff4a 100644 --- a/uv.lock +++ b/uv.lock @@ -15,10 +15,10 @@ resolution-markers = [ [manifest] members = [ + "bub-schedule-sqlalchemy", "bubseek", "bubseek-langchain", "bubseek-marimo", - "bubseek-schedule", ] [[package]] @@ -408,6 +408,27 @@ dependencies = [ { name = "pydantic-settings" }, ] +[[package]] +name = "bub-schedule-sqlalchemy" +version = "0.1.0" +source = { editable = "contrib/bub-schedule-sqlalchemy" } +dependencies = [ + { name = "apscheduler" }, + { name = "bub" }, + { name = "loguru" }, + { name = "pydantic-settings" }, + { name = "sqlalchemy" }, +] + +[package.metadata] +requires-dist = [ + { name = "apscheduler", specifier = ">=3.11.2" }, + { name = "bub", git = "https://github.com/bubbuild/bub.git" }, + { name = "loguru" }, + { name = "pydantic-settings", specifier = ">=2.0.0" }, + { name = "sqlalchemy", specifier = ">=2.0" }, +] + [[package]] name = "bub-tapestore-sqlalchemy" version = "0.1.0" @@ -447,12 +468,12 @@ dependencies = [ { name = "bub-discord" }, { name = "bub-feishu" }, { name = "bub-mcp" }, + { name = "bub-schedule-sqlalchemy" }, { name = "bub-tapestore-sqlalchemy" }, { name = "bub-web-search" }, { name = "bub-wechat" }, { name = "bub-wecom" }, { name = "bubseek-marimo" }, - { name = "bubseek-schedule" }, { name = "pydantic-settings" }, { name = "pyobvector" }, { name = "python-dotenv" }, @@ -488,13 +509,13 @@ requires-dist = [ { name = "bub-discord", git = "https://github.com/bubbuild/bub-contrib.git?subdirectory=packages%2Fbub-discord&branch=main" }, { name = "bub-feishu", git = "https://github.com/bubbuild/bub-contrib.git?subdirectory=packages%2Fbub-feishu&branch=main" }, { name = "bub-mcp", git = "https://github.com/bubbuild/bub-contrib.git?subdirectory=packages%2Fbub-mcp&branch=main" }, + { name = "bub-schedule-sqlalchemy", editable = "contrib/bub-schedule-sqlalchemy" }, { name = "bub-tapestore-sqlalchemy", git = "https://github.com/bubbuild/bub-contrib.git?subdirectory=packages%2Fbub-tapestore-sqlalchemy&branch=main" }, { name = "bub-web-search", git = "https://github.com/bubbuild/bub-contrib.git?subdirectory=packages%2Fbub-web-search&branch=main" }, { name = "bub-wechat", git = "https://github.com/bubbuild/bub-contrib.git?subdirectory=packages%2Fbub-wechat&branch=main" }, { name = "bub-wecom", git = "https://github.com/bubbuild/bub-contrib.git?subdirectory=packages%2Fbub-wecom&branch=main" }, { name = "bubseek-langchain", marker = "extra == 'langchain'", editable = "contrib/bubseek-langchain" }, { name = "bubseek-marimo", editable = "contrib/bubseek-marimo" }, - { name = "bubseek-schedule", editable = "contrib/bubseek-schedule" }, { name = "deepagents", marker = "extra == 'langchain'", specifier = ">=0.5.3" }, { name = "langchain-openai", marker = "extra == 'langchain'", specifier = ">=0.3.0" }, { name = "pydantic-settings", specifier = ">=2.0.0" }, @@ -570,25 +591,6 @@ requires-dist = [ { name = "python-dotenv", specifier = ">=1.0.0" }, ] -[[package]] -name = "bubseek-schedule" -version = "0.1.0" -source = { editable = "contrib/bubseek-schedule" } -dependencies = [ - { name = "apscheduler" }, - { name = "bub" }, - { name = "bubseek" }, - { name = "loguru" }, -] - -[package.metadata] -requires-dist = [ - { name = "apscheduler", specifier = ">=3.11.2" }, - { name = "bub", git = "https://github.com/bubbuild/bub.git" }, - { name = "bubseek", editable = "." }, - { name = "loguru" }, -] - [[package]] name = "cachetools" version = "7.0.6"