diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 369eea0f7a..cb93b8d6ab 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -11,6 +11,7 @@ import { } from "@t3tools/contracts"; import * as NodeServices from "@effect/platform-node/NodeServices"; import { assert, it } from "@effect/vitest"; +import * as DateTime from "effect/DateTime"; import * as Effect from "effect/Effect"; import * as FileSystem from "effect/FileSystem"; import * as Layer from "effect/Layer"; @@ -33,6 +34,7 @@ import { import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { OrchestrationProjectionPipeline } from "../Services/ProjectionPipeline.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; import { ServerConfig } from "../../config.ts"; const makeProjectionPipelinePrefixedTestLayer = (prefix: string) => @@ -596,6 +598,285 @@ it.layer( ); }); +it.layer( + Layer.fresh( + OrchestrationProjectionSnapshotQueryLive.pipe( + Layer.provideMerge( + makeProjectionPipelinePrefixedTestLayer("t3-projection-streaming-append-"), + ), + Layer.provideMerge(RepositoryIdentityResolverLive), + Layer.provideMerge(NodeServices.layer), + ), + ), +)("OrchestrationProjectionPipeline", (it) => { + it.effect("appends assistant streaming deltas in-place while preserving row metadata", () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const first = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:01.000Z")); + const second = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:02.000Z")); + const third = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:03.000Z")); + + const appendAndProject = (event: Parameters[0]) => + eventStore + .append(event) + .pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent))); + + yield* appendAndProject({ + type: "thread.message-sent", + eventId: EventId.make("evt-streaming-append-1"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-streaming-append"), + occurredAt: first, + commandId: CommandId.make("cmd-streaming-append-1"), + causationEventId: null, + correlationId: CommandId.make("cmd-streaming-append-1"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-streaming-append"), + messageId: MessageId.make("message-streaming-append"), + role: "assistant", + text: "Hello", + attachments: [ + { + type: "image", + id: "thread-streaming-append-att-1", + name: "first.png", + mimeType: "image/png", + sizeBytes: 5, + }, + ], + turnId: TurnId.make("turn-streaming-append"), + streaming: true, + createdAt: first, + updatedAt: first, + }, + }); + + yield* appendAndProject({ + type: "thread.message-sent", + eventId: EventId.make("evt-streaming-append-2"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-streaming-append"), + occurredAt: second, + commandId: CommandId.make("cmd-streaming-append-2"), + causationEventId: null, + correlationId: CommandId.make("cmd-streaming-append-2"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-streaming-append"), + messageId: MessageId.make("message-streaming-append"), + role: "assistant", + text: " world", + turnId: TurnId.make("turn-streaming-append"), + streaming: true, + createdAt: second, + updatedAt: second, + }, + }); + + yield* appendAndProject({ + type: "thread.message-sent", + eventId: EventId.make("evt-streaming-append-3"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-streaming-append"), + occurredAt: third, + commandId: CommandId.make("cmd-streaming-append-3"), + causationEventId: null, + correlationId: CommandId.make("cmd-streaming-append-3"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-streaming-append"), + messageId: MessageId.make("message-streaming-append"), + role: "assistant", + text: "!", + attachments: [], + turnId: TurnId.make("turn-streaming-append"), + streaming: true, + createdAt: third, + updatedAt: third, + }, + }); + + const rows = yield* sql<{ + readonly text: string; + readonly attachmentsJson: string | null; + readonly createdAt: string; + readonly updatedAt: string; + readonly isStreaming: number; + }>` + SELECT + text, + attachments_json AS "attachmentsJson", + created_at AS "createdAt", + updated_at AS "updatedAt", + is_streaming AS "isStreaming" + FROM projection_thread_messages + WHERE message_id = 'message-streaming-append' + `; + + assert.equal(rows.length, 1); + assert.deepEqual(rows[0], { + text: "Hello world!", + attachmentsJson: "[]", + createdAt: first, + updatedAt: third, + isStreaming: 1, + }); + }), + ); + + it.effect("exposes appended assistant streaming text through thread detail snapshots", () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const snapshotQuery = yield* ProjectionSnapshotQuery; + const eventStore = yield* OrchestrationEventStore; + const first = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:01.000Z")); + const second = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:02.000Z")); + const third = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:03.000Z")); + + const appendAndProject = (event: Parameters[0]) => + eventStore + .append(event) + .pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent))); + + yield* appendAndProject({ + type: "project.created", + eventId: EventId.make("evt-client-visible-1"), + aggregateKind: "project", + aggregateId: ProjectId.make("project-client-visible"), + occurredAt: first, + commandId: CommandId.make("cmd-client-visible-1"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-client-visible-1"), + metadata: {}, + payload: { + projectId: ProjectId.make("project-client-visible"), + title: "Project", + workspaceRoot: "/tmp/project-client-visible", + defaultModelSelection: null, + scripts: [], + createdAt: first, + updatedAt: first, + }, + }); + + yield* appendAndProject({ + type: "thread.created", + eventId: EventId.make("evt-client-visible-2"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-client-visible"), + occurredAt: first, + commandId: CommandId.make("cmd-client-visible-2"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-client-visible-2"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-client-visible"), + projectId: ProjectId.make("project-client-visible"), + title: "Thread", + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + model: "gpt-5-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + branch: null, + worktreePath: null, + createdAt: first, + updatedAt: first, + }, + }); + + yield* appendAndProject({ + type: "thread.message-sent", + eventId: EventId.make("evt-client-visible-3"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-client-visible"), + occurredAt: first, + commandId: CommandId.make("cmd-client-visible-3"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-client-visible-3"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-client-visible"), + messageId: MessageId.make("message-client-visible"), + role: "assistant", + text: "Hello", + turnId: null, + streaming: true, + createdAt: first, + updatedAt: first, + }, + }); + + yield* appendAndProject({ + type: "thread.message-sent", + eventId: EventId.make("evt-client-visible-4"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-client-visible"), + occurredAt: second, + commandId: CommandId.make("cmd-client-visible-4"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-client-visible-4"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-client-visible"), + messageId: MessageId.make("message-client-visible"), + role: "assistant", + text: " world", + turnId: null, + streaming: true, + createdAt: second, + updatedAt: second, + }, + }); + + yield* appendAndProject({ + type: "thread.message-sent", + eventId: EventId.make("evt-client-visible-5"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-client-visible"), + occurredAt: third, + commandId: CommandId.make("cmd-client-visible-5"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-client-visible-5"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-client-visible"), + messageId: MessageId.make("message-client-visible"), + role: "assistant", + text: "", + turnId: null, + streaming: false, + createdAt: third, + updatedAt: third, + }, + }); + + const detail = yield* snapshotQuery.getThreadDetailById( + ThreadId.make("thread-client-visible"), + ); + + assert.equal(detail._tag, "Some"); + if (detail._tag === "Some") { + assert.deepEqual(detail.value.messages, [ + { + id: MessageId.make("message-client-visible"), + role: "assistant", + text: "Hello world", + turnId: null, + streaming: false, + createdAt: first, + updatedAt: third, + }, + ]); + } + }), + ); +}); + it.layer( Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-attachments-rollback-")), )("OrchestrationProjectionPipeline", (it) => { diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 1161ff6a7d..0cb3ac7642 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -701,6 +701,13 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti ...existingRow.value, updatedAt: event.occurredAt, }); + if ( + event.type === "thread.message-sent" && + event.payload.role === "assistant" && + event.payload.streaming + ) { + return; + } yield* refreshThreadShellSummary(event.payload.threadId); return; } @@ -785,6 +792,27 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti )(function* (event, attachmentSideEffects) { switch (event.type) { case "thread.message-sent": { + if (event.payload.role === "assistant" && event.payload.streaming) { + const nextAttachments = + event.payload.attachments !== undefined + ? yield* materializeAttachmentsForProjection({ + attachments: event.payload.attachments, + }) + : undefined; + yield* projectionThreadMessageRepository.appendText({ + messageId: event.payload.messageId, + threadId: event.payload.threadId, + turnId: event.payload.turnId, + role: event.payload.role, + textDelta: event.payload.text, + ...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}), + isStreaming: event.payload.streaming, + createdAt: event.payload.createdAt, + updatedAt: event.payload.updatedAt, + }); + return; + } + const existingMessage = yield* projectionThreadMessageRepository.getByMessageId({ messageId: event.payload.messageId, }); diff --git a/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts b/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts index 7191916688..2927148aa0 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts @@ -7,8 +7,9 @@ import * as Schema from "effect/Schema"; import * as Struct from "effect/Struct"; import { ChatAttachment } from "@t3tools/contracts"; -import { toPersistenceSqlError } from "../Errors.ts"; +import { toPersistenceDecodeError, toPersistenceSqlError } from "../Errors.ts"; import { + AppendProjectionThreadMessageTextInput, GetProjectionThreadMessageInput, ProjectionThreadMessageRepository, type ProjectionThreadMessageRepositoryShape, @@ -24,6 +25,24 @@ const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields( }), ); +const ProjectionThreadMessageDbInput = ProjectionThreadMessage.mapFields( + Struct.assign({ + isStreaming: Schema.BooleanFromBit, + attachments: Schema.optional(Schema.fromJsonString(Schema.Array(ChatAttachment))), + }), +); + +const AppendProjectionThreadMessageTextDbInput = AppendProjectionThreadMessageTextInput.mapFields( + Struct.assign({ + isStreaming: Schema.BooleanFromBit, + attachments: Schema.optional(Schema.fromJsonString(Schema.Array(ChatAttachment))), + }), +); +const encodeProjectionThreadMessageDbInput = Schema.encodeEffect(ProjectionThreadMessageDbInput); +const encodeAppendProjectionThreadMessageTextDbInput = Schema.encodeEffect( + AppendProjectionThreadMessageTextDbInput, +); + function toProjectionThreadMessage( row: Schema.Schema.Type, ): ProjectionThreadMessage { @@ -44,11 +63,8 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { const sql = yield* SqlClient.SqlClient; const upsertProjectionThreadMessageRow = SqlSchema.void({ - Request: ProjectionThreadMessage, - execute: (row) => { - const nextAttachmentsJson = - row.attachments !== undefined ? JSON.stringify(row.attachments) : null; - return sql` + Request: Schema.toEncoded(ProjectionThreadMessageDbInput), + execute: (row) => sql` INSERT INTO projection_thread_messages ( message_id, thread_id, @@ -67,14 +83,14 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { ${row.role}, ${row.text}, COALESCE( - ${nextAttachmentsJson}, + ${row.attachments ?? null}, ( SELECT attachments_json FROM projection_thread_messages WHERE message_id = ${row.messageId} ) ), - ${row.isStreaming ? 1 : 0}, + ${row.isStreaming}, ${row.createdAt}, ${row.updatedAt} ) @@ -91,8 +107,47 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { is_streaming = excluded.is_streaming, created_at = excluded.created_at, updated_at = excluded.updated_at - `; - }, + `, + }); + + const appendProjectionThreadMessageText = SqlSchema.void({ + Request: Schema.toEncoded(AppendProjectionThreadMessageTextDbInput), + execute: (row) => sql` + INSERT INTO projection_thread_messages ( + message_id, + thread_id, + turn_id, + role, + text, + attachments_json, + is_streaming, + created_at, + updated_at + ) + VALUES ( + ${row.messageId}, + ${row.threadId}, + ${row.turnId}, + ${row.role}, + ${row.textDelta}, + ${row.attachments ?? null}, + ${row.isStreaming}, + ${row.createdAt}, + ${row.updatedAt} + ) + ON CONFLICT (message_id) + DO UPDATE SET + thread_id = excluded.thread_id, + turn_id = excluded.turn_id, + role = excluded.role, + text = projection_thread_messages.text || excluded.text, + attachments_json = COALESCE( + excluded.attachments_json, + projection_thread_messages.attachments_json + ), + is_streaming = excluded.is_streaming, + updated_at = excluded.updated_at + `, }); const getProjectionThreadMessageRow = SqlSchema.findOneOption({ @@ -147,8 +202,27 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { }); const upsert: ProjectionThreadMessageRepositoryShape["upsert"] = (row) => - upsertProjectionThreadMessageRow(row).pipe( - Effect.mapError(toPersistenceSqlError("ProjectionThreadMessageRepository.upsert:query")), + encodeProjectionThreadMessageDbInput(row).pipe( + Effect.mapError(toPersistenceDecodeError("ProjectionThreadMessageRepository.upsert:encode")), + Effect.flatMap((encodedRow) => + upsertProjectionThreadMessageRow(encodedRow).pipe( + Effect.mapError(toPersistenceSqlError("ProjectionThreadMessageRepository.upsert:query")), + ), + ), + ); + + const appendText: ProjectionThreadMessageRepositoryShape["appendText"] = (input) => + encodeAppendProjectionThreadMessageTextDbInput(input).pipe( + Effect.mapError( + toPersistenceDecodeError("ProjectionThreadMessageRepository.appendText:encode"), + ), + Effect.flatMap((encodedInput) => + appendProjectionThreadMessageText(encodedInput).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionThreadMessageRepository.appendText:query"), + ), + ), + ), ); const getByMessageId: ProjectionThreadMessageRepositoryShape["getByMessageId"] = (input) => @@ -176,6 +250,7 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { return { upsert, + appendText, getByMessageId, listByThreadId, deleteByThreadId, diff --git a/apps/server/src/persistence/Services/ProjectionThreadMessages.ts b/apps/server/src/persistence/Services/ProjectionThreadMessages.ts index d50ff32025..f1a1bf93ee 100644 --- a/apps/server/src/persistence/Services/ProjectionThreadMessages.ts +++ b/apps/server/src/persistence/Services/ProjectionThreadMessages.ts @@ -49,6 +49,20 @@ export const DeleteProjectionThreadMessagesInput = Schema.Struct({ }); export type DeleteProjectionThreadMessagesInput = typeof DeleteProjectionThreadMessagesInput.Type; +export const AppendProjectionThreadMessageTextInput = Schema.Struct({ + messageId: MessageId, + threadId: ThreadId, + turnId: Schema.NullOr(TurnId), + role: OrchestrationMessageRole, + textDelta: Schema.String, + attachments: Schema.optional(Schema.Array(ChatAttachment)), + isStreaming: Schema.Boolean, + createdAt: IsoDateTime, + updatedAt: IsoDateTime, +}); +export type AppendProjectionThreadMessageTextInput = + typeof AppendProjectionThreadMessageTextInput.Type; + /** * ProjectionThreadMessageRepositoryShape - Service API for projected thread messages. */ @@ -62,6 +76,16 @@ export interface ProjectionThreadMessageRepositoryShape { message: ProjectionThreadMessage, ) => Effect.Effect; + /** + * Insert a projected message or append a streaming text delta in-place. + * + * Upserts by `messageId`. Existing attachments are preserved unless the + * append carries replacement attachments. + */ + readonly appendText: ( + input: AppendProjectionThreadMessageTextInput, + ) => Effect.Effect; + /** * Read a projected thread message by id. */