Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d83b481
feat(dispatch): turn-boundary batching dispatcher v2 per ADR v0.3
brettchien May 3, 2026
6b6ccf9
refactor(dispatch): cleanup naming, parallelize queued reactions, use…
brettchien May 3, 2026
37f95e4
feat(discord): add /cancel-all slash command
brettchien May 3, 2026
810e26d
refactor: unify PerMessage and Batched modes through Dispatcher
brettchien May 3, 2026
66fcf3f
chore(dispatch): address PR #686 NITs
brettchien May 3, 2026
81850c1
fix(dispatch): idle eviction, config validation, avoid clone, timesta…
May 3, 2026
afd6fff
fix(dispatch): proactive stale-entry cleanup + transparent retry on i…
May 3, 2026
472d6d9
fix(dispatch): periodic sweep of stale per-thread entries
May 3, 2026
e0dd269
feat(dispatch): add per-lane batching mode (default for "batched" alias)
brettchien May 4, 2026
a6fb806
fix(dispatch): /reset and /cancel-all clear all lanes in thread
brettchien May 4, 2026
2d2ce95
feat(config)!: drop "batched" alias, only per-message/per-thread/per-…
brettchien May 4, 2026
26570c4
fix(dispatch): restore shared thread sessions and abort consumers on …
shaun-agent May 4, 2026
f7bd044
feat(chart): expose message_processing_mode and batching params
brettchien May 4, 2026
436587c
refactor: rename enum variants to drop redundant Per prefix
brettchien May 4, 2026
991f71f
feat(config): validate max_batch_tokens > 0
brettchien May 4, 2026
5e413a2
test(dispatch): cover sweep_stale and shutdown
brettchien May 4, 2026
98c2086
test(dispatch): cover consumer_loop via DispatchTarget trait seam
brettchien May 4, 2026
1dc0aae
fix(adapter): make SenderContext.timestamp truly additive
brettchien May 5, 2026
138f648
docs(dispatch): note re-acquire-after-await safety in submit
brettchien May 5, 2026
072010c
fix(adapter): use sender_context as standalone delimiter, split promp…
brettchien May 5, 2026
0f46b1e
Merge remote-tracking branch 'upstream/main' into feature/turn-bounda…
brettchien May 5, 2026
26c7bf8
fix(gateway): import AdapterRouter so handle_config_command compiles
brettchien May 5, 2026
0f339e2
fix(timestamp): parse Slack ts as f64 to preserve decimal semantics
brettchien May 5, 2026
956461b
refactor(discord): drop approximate count from /cancel-all message
brettchien May 5, 2026
3f19d48
docs(dispatch): annotate per_thread mutex lock sites with SAFETY comm…
brettchien May 5, 2026
93765a1
refactor(dispatch): apply queued reactions sequentially
brettchien May 5, 2026
963b96c
refactor(dispatch): per-mode consumer idle timeout (10s for per-message)
brettchien May 5, 2026
e119abf
Merge remote-tracking branch 'upstream/main' into feature/turn-bounda…
fgalassi May 5, 2026
aea6782
refactor(dispatch): extract dispatch_params, name token-estimate cons…
brettchien May 5, 2026
1bc3c0c
docs: clarify schema evolution comment + dispatchers triple-Arc ratio…
chaodu-agent May 5, 2026
73f4cc5
docs: add message dispatch modes guide (per-message vs per-thread vs …
chaodu-agent May 5, 2026
beb5493
docs(dispatch): add ASCII diagrams for all three modes + consumer loop
chaodu-agent May 5, 2026
b078046
docs: clarify per-message is the default behavior
chaodu-agent May 5, 2026
c324989
docs(dispatch): add explicit pros/cons and comparison table for each …
chaodu-agent May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions charts/openab/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ data:
{{- if hasKey ($cfg.discord | default dict) "maxBotTurns" }}
max_bot_turns = {{ ($cfg.discord).maxBotTurns | int }}
{{- end }}
{{- /* messageProcessingMode: per-message (default) | per-thread | per-lane (turn-boundary batching) */ -}}
{{- if hasKey ($cfg.discord | default dict) "messageProcessingMode" }}
{{- if not (has ($cfg.discord).messageProcessingMode (list "per-message" "per-thread" "per-lane")) }}
{{- fail (printf "agents.%s.discord.messageProcessingMode must be one of: per-message, per-thread, per-lane — got: %s" $name ($cfg.discord).messageProcessingMode) }}
{{- end }}
message_processing_mode = {{ ($cfg.discord).messageProcessingMode | toJson }}
{{- end }}
{{- if hasKey ($cfg.discord | default dict) "maxBufferedMessages" }}
max_buffered_messages = {{ ($cfg.discord).maxBufferedMessages | int }}
{{- end }}
{{- if hasKey ($cfg.discord | default dict) "maxBatchTokens" }}
max_batch_tokens = {{ ($cfg.discord).maxBatchTokens | int }}
{{- end }}
{{- end }}

{{- if and ($cfg.slack).enabled }}
Expand Down Expand Up @@ -97,6 +110,19 @@ data:
{{- if hasKey ($cfg.slack | default dict) "maxBotTurns" }}
max_bot_turns = {{ ($cfg.slack).maxBotTurns | int }}
{{- end }}
{{- /* messageProcessingMode: per-message (default) | per-thread | per-lane (turn-boundary batching) */ -}}
{{- if hasKey ($cfg.slack | default dict) "messageProcessingMode" }}
{{- if not (has ($cfg.slack).messageProcessingMode (list "per-message" "per-thread" "per-lane")) }}
{{- fail (printf "agents.%s.slack.messageProcessingMode must be one of: per-message, per-thread, per-lane — got: %s" $name ($cfg.slack).messageProcessingMode) }}
{{- end }}
message_processing_mode = {{ ($cfg.slack).messageProcessingMode | toJson }}
{{- end }}
{{- if hasKey ($cfg.slack | default dict) "maxBufferedMessages" }}
max_buffered_messages = {{ ($cfg.slack).maxBufferedMessages | int }}
{{- end }}
{{- if hasKey ($cfg.slack | default dict) "maxBatchTokens" }}
max_batch_tokens = {{ ($cfg.slack).maxBatchTokens | int }}
{{- end }}
{{- end }}

[agent]
Expand Down Expand Up @@ -170,6 +196,19 @@ data:
{{- end }}
{{- end }}
allowed_users = {{ ($cfg.gateway).allowedUsers | default list | toJson }}
{{- /* messageProcessingMode: per-message (default) | per-thread | per-lane (turn-boundary batching) */ -}}
{{- if hasKey ($cfg.gateway | default dict) "messageProcessingMode" }}
{{- if not (has ($cfg.gateway).messageProcessingMode (list "per-message" "per-thread" "per-lane")) }}
{{- fail (printf "agents.%s.gateway.messageProcessingMode must be one of: per-message, per-thread, per-lane — got: %s" $name ($cfg.gateway).messageProcessingMode) }}
{{- end }}
message_processing_mode = {{ ($cfg.gateway).messageProcessingMode | toJson }}
{{- end }}
{{- if hasKey ($cfg.gateway | default dict) "maxBufferedMessages" }}
max_buffered_messages = {{ ($cfg.gateway).maxBufferedMessages | int }}
{{- end }}
{{- if hasKey ($cfg.gateway | default dict) "maxBatchTokens" }}
max_batch_tokens = {{ ($cfg.gateway).maxBatchTokens | int }}
{{- end }}
{{- end }}
{{- if or ($cfg.cronjobs) (($cfg.cron).usercronEnabled) (($cfg.cron).usercronPath) }}

Expand Down
124 changes: 124 additions & 0 deletions charts/openab/tests/message-processing-mode_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
suite: messageProcessingMode & batching params
templates:
- templates/configmap.yaml
tests:
- it: discord renders message_processing_mode = "per-lane"
set:
agents.kiro.discord.messageProcessingMode: per-lane
asserts:
- matchRegex:
path: data["config.toml"]
pattern: 'message_processing_mode = "per-lane"'

- it: discord renders message_processing_mode = "per-thread"
set:
agents.kiro.discord.messageProcessingMode: per-thread
asserts:
- matchRegex:
path: data["config.toml"]
pattern: 'message_processing_mode = "per-thread"'

- it: discord renders message_processing_mode = "per-message"
set:
agents.kiro.discord.messageProcessingMode: per-message
asserts:
- matchRegex:
path: data["config.toml"]
pattern: 'message_processing_mode = "per-message"'

- it: discord rejects invalid messageProcessingMode value
set:
agents.kiro.discord.messageProcessingMode: batched
asserts:
- failedTemplate:
errorPattern: "must be one of: per-message, per-thread, per-lane"

- it: discord omits message_processing_mode when not set
asserts:
- notMatchRegex:
path: data["config.toml"]
pattern: 'message_processing_mode'

- it: discord renders maxBufferedMessages and maxBatchTokens
set:
agents.kiro.discord.messageProcessingMode: per-lane
agents.kiro.discord.maxBufferedMessages: 25
agents.kiro.discord.maxBatchTokens: 32000
asserts:
- matchRegex:
path: data["config.toml"]
pattern: 'max_buffered_messages = 25'
- matchRegex:
path: data["config.toml"]
pattern: 'max_batch_tokens = 32000'

- it: slack renders message_processing_mode = "per-thread"
set:
agents.kiro.slack.enabled: true
agents.kiro.slack.botToken: xoxb-x
agents.kiro.slack.appToken: xapp-x
agents.kiro.slack.messageProcessingMode: per-thread
asserts:
- matchRegex:
path: data["config.toml"]
pattern: 'message_processing_mode = "per-thread"'

- it: slack rejects invalid messageProcessingMode value
set:
agents.kiro.slack.enabled: true
agents.kiro.slack.botToken: xoxb-x
agents.kiro.slack.appToken: xapp-x
agents.kiro.slack.messageProcessingMode: batched
asserts:
- failedTemplate:
errorPattern: "must be one of: per-message, per-thread, per-lane"

- it: slack renders maxBufferedMessages and maxBatchTokens
set:
agents.kiro.slack.enabled: true
agents.kiro.slack.botToken: xoxb-x
agents.kiro.slack.appToken: xapp-x
agents.kiro.slack.messageProcessingMode: per-lane
agents.kiro.slack.maxBufferedMessages: 15
agents.kiro.slack.maxBatchTokens: 18000
asserts:
- matchRegex:
path: data["config.toml"]
pattern: 'max_buffered_messages = 15'
- matchRegex:
path: data["config.toml"]
pattern: 'max_batch_tokens = 18000'

- it: gateway renders message_processing_mode = "per-lane"
set:
agents.kiro.gateway.enabled: true
agents.kiro.gateway.url: ws://openab-gateway:8080/ws
agents.kiro.gateway.messageProcessingMode: per-lane
asserts:
- matchRegex:
path: data["config.toml"]
pattern: 'message_processing_mode = "per-lane"'

- it: gateway rejects invalid messageProcessingMode value
set:
agents.kiro.gateway.enabled: true
agents.kiro.gateway.url: ws://openab-gateway:8080/ws
agents.kiro.gateway.messageProcessingMode: batched
asserts:
- failedTemplate:
errorPattern: "must be one of: per-message, per-thread, per-lane"

- it: gateway renders maxBufferedMessages and maxBatchTokens
set:
agents.kiro.gateway.enabled: true
agents.kiro.gateway.url: ws://openab-gateway:8080/ws
agents.kiro.gateway.messageProcessingMode: per-thread
agents.kiro.gateway.maxBufferedMessages: 50
agents.kiro.gateway.maxBatchTokens: 12000
asserts:
- matchRegex:
path: data["config.toml"]
pattern: 'max_buffered_messages = 50'
- matchRegex:
path: data["config.toml"]
pattern: 'max_batch_tokens = 12000'
24 changes: 24 additions & 0 deletions charts/openab/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ agents:
# multi-agent collaborations; lower to throttle runaway loops more
# aggressively. Hard cap remains 100 regardless (compiled-in).
# maxBotTurns: 20
# messageProcessingMode: "per-message" (default) | "per-thread" | "per-lane"
# per-thread: all senders in a thread share one batch → one ACP turn per turn boundary
# per-lane: each (thread, sender) batches independently → no silent-drop risk
# messageProcessingMode: "per-lane"
# maxBufferedMessages: per-thread mpsc capacity for batching modes (default 10)
# maxBufferedMessages: 10
# maxBatchTokens: soft token cap per ACP turn for batching modes (default 24000)
# maxBatchTokens: 24000
slack:
enabled: false
botToken: "" # Bot User OAuth Token (xoxb-...)
Expand All @@ -185,6 +193,14 @@ agents:
# multi-agent collaborations; lower to throttle runaway loops more
# aggressively. Hard cap remains 100 regardless (compiled-in).
# maxBotTurns: 20
# messageProcessingMode: "per-message" (default) | "per-thread" | "per-lane"
# per-thread: all senders in a thread share one batch → one ACP turn per turn boundary
# per-lane: each (thread, sender) batches independently → no silent-drop risk
# messageProcessingMode: "per-lane"
# maxBufferedMessages: per-thread mpsc capacity for batching modes (default 10)
# maxBufferedMessages: 10
# maxBatchTokens: soft token cap per ACP turn for batching modes (default 24000)
# maxBatchTokens: 24000
workingDir: /home/agent
env: {}
envFrom: []
Expand All @@ -211,6 +227,14 @@ agents:
platform: "telegram" # default platform when gateway is enabled
token: "" # optional shared secret (injected via GATEWAY_WS_TOKEN env var)
botUsername: "" # optional, for @mention gating
# messageProcessingMode: "per-message" (default) | "per-thread" | "per-lane"
# per-thread: all senders in a thread share one batch → one ACP turn per turn boundary
# per-lane: each (thread, sender) batches independently → no silent-drop risk
# messageProcessingMode: "per-lane"
# maxBufferedMessages: per-thread mpsc capacity for batching modes (default 10)
# maxBufferedMessages: 10
# maxBatchTokens: soft token cap per ACP turn for batching modes (default 24000)
# maxBatchTokens: 24000
image: "ghcr.io/openabdev/openab-gateway" # gateway container image
tag: "" # defaults to Chart.AppVersion
strategy: "Recreate" # Recreate (default, prevents concurrent WS conflicts) or RollingUpdate
Expand Down
Loading
Loading