feat(dispatch): turn-boundary batching dispatcher v2 #686

thepagent merged 34 commits into openabdev:main from feature/turn-boundary-batching-v2
Conversation
Force-pushed: c408a5a → 00d7b25 → 5b7e08c
Status update — SendError testing approach

Added in latest amend.

Not in this PR — full Phase 1 coverage strategy: unit-test the predicate (here), defer the integrated path to a manual staging smoke matrix entry in the ADR (PR #598). Smoke procedure draft (TBD landing decision):

Decision pending: land the §6.10 smoke entry in PR #598's ADR doc, or co-locate here.
What's in PR 686 — feature checklist

Core architecture
Three invariants enforced (ADR §1)
Modes & config (
Adapter wiring
Slash commands
Packing (
Observability (ADR §6.6)
Reaction UX (ADR §6.7)
Error handling (ADR §2.5)
Graceful shutdown (ADR §6.8)
Token estimation
Tests (11 unit tests in
Removed
Deferred to follow-up PRs
ADR Phase 1 (§4.4) — item-by-item checklist

Mapping each deliverable in ADR §4.4 to the code in this PR.

Mechanism deliverables
Tests required by §4.4
Tally
Recommended follow-ups (separate PRs)
Edits
Force-pushed: 5b7e08c → 16bed85
OpenAB PR Screening

This is auto-generated by the OpenAB project-screening flow for context collection and reviewer handoff.
Screening report

## Intent

PR #686 aims to add turn-boundary message batching across Discord, Slack, and Gateway adapters. The operator-visible problem is that rapid follow-up messages can currently be processed as separate turns, creating unnecessary agent runs, fragmented context, and awkward response timing. This PR introduces a dispatcher that starts the first message immediately, buffers later messages while a turn is in flight, and dispatches them as a batch at the next turn boundary.

## Feature

A new dispatch layer for batched message processing. Behavioral change: adapters can run either in the existing per-message mode or in batched mode via configuration. Batched mode enforces one active turn per thread/channel context, preserves structured arrival metadata, evicts failed send paths, and adds buffer/token limits.

## Who It Serves

Primary beneficiaries: Discord and Slack end users who send multi-message bursts, plus agent runtime operators who need lower duplicate-run pressure and cleaner turn semantics. Secondary beneficiaries: maintainers and reviewers, because adapter behavior becomes more explicit.

## Rewritten Prompt

Implement Phase 1 of the turn-boundary batching ADR. Add a shared dispatcher that supports configurable per-message or batched processing for Discord, Slack, and Gateway. In batched mode, the first message for a thread should dispatch immediately, subsequent messages should buffer while the turn is active, and exactly one buffered batch should dispatch when the active turn completes. Preserve message ordering, channel/message references, adapter routing metadata, and existing per-message behavior when batching is disabled. Add config fields for processing mode, max buffered messages, and max batch tokens. Add focused tests for token estimation, arrival-event packing, batching shape, buffer limits, SendError eviction, and no overlapping turns for the same thread.
## Merge Pitch

This is worth advancing because it addresses a real interaction quality issue: users often send thoughts in bursts, but agents should usually answer once per coherent turn. The PR also centralizes dispatch behavior instead of leaving each adapter to approximate batching independently. Risk profile: moderate to high. It adds a new concurrency primitive and touches Discord, Slack, Gateway, config, and main wiring. Likely reviewer concerns are message loss, duplicate dispatch, ordering guarantees, shutdown behavior, backpressure, and whether the test coverage is strong enough for async edge cases.

## Best-Practice Comparison

OpenClaw principles that apply:
Hermes Agent principles that apply:
## Implementation Options

Conservative option: merge only the shared packing/config groundwork and keep adapters in per-message mode by default.

Balanced option: merge the dispatcher behind opt-in config. Keep current behavior as the default, enable batched mode per adapter, and require async tests for ordering, no overlap, overflow handling, and shutdown before merge.

Ambitious option: evolve the dispatcher into a durable runtime queue. Persist buffered messages, add retry/backoff, structured run logs, metrics, and recovery after restart. This would align more closely with OpenClaw-style durable execution but is larger than the current PR.

## Comparison Table
## Recommendation

Advance the balanced option: review PR #686 as an opt-in Phase 1 dispatcher, with current per-message behavior preserved as the safe default. Before merge discussion, require focused async coverage for the core invariants: first-message zero latency, at-most-one in-flight turn, ordered batch dispatch, buffer/token limits, SendError eviction, and graceful shutdown. Split durable persistence, retry/backoff, and richer run logs into a follow-up PR so this change stays reviewable.
chaodu-agent left a comment
CHANGES_REQUESTED — Strong architecture with thorough ADR backing and good test coverage, but three issues need attention before merge.
Baseline Check
Main has per-message dispatch (each inbound → one tokio::spawn → one ACP turn), inline block-packing in adapter.rs:131-152 that reorders extra_blocks (text blocks before sender_context, images after), Slack KeyedAsyncQueue for FIFO ordering, and no batching mechanism.
PR adds: src/dispatch.rs (724 lines) with Dispatcher, BufferedMessage, consumer_loop, dispatch_batch; MessageProcessingMode enum; pack_arrival_event() unified packing; SenderContext.timestamp; ReactionsConfig gains Clone; Slack KeyedAsyncQueue removed; /reset integration.
Four-Question Framework
- What problem? Multiple messages arriving during an in-flight ACP turn become separate sequential turns — wasting tokens, losing attachment attribution, non-deterministic ordering.
- How? Per-thread bounded mpsc::channel with consumer task. First message fires immediately (I1), subsequent messages buffer and batch at turn boundary. Config-gated: per-message (default) or batched.
- Alternatives? Pre-turn debouncing (rejected: adds latency), mutex-level coalescing (rejected: opaque), `<message index=N>` wrapper (rejected: attribution loss). All documented in ADR #598.
- Best approach? Architecture is sound. Three issues below need resolution.
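The buffer-then-batch mechanism in the "How?" answer can be sketched as follows. This is a minimal std-only, synchronous illustration (the real implementation uses a per-thread tokio `mpsc::channel` with an async consumer task; `drain_batch` is a hypothetical name):

```rust
use std::sync::mpsc;

// Drain everything that arrived while a turn was in flight into one
// ordered batch, dispatched at the turn boundary.
fn drain_batch<T>(rx: &mpsc::Receiver<T>) -> Vec<T> {
    let mut batch = Vec::new();
    while let Ok(msg) = rx.try_recv() {
        batch.push(msg);
    }
    batch
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send("m1").unwrap();
    // First message fires immediately (invariant I1).
    assert_eq!(rx.recv().unwrap(), "m1");
    // m2 and m3 arrive while the turn is in flight...
    tx.send("m2").unwrap();
    tx.send("m3").unwrap();
    // ...and dispatch as a single ordered batch at the turn boundary.
    assert_eq!(drain_batch(&rx), vec!["m2", "m3"]);
}
```

The bounded channel capacity is what gives the buffer limit its backpressure semantics in the real dispatcher.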
Traffic Light
🔴 SUGGESTED CHANGES
1. Block ordering semantic change in pack_arrival_event
On main, adapter.rs:131-152 reorders extra_blocks: text blocks (voice transcripts) go before the sender_context header, image blocks go after. The new pack_arrival_event places all extra_blocks after the header in arrival order. This changes behavior even in PerMessage mode (which also calls pack_arrival_event). Voice transcript blocks will now appear after the prompt instead of before it.
Recommendation: Document this ordering change explicitly. If intentional (ADR says it is), confirm existing agents handle transcripts appearing after the prompt correctly. Consider whether this warrants a CHANGELOG entry.
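The ordering change can be made concrete with a small illustration. The function names and signatures below are invented for clarity, not the real `pack_arrival_event` API; they only model the block orders described above (main: text extras before the header, images after; new: all extras after the header in arrival order):

```rust
// Old (main) behavior: text extras hoisted before the header, images after.
fn pack_main_style(header: &str, prompt: &str, text_extras: &[&str], image_extras: &[&str]) -> Vec<String> {
    let mut blocks: Vec<String> = text_extras.iter().map(|s| s.to_string()).collect();
    blocks.push(header.to_string());
    blocks.push(prompt.to_string());
    blocks.extend(image_extras.iter().map(|s| s.to_string()));
    blocks
}

// New behavior: every extra block follows the header, in arrival order.
fn pack_arrival_style(header: &str, prompt: &str, extras: &[&str]) -> Vec<String> {
    let mut blocks = vec![header.to_string(), prompt.to_string()];
    blocks.extend(extras.iter().map(|s| s.to_string()));
    blocks
}

fn main() {
    // A voice transcript now lands after the prompt instead of before the header.
    let old = pack_main_style("<sender_context>", "prompt", &["transcript"], &["image"]);
    let new = pack_arrival_style("<sender_context>", "prompt", &["transcript", "image"]);
    assert_eq!(old, ["transcript", "<sender_context>", "prompt", "image"]);
    assert_eq!(new, ["<sender_context>", "prompt", "transcript", "image"]);
}
```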
2. ReactionsConfig::default() used for queued emoji in dispatch_batch
`let queued_emoji = crate::config::ReactionsConfig::default().emojis.queued;`

This ignores the user's actual reactions config (custom emojis, enabled flag). The router's config is available via router.reactions_config() (which this PR itself adds). Should use the actual config.
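A minimal sketch of the suggested fix, with types deliberately abbreviated (the real `ReactionsConfig` and router live in this repo's src/config.rs and src/adapter.rs; field names here are simplified). The point is only that the dispatcher should read the router's live config rather than `Default`:

```rust
#[derive(Clone)]
struct ReactionsConfig { queued_emoji: String }

impl Default for ReactionsConfig {
    // Stand-in default; the real defaults live in src/config.rs.
    fn default() -> Self { Self { queued_emoji: "⏳".into() } }
}

struct Router { reactions: ReactionsConfig }

impl Router {
    // Accessor the PR itself adds; returns the operator's actual config.
    fn reactions_config(&self) -> &ReactionsConfig { &self.reactions }
}

fn main() {
    let router = Router { reactions: ReactionsConfig { queued_emoji: "📬".into() } };
    // Before: ReactionsConfig::default().queued_emoji — ignores the custom emoji.
    // After: honours whatever the operator configured.
    let queued = router.reactions_config().queued_emoji.clone();
    assert_eq!(queued, "📬");
}
```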
3. Slack KeyedAsyncQueue removal affects PerMessage mode
The PR removes KeyedAsyncQueue entirely from Slack, but in PerMessage mode there's now no per-thread serialization — messages go directly to router.handle_message() without the semaphore guard. On main, KeyedAsyncQueue ensured FIFO ordering even in per-message mode. This is a regression for PerMessage Slack users.
Recommendation: Either keep KeyedAsyncQueue for PerMessage mode, or document that PerMessage mode on Slack no longer guarantees strict FIFO ordering.
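For reviewers unfamiliar with the removed primitive, the guarantee at stake can be sketched as a per-key FIFO (std-only, names invented; the real KeyedAsyncQueue was async and lived in src/slack.rs): messages sharing a thread key are handled strictly in arrival order, independent of other keys.

```rust
use std::collections::HashMap;

// Toy per-key FIFO: the property KeyedAsyncQueue provided on main.
struct KeyedQueue {
    queues: HashMap<String, Vec<String>>, // key → pending messages, oldest first
}

impl KeyedQueue {
    fn new() -> Self { Self { queues: HashMap::new() } }

    fn push(&mut self, key: &str, msg: &str) {
        self.queues.entry(key.to_string()).or_default().push(msg.to_string());
    }

    // Pop the oldest message for a key; per-key arrival order is preserved.
    fn pop(&mut self, key: &str) -> Option<String> {
        let q = self.queues.get_mut(key)?;
        if q.is_empty() { None } else { Some(q.remove(0)) }
    }
}

fn main() {
    let mut q = KeyedQueue::new();
    q.push("thread-1", "a");
    q.push("thread-1", "b");
    q.push("thread-2", "x");
    assert_eq!(q.pop("thread-1").as_deref(), Some("a"));
    assert_eq!(q.pop("thread-1").as_deref(), Some("b"));
    assert_eq!(q.pop("thread-2").as_deref(), Some("x"));
}
```

Dropping straight to `router.handle_message()` in PerMessage mode loses exactly this per-key ordering whenever two messages for the same thread race.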
🟡 NIT
- Duplicated days_to_ymd / timestamp conversion between slack.rs and gateway.rs — extract to shared utility
- sender_name field from ADR §2.3 missing in BufferedMessage — note the divergence
- DispatchError doesn't implement std::error::Error — limits composability with anyhow
🟢 INFO
- Excellent test coverage for packing logic (single message, batch of 2, extra blocks, all four ADR §3.6 scenarios)
- Clean config gating — default PerMessage is fully backward-compatible
- Graceful shutdown with buffered_lost counts per thread
- /reset integration drops pending messages and reports count to user
- stream_prompt_blocks extraction enables clean reuse
Previously slack_ts_to_iso8601 split on '.' and parsed the fractional substring as an integer, treating ".12" as 12 ms instead of 120 ms. Parsing the entire string as f64 carries decimal semantics correctly without any string-padding logic.

Co-Authored-By: Claude Opus 4.7 <[email protected]>
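The fix this commit describes can be sketched as follows (function name hypothetical; the real conversion lives in src/slack.rs and feeds an ISO-8601 formatter): parsing the whole Slack ts as f64 keeps the fraction's decimal semantics.

```rust
// Parse a Slack ts like "1700000000.12" into milliseconds since the epoch.
// f64 parsing treats ".12" as 0.12 s = 120 ms, unlike the old integer
// parse of the substring after '.', which read it as 12 ms.
fn slack_ts_to_millis(ts: &str) -> Option<u64> {
    let secs: f64 = ts.parse().ok()?;
    Some((secs * 1000.0).round() as u64)
}

fn main() {
    assert_eq!(slack_ts_to_millis("1.12"), Some(1120));  // 120 ms, not 12 ms
    assert_eq!(slack_ts_to_millis("1.012"), Some(1012)); // no padding logic needed
    assert_eq!(slack_ts_to_millis("not a ts"), None);
}
```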
The buffered-message count is approximate (sweep races with new arrivals) so surfacing an exact number to users was misleading. Show a binary "cleared / nothing" signal instead. The pending_count() API stays for logs and metrics.

Co-Authored-By: Claude Opus 4.7 <[email protected]>
…ents

Make the no-.await-while-locked invariant explicit at each lock acquisition site so future edits can't silently introduce an .await without tripping the comment. The struct-level note at line 183 stays as the higher-level explanation.

Co-Authored-By: Claude Opus 4.7 <[email protected]>
Replace futures_util::future::join_all with a sequential await loop. Batches are typically small (low single digits) so the serialization cost is sub-second and not user-visible, and the dispatch path no longer pulls in join_all just for one call.

Co-Authored-By: Claude Opus 4.7 <[email protected]>
Per-message mode (cap=1) doesn't benefit from holding consumers across message gaps — there is no batch window to preserve — so a 5-minute idle timeout left consumer tasks lingering long after they were useful. Add PER_MESSAGE_CONSUMER_IDLE_TIMEOUT (10s), wire it through main.rs based on each adapter's message_processing_mode, and drop the unused Dispatcher::new wrapper. By Little's Law (steady-state idle count = arrival rate × idle window), this cuts per-message-mode idle dispatcher footprint by 30x for the same arrival rate while keeping batched modes' 5-minute window so between-trigger lanes aren't torn down on every message.

Co-Authored-By: Claude Opus 4.7 <[email protected]>
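A back-of-envelope check of the Little's Law claim in this commit (the arrival rate below is an invented example; only the 300 s / 10 s windows and the 30x ratio come from the commit message):

```rust
// Little's Law: steady-state idle consumer count ≈ arrival rate × idle window.
fn steady_state_idle(arrival_rate_per_sec: f64, idle_window_secs: f64) -> f64 {
    arrival_rate_per_sec * idle_window_secs
}

fn main() {
    let rate = 0.5; // hypothetical arrival rate, messages/sec
    let before = steady_state_idle(rate, 300.0); // old 5-minute window
    let after = steady_state_idle(rate, 10.0);   // new PER_MESSAGE 10 s window
    // The ratio is window/window = 30, independent of the arrival rate.
    assert_eq!(before / after, 30.0);
}
```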
…ry-batching-v2

# Conflicts:
#	src/gateway.rs
…e 1 implementation

Updates the ADR to match decisions made during PR openabdev#686 review:

- 3-valued MessageProcessingMode (per-message / per-thread / per-lane) replacing the earlier 2-valued (per-message / batched) design. §4.1 documents per-mode (cap, dispatcher key, idle timeout) tuple; §4.4 Phase 1 bullets reflect the unified Dispatcher::submit path; legacy "batched" alias is rejected at config parse.
- Standalone <sender_context> Text block (commit 072010c). §3.1 / §3.3 / §3.4 / §3.5 / §3.6 + §6.4 rule 4 now describe the split-block layout: delimiter + transcripts + prompt + images. Transcripts move from before the envelope to inside the arrival event (between delimiter and prompt); images stay after prompt as in the pre-batching adapter. Empty prompt is omitted from the block stream.
- New §6.10 — per-mode consumer idle timeout (PER_MESSAGE = 10s, DEFAULT = 300s) with Little's-Law rationale and sweep_stale eviction.
- New §6.11 — SendError manual staging smoke matrix (the entry deferred out of CI in PR openabdev#686's first status update).
- §6.7 batch reactions now explicitly sequential (not join_all parallel) so reaction-list ordering across a batch matches message-ID order.
- Frontmatter: drop the self-referential "Supersedes: PR openabdev#598" line; add "Implementation PR: openabdev#686" so the ADR points at the wiring it documents.
…enabdev#686 head

Five rounds of fact-check + proofread against PR openabdev#686 (feature/turn-boundary-batching-v2 @ e119abf) caught two threads of drift:

- Design contract: §2.5 SendError handler now matches commit afd6fff — proactive consumer.is_finished() check at submit head + transparent retry once on SendError; ❌ + ⚠️ + Err(ConsumerDead) only if the retry also fails. Motivated by the first-message-after-idle race; one-attempt bound preserves the no-spin-loop property. §6.11 staging smoke matrix split into Path A (PANIC_ONCE happy path, no user-visible signal) and Path B (PANIC_ALWAYS failing-retry surfaces ❌ + ⚠️). §4.4 Phase 1 plan + test list updated to the new contract.
- Anchor audit vs declared base v0.8.2-beta.1 (52052b8): pre-existing drift fixed in adapter.rs references that had been wrong since the SHA pin was set in v0.2 — :131-152→:156-172, :138-143→:158-162 (7 sites), :148-152→:165-169, :154-161→:173-180, :181→:254 (was pointing at the wrong call), :240→:260. acp/connection.rs / acp/pool.rs / discord.rs / slack.rs anchors verified clean against 52052b8.
- §2.6 rewritten: other_bot_present is a bool snapshot carried on BufferedMessage and read from batch.last() at dispatch time — not the Arc<AtomicBool> mirror of an earlier draft. §2.3 struct + submit signature corrected to match.
- Anchor-pinning preamble (line 9) expanded to pin both SHAs explicitly: released-code anchors → 52052b8; conceptual descriptions of new modules → cross-checked against e119abf.
- Appendix A replaced with a signatures-only skeleton pointing at src/dispatch.rs — drops the ~200-line body sketch that had drifted from the implementation; rationale moved into a short shape-choices list.
- Path anchors swept: pool.rs → acp/pool.rs, connection.rs → acp/connection.rs (modules live under src/acp/ in v0.8.2-beta.1).
- §6.6 metric table cell tokens_per_event (was context_tokens_per_event, inconsistent with the code block immediately below).
Co-Authored-By: Claude Opus 4.7 <[email protected]>
Status update — review feedback addressed + ADR re-synced

Four-reviewer review (CHANGES REQUESTED) — disposition

🔴 Suggested changes
🟡 Nits
ADR (PR #598) — what's now in sync

Pushed.

Also picked up: §2.6 rewrite.

Remaining open items
Ready for re-review.
…ts, fix ADR path

- main.rs: collapse 3x repeated (cap, grouping, idle) match blocks into dispatch::dispatch_params(mode, max_buffered).
- dispatch.rs: replace magic 4 / 512 in estimate_tokens with named CHARS_PER_TOKEN_ESTIMATE / TOKENS_PER_IMAGE_ESTIMATE constants.
- dispatch.rs: fix top-level ADR reference to point at the actual docs/adr/turn-boundary-batching.md path landing in openabdev#598.

Addresses chaodu-agent NITs openabdev#1, openabdev#2, openabdev#5 from PR openabdev#686.

Co-Authored-By: Claude Opus 4.7 <[email protected]>
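The named-constant change can be reconstructed roughly as below. This is a guess assembled from the magic numbers the commit cites (4 chars/token, 512 tokens/image); the real estimate_tokens in src/dispatch.rs operates on message blocks, not bare counts:

```rust
// Named constants replacing the magic 4 / 512 in the token estimate.
const CHARS_PER_TOKEN_ESTIMATE: usize = 4;
const TOKENS_PER_IMAGE_ESTIMATE: usize = 512;

// Rough batch-budget estimate: text length scaled by chars-per-token,
// plus a flat per-image cost.
fn estimate_tokens(text_chars: usize, image_count: usize) -> usize {
    text_chars / CHARS_PER_TOKEN_ESTIMATE + image_count * TOKENS_PER_IMAGE_ESTIMATE
}

fn main() {
    assert_eq!(estimate_tokens(400, 0), 100);
    assert_eq!(estimate_tokens(0, 2), 1024);
    assert_eq!(estimate_tokens(400, 1), 612);
}
```

An estimate like this only has to be stable and cheap — it gates max_batch_tokens, it is not a tokenizer.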
Status update — chaodu-agent NITs #1 / #2 / #5 addressed

Pushed.
Not addressed (intentionally):
Verification
…nale

- adapter.rs: note that future breaking changes should bump to v1.1+
- main.rs: explain why Arc<Mutex<Vec<Arc<Dispatcher>>>> is necessary (shared with cleanup task + shutdown; pushes at startup only)

Addresses maintainer NITs from PR openabdev#686 review.

Co-Authored-By: 超渡法師 <[email protected]>
Four-Monk Review — All findings addressed ✅

All 🔴 and 🟡 items from the initial review have been resolved:
Verdict: Ready for maintainer approval. Architecture is sound, backward compatible by default, well-tested, and all review findings addressed.

Reviewers
chaodu-agent left a comment
All four-monk review findings addressed. Architecture is sound, backward compatible, well-tested. LGTM ✅
…per-lane)

Decision guide for operators choosing between the three modes, with config examples and trade-off explanations.

Co-Authored-By: 超渡法師 <[email protected]>
Visual explanation of per-message vs per-thread vs per-lane behavior, plus the internal consumer_loop batching flow.

Co-Authored-By: 超渡法師 <[email protected]>
Summary
Implements the turn-boundary message batching dispatcher per ADR v0.3 (docs/adr/turn-boundary-batching.md).

Changes
- src/dispatch.rs (new): Dispatcher, BufferedMessage, ThreadHandle, consumer_loop, dispatch_batch, estimate_tokens, unit tests
- src/config.rs: MessageProcessingMode enum, max_buffered_messages, max_batch_tokens config fields for Discord/Slack/Gateway
- src/adapter.rs: AdapterRouter::pack_arrival_event uniform packing (§3.3), ChannelRef / MessageRef types
- src/discord.rs: branch on message_processing_mode — per-message vs batched dispatch path
- src/slack.rs: same branching; KeyedAsyncQueue replaced by Dispatcher consumer tasks
- src/gateway.rs: same branching
- src/main.rs: wire Dispatcher instances per adapter

ADR compliance
Implements Phase 1 scope from ADR §4.4: I1 zero-latency first message, I2 at-most-one-in-flight-turn, I3 broker structural fidelity, SendError eviction (§2.5), other_bot_present freshness (§2.6), batch reaction UX (§6.7), graceful shutdown (§6.8).

Testing
Unit tests in src/dispatch.rs: estimate_tokens, pack_arrival_event single/batch/extra-blocks scenarios.

https://discord.com/channels/1491295327620169908/1497977225314832536
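For orientation, the config fields listed under Changes might be wired like this. Only the field names and the three mode values come from this PR and its ADR commits; the section layout and the numeric values are illustrative assumptions, not documented defaults:

```toml
[discord]
# "per-message" is the backward-compatible default; the review settled on
# three modes: per-message / per-thread / per-lane.
message_processing_mode = "per-thread"
max_buffered_messages = 16   # illustrative value
max_batch_tokens = 4096      # illustrative value
```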