Rewrite notifications handling on the client using per-client channel buffer#75
Rewrite notifications handling on the client using per-client channel buffer#75nerzhulart merged 11 commits intomasterfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR reworks client-side session notification handling to prevent deadlocks when session/* responses are preceded by session/update notifications (e.g., during session/load replay), by buffering session updates per session until the session is initialized.
Changes:
- Introduces a per-session holder with a notification buffer to queue
session/updateevents until session creation completes. - Updates
Clientsession lookup/initialization flow to use the new holder abstraction. - Adds regression tests covering “notifications before response” for both
session/loadandsession/new, plus slow-agent/slow-client scenarios.
Reviewed changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| build.gradle.kts | Bumps library version to 0.16.2. |
| acp/src/commonMain/kotlin/com/agentclientprotocol/client/Client.kt | Implements per-session buffering via ClientSessionHolder and adjusts session resolution logic. |
| acp-ktor-test/src/commonTest/kotlin/com/agentclientprotocol/SimpleAgentTest.kt | Adds tests ensuring updates arriving before responses are delivered and not lost under timing variance. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
acp/src/commonMain/kotlin/com/agentclientprotocol/client/Client.kt
Outdated
Show resolved
Hide resolved
acp/src/commonMain/kotlin/com/agentclientprotocol/client/Client.kt
Outdated
Show resolved
Hide resolved
acp/src/commonMain/kotlin/com/agentclientprotocol/client/Client.kt
Outdated
Show resolved
Hide resolved
…t.kt Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
acp/src/commonMain/kotlin/com/agentclientprotocol/client/Client.kt
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 3 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| suspend fun drainEventsAndCompleteSession(session: ClientSessionImpl) { | ||
| @OptIn(ExperimentalCoroutinesApi::class) | ||
| notifications.close() | ||
| for ((notification, meta) in notifications) { | ||
| session.executeWithSession { | ||
| session.handleNotification(notification, meta) | ||
| } | ||
| } |
There was a problem hiding this comment.
drainEventsAndCompleteSession() closes the notifications channel before draining. While draining runs, any new SessionUpdate will hit the trySend failure path and the notification handler will suspend, blocking Protocol.start()'s read loop (notifications are handled inline). This can reintroduce message-processing stalls/deadlocks under load. Consider keeping the notification handler strictly non-suspending (always enqueue) and draining/processing from a dedicated consumer coroutine per session, or otherwise avoid closing the channel while new updates may still arrive.
| // Don't make the channel limited, because it leads to a deadlock also: | ||
| // when client side makes loadSession/newSession and an agent sends updates more than channel.capacity | ||
| // the message with call response suspends because protocol thread is suspended in handleNotification | ||
| // if to address it we have to somehow reorder events, that's not obvious on the protocol level, so we pay with memory right now to handle it | ||
| private val notifications = Channel<Pair<SessionUpdate, JsonElement?>>(capacity = Channel.UNLIMITED) | ||
|
|
There was a problem hiding this comment.
Channel.UNLIMITED for queued session updates can lead to unbounded memory growth if an agent floods session/update (or if draining is slow). If this is intentional, consider at least adding a configurable cap and a defined overflow strategy (drop/close session/backpressure) to avoid OOM in production.
| }.getOrElse { throwable -> | ||
| sessionHolder.completeExceptionally(IllegalStateException("Failed to create session $sessionId", throwable)) | ||
| removeSessionHolder(sessionId) | ||
| throw throwable |
There was a problem hiding this comment.
In createSession() error handling, the holder is completed exceptionally with a wrapped IllegalStateException("Failed to create session ..."), but the method rethrows the original throwable. This means callers of loadSession/newSession and code awaiting getSessionOrThrow() will observe different exception types/messages for the same failure, making debugging inconsistent. Consider throwing the same wrapped exception (or not wrapping at all) so both paths surface identical failures.
acp/src/commonMain/kotlin/com/agentclientprotocol/client/Client.kt
Outdated
Show resolved
Hide resolved
acp/src/commonMain/kotlin/com/agentclientprotocol/client/Client.kt
Outdated
Show resolved
Hide resolved
| return clientSessionHolder ?: acpFail("Session $sessionId not found") | ||
| } | ||
|
|
||
| private fun removeSessionHolder(sessionId: SessionId) { |
| for ((id, holder) in hangingSessions) { | ||
| logger.trace { "Removing hanging session $id" } | ||
| // report it as non existent session | ||
| holder.completeExceptionally(AcpExpectedError("Session $id not found")) |
There was a problem hiding this comment.
nitpick: for failed to initialize sessions we will completeExceptionally twice. -- could check if the session already completed
Reworked notifications handling to avoid blocking notifications handler in the case when session is being initialized.]
See issue #74 and https://youtrack.jetbrains.com/issue/AIAE-76/Session-load-does-not-send-messages-to-the-actual-client#focus=Comments-27-13428158.0-0
Instead of suspending until a newly created session response is received (that leads to deadlock) we store session related notifications to a per-session channel queue (even if the session-id is not yet here) and unsuspend
setNotificationHandler(AcpMethod.ClientMethods.SessionUpdate) {}lambda immediately. Then, after a session load/new/fork/resume response is received we indicate that the session is initialized (seeholder.completeSession) and drain the corresponding update channel