Skip to content

Commit 02690d4

Browse files
authored
Use Redis Streams for active runs (#1778)
1 parent 85b5d2c commit 02690d4

File tree

82 files changed

+1138
-777
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+1138
-777
lines changed

.tmuxinator.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ windows:
1010
panes:
1111
- pnpm build --filter='./packages/**'
1212
- cd apps/web && pnpm workers:build
13-
- docker: docker compose up db redis mailpit --menu=false
13+
- docker: docker compose up db redis redisinsight mailpit --menu=false
1414
- studio: cd packages/core && pnpm db:studio

AGENTS.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,6 @@ When implementing new features, follow this order:
231231
6. **Stores**: Create SWR hooks with optimistic updates
232232
7. **UI Components**: Build interface following design patterns
233233
8. **Routes**: Add navigation and routing
234-
9. **Testing**: Validate with linting and type checking
235-
10. **Code Formatting**: **ALWAYS run `pnpm prettier`** to standardize patterns
236234

237235
## Action Organization Patterns
238236

apps/gateway/src/routes/api/v3/conversations/attach/attach.handler.ts

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { AttachRoute } from './attach.route'
1515
// @ts-expect-error: streamSSE has type issues with zod-openapi
1616
export const attachHandler: AppRouteHandler<AttachRoute> = async (ctx) => {
1717
const { conversationUuid } = ctx.req.valid('param')
18-
const { stream, interactive } = ctx.req.valid('json')
18+
const { stream } = ctx.req.valid('json')
1919
const workspace = ctx.get('workspace')
2020

2121
const run = await unsafelyFindActiveRun(conversationUuid).then((r) => r.unwrap()) // prettier-ignore
@@ -31,33 +31,27 @@ export const attachHandler: AppRouteHandler<AttachRoute> = async (ctx) => {
3131
.then((r) => r.unwrap())
3232

3333
const args = { run, project, workspace }
34-
if (stream) return handleStreamingMode(ctx, args, interactive)
35-
return await handleNonStreamingMode(ctx, args, interactive)
34+
if (stream) return handleStreamingMode(ctx, args)
35+
return await handleNonStreamingMode(ctx, args)
3636
}
3737

3838
async function handleStreamingMode(
3939
ctx: Parameters<AppRouteHandler<AttachRoute>>[0],
4040
args: Parameters<typeof attachRun>[0],
41-
interactive: boolean,
4241
) {
4342
return streamSSE(
4443
ctx,
4544
async (stream) => {
46-
let abortSignal: AbortSignal | undefined
47-
if (interactive) {
48-
const abortController = new AbortController()
49-
stream.onAbort(() => {
50-
abortController.abort()
51-
stream.close()
52-
})
53-
abortSignal = abortController.signal
54-
} else {
55-
stream.onAbort(() => stream.close())
56-
}
45+
const abortController = new AbortController()
46+
stream.onAbort(() => {
47+
abortController.abort()
48+
stream.close()
49+
})
50+
const abortSignal = abortController.signal
5751

5852
try {
5953
let id = 0
60-
const result = await attachRun({
54+
await attachRun({
6155
...args,
6256
abortSignal: abortSignal,
6357
onEvent: ({ event, data }) => {
@@ -68,26 +62,17 @@ async function handleStreamingMode(
6862
})
6963
},
7064
}).then((r) => r.unwrap())
71-
72-
// Wait for stream to finish
73-
const error = await result.error
74-
if (error) throw error
7565
} catch (error) {
7666
// Handle abort errors gracefully - don't log them as actual errors
77-
if (isAbortError(error)) {
78-
// Client disconnected, close stream quietly
79-
return
80-
}
67+
if (isAbortError(error)) return
8168

8269
// Re-throw other errors to be handled by the error callback
8370
throw error
8471
}
8572
},
8673
(error: Error) => {
87-
// Don't log abort errors as they are expected when clients disconnect
88-
if (isAbortError(error)) {
89-
return Promise.resolve()
90-
}
74+
// Handle abort errors gracefully - don't log them as actual errors
75+
if (isAbortError(error)) return Promise.resolve()
9176

9277
const unknownError = getUnknownError(error)
9378
if (unknownError) captureException(error)
@@ -100,21 +85,15 @@ async function handleStreamingMode(
10085
async function handleNonStreamingMode(
10186
ctx: Parameters<AppRouteHandler<AttachRoute>>[0],
10287
args: Parameters<typeof attachRun>[0],
103-
interactive: boolean,
10488
) {
105-
let abortSignal: AbortSignal | undefined
106-
if (interactive) {
107-
abortSignal = ctx.req.raw.signal // FIXME: this is not working
108-
}
109-
89+
const abortSignal = ctx.req.raw.signal // FIXME: this is not working
11090
const result = await attachRun({ ...args, abortSignal }).then((r) => r.unwrap()) // prettier-ignore
11191

11292
// Wait for stream to finish
113-
const error = await result.error
93+
const error = result.error
11494
if (error) throw error
11595

116-
const response = (await result.lastResponse)!
117-
const body = runPresenter({ response }).unwrap()
96+
const body = runPresenter({ response: result.lastResponse! }).unwrap()
11897

11998
return ctx.json(body)
12099
}

apps/gateway/src/routes/api/v3/conversations/attach/attach.route.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ export const attachRoute = createRoute({
2121
[http.MediaTypes.JSON]: {
2222
schema: z.object({
2323
stream: z.boolean().default(false),
24-
interactive: z.boolean().default(false),
2524
}),
2625
},
2726
},

apps/gateway/src/routes/api/v3/conversations/chat/chat.handler.ts

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@ import { runPresenter } from '$/presenters/runPresenter'
44
import { ChatRoute } from '$/routes/api/v3/conversations/chat/chat.route'
55
import { Message as LegacyMessage } from '@latitude-data/constants/legacyCompiler'
66
import { getUnknownError } from '@latitude-data/core/lib/getUnknownError'
7-
import {
8-
awaitClientToolResult,
9-
ToolHandler,
10-
} from '@latitude-data/core/lib/streamManager/clientTools/handlers'
7+
import { buildClientToolHandlersMap } from '@latitude-data/core/lib/streamManager/clientTools/handlers'
118
import { streamToGenerator } from '@latitude-data/core/lib/streamToGenerator'
129
import { addMessages } from '@latitude-data/core/services/documentLogs/addMessages/index'
1310
import { BACKGROUND, telemetry } from '@latitude-data/core/telemetry'
@@ -82,10 +79,3 @@ export const chatHandler: AppRouteHandler<ChatRoute> = async (c) => {
8279

8380
return c.json(body, 200)
8481
}
85-
86-
function buildClientToolHandlersMap(tools: string[]) {
87-
return tools.reduce((acc: Record<string, ToolHandler>, toolName: string) => {
88-
acc[toolName] = awaitClientToolResult
89-
return acc
90-
}, {})
91-
}

apps/gateway/src/routes/api/v3/projects/versions/documents/run/run.handler.ts

Lines changed: 99 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,14 @@ import { LogSources } from '@latitude-data/core/constants'
99
import { BadRequestError } from '@latitude-data/core/lib/errors'
1010
import { getUnknownError } from '@latitude-data/core/lib/getUnknownError'
1111
import { isAbortError } from '@latitude-data/core/lib/isAbortError'
12-
import {
13-
awaitClientToolResult,
14-
ToolHandler,
15-
} from '@latitude-data/core/lib/streamManager/clientTools/handlers'
12+
import { buildClientToolHandlersMap } from '@latitude-data/core/lib/streamManager/clientTools/handlers'
1613
import { streamToGenerator } from '@latitude-data/core/lib/streamToGenerator'
1714
import { runDocumentAtCommit } from '@latitude-data/core/services/commits/runDocumentAtCommit'
1815
import { enqueueRun } from '@latitude-data/core/services/runs/enqueue'
1916
import { isFeatureEnabledByName } from '@latitude-data/core/services/workspaceFeatures/isFeatureEnabledByName'
2017
import { BACKGROUND } from '@latitude-data/core/telemetry'
2118
import { streamSSE } from 'hono/streaming'
19+
import type { Context } from 'hono'
2220
import { RunRoute } from './run.route'
2321

2422
// https://github.com/honojs/middleware/issues/735
@@ -59,34 +57,111 @@ export const runHandler: AppRouteHandler<RunRoute> = async (c) => {
5957
}
6058

6159
if (background) {
62-
if (!runsEnabled) {
63-
throw new BadRequestError(
64-
'Background runs are not enabled for this workspace',
65-
)
66-
}
67-
68-
const { run } = await enqueueRun({
69-
document: document,
70-
commit: commit,
71-
project: project,
72-
workspace: workspace,
73-
parameters: parameters,
74-
customIdentifier: customIdentifier,
75-
tools: tools,
76-
userMessage: userMessage,
77-
source: source,
78-
}).then((r) => r.unwrap())
79-
80-
return c.json({ uuid: run.uuid })
60+
return await handleBackgroundRun({
61+
c,
62+
workspace,
63+
document,
64+
commit,
65+
project,
66+
parameters,
67+
customIdentifier,
68+
tools,
69+
userMessage,
70+
source,
71+
runsEnabled,
72+
})
8173
}
8274

83-
const result = await runDocumentAtCommit({
75+
return await handleForegroundRun({
76+
c,
8477
workspace,
8578
document,
8679
commit,
8780
parameters,
8881
customIdentifier,
8982
source: __internal?.source ?? LogSources.API,
83+
useSSE,
84+
tools,
85+
userMessage,
86+
})
87+
}
88+
89+
async function handleBackgroundRun({
90+
c,
91+
workspace,
92+
document,
93+
commit,
94+
project,
95+
parameters,
96+
customIdentifier,
97+
tools,
98+
userMessage,
99+
source,
100+
runsEnabled,
101+
}: {
102+
c: Context
103+
workspace: any
104+
document: any
105+
commit: any
106+
project: any
107+
parameters: any
108+
customIdentifier: any
109+
tools: any
110+
userMessage: any
111+
source: any
112+
runsEnabled: boolean
113+
}) {
114+
if (!runsEnabled) {
115+
throw new BadRequestError(
116+
'Background runs are not enabled for this workspace',
117+
)
118+
}
119+
120+
const { run } = await enqueueRun({
121+
document: document,
122+
commit: commit,
123+
project: project,
124+
workspace: workspace,
125+
parameters: parameters,
126+
customIdentifier: customIdentifier,
127+
tools: tools,
128+
userMessage: userMessage,
129+
source: source,
130+
}).then((r) => r.unwrap())
131+
132+
return c.json({ uuid: run.uuid })
133+
}
134+
135+
async function handleForegroundRun({
136+
c,
137+
workspace,
138+
document,
139+
commit,
140+
parameters,
141+
customIdentifier,
142+
source,
143+
useSSE,
144+
tools,
145+
userMessage,
146+
}: {
147+
c: Context
148+
workspace: any
149+
document: any
150+
commit: any
151+
parameters: any
152+
customIdentifier: any
153+
source: any
154+
useSSE: boolean
155+
tools: any
156+
userMessage: any
157+
}) {
158+
const result = await runDocumentAtCommit({
159+
workspace,
160+
document,
161+
commit,
162+
parameters,
163+
customIdentifier,
164+
source,
90165
abortSignal: c.req.raw.signal, // FIXME: This does not seem to work
91166
context: BACKGROUND({ workspaceId: workspace.id }),
92167
tools: useSSE ? buildClientToolHandlersMap(tools ?? []) : {},
@@ -152,10 +227,3 @@ export const runHandler: AppRouteHandler<RunRoute> = async (c) => {
152227

153228
return c.json(body)
154229
}
155-
156-
export function buildClientToolHandlersMap(tools: string[]) {
157-
return tools.reduce((acc: Record<string, ToolHandler>, toolName: string) => {
158-
acc[toolName] = awaitClientToolResult
159-
return acc
160-
}, {})
161-
}

apps/gateway/src/server.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,7 @@ if (cluster.isPrimary) {
5353
hostname: HOSTNAME,
5454
port: Number(PORT),
5555
serverOptions: {
56-
keepAliveTimeout: process.env.KEEP_ALIVE_TIMEOUT
57-
? Number(process.env.KEEP_ALIVE_TIMEOUT)
58-
: 601000,
56+
keepAliveTimeout: env.KEEP_ALIVE_TIMEOUT,
5957
},
6058
},
6159
() => {

apps/web/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
"@latitude-data/constants": "workspace:*",
2626
"@latitude-data/core": "workspace:*",
2727
"@latitude-data/env": "workspace:^",
28-
"@latitude-data/sdk": "5.2.0-beta.5",
28+
"@latitude-data/sdk": "workspace:^",
2929
"@latitude-data/socket.io-react-hook": "2.4.5",
3030
"@latitude-data/telemetry": "workspace:*",
3131
"@latitude-data/web-ui": "workspace:*",

apps/web/src/app/(actions)/actions/[actionType]/_lib/index.tsx

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,8 @@ export function ClientPage<T extends ActionType = ActionType>({
4141
setTimeout(async () => {
4242
try {
4343
const [result, error] = await executeBackendAction({ type, parameters })
44-
45-
if (error) {
46-
setError(error)
47-
} else if (result) {
48-
await executeFrontendAction({
49-
type,
50-
parameters: result as ActionFrontendParameters<T>,
51-
})
52-
}
44+
if (error) setError(error)
45+
if (result) await executeFrontendAction({ type, parameters: result })
5346
setTimeout(() => setEnded(true), 5000)
5447
} catch (error) {
5548
setError(error as Error)

apps/web/src/app/(private)/projects/[projectId]/versions/[commitUuid]/documents/[documentUuid]/(withTabs)/evaluations/[evaluationUuid]/editor/_components/EvaluationEditor/Playground/index.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ export const Playground = memo(
133133
runPromptFn={runPromptFn}
134134
abortCurrentStream={abortCurrentStream}
135135
hasActiveStream={hasActiveStream}
136+
isRunStream={false}
136137
/>
137138
)}
138139
</div>

0 commit comments

Comments
 (0)