Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 281 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from "@t3tools/contracts";
import * as NodeServices from "@effect/platform-node/NodeServices";
import { assert, it } from "@effect/vitest";
import * as DateTime from "effect/DateTime";
import * as Effect from "effect/Effect";
import * as FileSystem from "effect/FileSystem";
import * as Layer from "effect/Layer";
Expand All @@ -33,6 +34,7 @@ import {
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
import { OrchestrationProjectionPipeline } from "../Services/ProjectionPipeline.ts";
import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts";
import { ServerConfig } from "../../config.ts";

const makeProjectionPipelinePrefixedTestLayer = (prefix: string) =>
Expand Down Expand Up @@ -596,6 +598,285 @@ it.layer(
);
});

it.layer(
Layer.fresh(
OrchestrationProjectionSnapshotQueryLive.pipe(
Layer.provideMerge(
makeProjectionPipelinePrefixedTestLayer("t3-projection-streaming-append-"),
),
Layer.provideMerge(RepositoryIdentityResolverLive),
Layer.provideMerge(NodeServices.layer),
),
),
)("OrchestrationProjectionPipeline", (it) => {
it.effect("appends assistant streaming deltas in-place while preserving row metadata", () =>
Effect.gen(function* () {
const projectionPipeline = yield* OrchestrationProjectionPipeline;
const eventStore = yield* OrchestrationEventStore;
const sql = yield* SqlClient.SqlClient;
const first = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:01.000Z"));
const second = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:02.000Z"));
const third = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:03.000Z"));

const appendAndProject = (event: Parameters<typeof eventStore.append>[0]) =>
eventStore
.append(event)
.pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent)));

yield* appendAndProject({
type: "thread.message-sent",
eventId: EventId.make("evt-streaming-append-1"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-streaming-append"),
occurredAt: first,
commandId: CommandId.make("cmd-streaming-append-1"),
causationEventId: null,
correlationId: CommandId.make("cmd-streaming-append-1"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-streaming-append"),
messageId: MessageId.make("message-streaming-append"),
role: "assistant",
text: "Hello",
attachments: [
{
type: "image",
id: "thread-streaming-append-att-1",
name: "first.png",
mimeType: "image/png",
sizeBytes: 5,
},
],
turnId: TurnId.make("turn-streaming-append"),
streaming: true,
createdAt: first,
updatedAt: first,
},
});

yield* appendAndProject({
type: "thread.message-sent",
eventId: EventId.make("evt-streaming-append-2"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-streaming-append"),
occurredAt: second,
commandId: CommandId.make("cmd-streaming-append-2"),
causationEventId: null,
correlationId: CommandId.make("cmd-streaming-append-2"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-streaming-append"),
messageId: MessageId.make("message-streaming-append"),
role: "assistant",
text: " world",
turnId: TurnId.make("turn-streaming-append"),
streaming: true,
createdAt: second,
updatedAt: second,
},
});

yield* appendAndProject({
type: "thread.message-sent",
eventId: EventId.make("evt-streaming-append-3"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-streaming-append"),
occurredAt: third,
commandId: CommandId.make("cmd-streaming-append-3"),
causationEventId: null,
correlationId: CommandId.make("cmd-streaming-append-3"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-streaming-append"),
messageId: MessageId.make("message-streaming-append"),
role: "assistant",
text: "!",
attachments: [],
turnId: TurnId.make("turn-streaming-append"),
streaming: true,
createdAt: third,
updatedAt: third,
},
});

const rows = yield* sql<{
readonly text: string;
readonly attachmentsJson: string | null;
readonly createdAt: string;
readonly updatedAt: string;
readonly isStreaming: number;
}>`
SELECT
text,
attachments_json AS "attachmentsJson",
created_at AS "createdAt",
updated_at AS "updatedAt",
is_streaming AS "isStreaming"
FROM projection_thread_messages
WHERE message_id = 'message-streaming-append'
`;

assert.equal(rows.length, 1);
assert.deepEqual(rows[0], {
text: "Hello world!",
attachmentsJson: "[]",
createdAt: first,
updatedAt: third,
isStreaming: 1,
});
}),
);

it.effect("exposes appended assistant streaming text through thread detail snapshots", () =>
Effect.gen(function* () {
const projectionPipeline = yield* OrchestrationProjectionPipeline;
const snapshotQuery = yield* ProjectionSnapshotQuery;
const eventStore = yield* OrchestrationEventStore;
const first = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:01.000Z"));
const second = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:02.000Z"));
const third = DateTime.formatIso(DateTime.makeUnsafe("2026-02-24T00:00:03.000Z"));

const appendAndProject = (event: Parameters<typeof eventStore.append>[0]) =>
eventStore
.append(event)
.pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent)));

yield* appendAndProject({
type: "project.created",
eventId: EventId.make("evt-client-visible-1"),
aggregateKind: "project",
aggregateId: ProjectId.make("project-client-visible"),
occurredAt: first,
commandId: CommandId.make("cmd-client-visible-1"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-client-visible-1"),
metadata: {},
payload: {
projectId: ProjectId.make("project-client-visible"),
title: "Project",
workspaceRoot: "/tmp/project-client-visible",
defaultModelSelection: null,
scripts: [],
createdAt: first,
updatedAt: first,
},
});

yield* appendAndProject({
type: "thread.created",
eventId: EventId.make("evt-client-visible-2"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-client-visible"),
occurredAt: first,
commandId: CommandId.make("cmd-client-visible-2"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-client-visible-2"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-client-visible"),
projectId: ProjectId.make("project-client-visible"),
title: "Thread",
modelSelection: {
instanceId: ProviderInstanceId.make("codex"),
model: "gpt-5-codex",
},
runtimeMode: "full-access",
interactionMode: "default",
branch: null,
worktreePath: null,
createdAt: first,
updatedAt: first,
},
});

yield* appendAndProject({
type: "thread.message-sent",
eventId: EventId.make("evt-client-visible-3"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-client-visible"),
occurredAt: first,
commandId: CommandId.make("cmd-client-visible-3"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-client-visible-3"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-client-visible"),
messageId: MessageId.make("message-client-visible"),
role: "assistant",
text: "Hello",
turnId: null,
streaming: true,
createdAt: first,
updatedAt: first,
},
});

yield* appendAndProject({
type: "thread.message-sent",
eventId: EventId.make("evt-client-visible-4"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-client-visible"),
occurredAt: second,
commandId: CommandId.make("cmd-client-visible-4"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-client-visible-4"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-client-visible"),
messageId: MessageId.make("message-client-visible"),
role: "assistant",
text: " world",
turnId: null,
streaming: true,
createdAt: second,
updatedAt: second,
},
});

yield* appendAndProject({
type: "thread.message-sent",
eventId: EventId.make("evt-client-visible-5"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-client-visible"),
occurredAt: third,
commandId: CommandId.make("cmd-client-visible-5"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-client-visible-5"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-client-visible"),
messageId: MessageId.make("message-client-visible"),
role: "assistant",
text: "",
turnId: null,
streaming: false,
createdAt: third,
updatedAt: third,
},
});

const detail = yield* snapshotQuery.getThreadDetailById(
ThreadId.make("thread-client-visible"),
);

assert.equal(detail._tag, "Some");
if (detail._tag === "Some") {
assert.deepEqual(detail.value.messages, [
{
id: MessageId.make("message-client-visible"),
role: "assistant",
text: "Hello world",
turnId: null,
streaming: false,
createdAt: first,
updatedAt: third,
},
]);
}
}),
);
});

it.layer(
Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-attachments-rollback-")),
)("OrchestrationProjectionPipeline", (it) => {
Expand Down
28 changes: 28 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,13 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
...existingRow.value,
updatedAt: event.occurredAt,
});
if (
event.type === "thread.message-sent" &&
event.payload.role === "assistant" &&
event.payload.streaming
) {
return;
}
yield* refreshThreadShellSummary(event.payload.threadId);
return;
}
Expand Down Expand Up @@ -785,6 +792,27 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
)(function* (event, attachmentSideEffects) {
switch (event.type) {
case "thread.message-sent": {
if (event.payload.role === "assistant" && event.payload.streaming) {
const nextAttachments =
event.payload.attachments !== undefined
? yield* materializeAttachmentsForProjection({
attachments: event.payload.attachments,
})
: undefined;
yield* projectionThreadMessageRepository.appendText({
messageId: event.payload.messageId,
threadId: event.payload.threadId,
turnId: event.payload.turnId,
role: event.payload.role,
textDelta: event.payload.text,
...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}),
isStreaming: event.payload.streaming,
createdAt: event.payload.createdAt,
updatedAt: event.payload.updatedAt,
});
return;
}

const existingMessage = yield* projectionThreadMessageRepository.getByMessageId({
messageId: event.payload.messageId,
});
Expand Down
Loading
Loading