
Commit e47b3f9

[ResponsesAPI] Implement streaming mode (#1582)
Built on top of #1576. This PR adds support for streaming mode to the Responses API. Tested it using the [openai-responses-starter-app](https://github.com/openai/openai-responses-starter-app): [Screencast from 02-07-2025 07:43:52.webm](https://github.com/user-attachments/assets/6eb77c9c-5796-4841-af55-f526da8da847)

```
pnpm run example streaming
```

```js
{ type: 'response.created', response: { object: 'response', id: 'resp_861131785bfb75f24f944aa7cbc4767b194a2ea320cff258', status: 'in_progress', error: null, instructions: null, model: 'Qwen/Qwen2.5-VL-7B-Instruct', temperature: 1, top_p: 1, created_at: 1751383702199, output: [] }, sequence_number: 0 }
{ type: 'response.in_progress', response: { object: 'response', id: 'resp_861131785bfb75f24f944aa7cbc4767b194a2ea320cff258', status: 'in_progress', error: null, instructions: null, model: 'Qwen/Qwen2.5-VL-7B-Instruct', temperature: 1, top_p: 1, created_at: 1751383702199, output: [] }, sequence_number: 1 }
{ type: 'response.output_item.added', output_index: 0, item: { id: 'msg_def4b731a2654f7eab4fb2efdff217079da37154709c0f0b', type: 'message', role: 'assistant', status: 'in_progress', content: [] }, sequence_number: 2 }
{ type: 'response.content_part.added', item_id: 'msg_def4b731a2654f7eab4fb2efdff217079da37154709c0f0b', output_index: 0, content_index: 0, part: { type: 'output_text', text: '', annotations: [] }, sequence_number: 3 }
{ type: 'response.output_text.delta', item_id: 'msg_def4b731a2654f7eab4fb2efdff217079da37154709c0f0b', output_index: 0, content_index: 0, delta: 'Double', sequence_number: 4 }
{ type: 'response.output_text.delta', item_id: 'msg_def4b731a2654f7eab4fb2efdff217079da37154709c0f0b', output_index: 0, content_index: 0, delta: ' bubble', sequence_number: 5 }
...
{ type: 'response.output_text.delta', item_id: 'msg_def4b731a2654f7eab4fb2efdff217079da37154709c0f0b', output_index: 0, content_index: 0, delta: '!', sequence_number: 43 }
{ type: 'response.output_text.done', item_id: 'msg_def4b731a2654f7eab4fb2efdff217079da37154709c0f0b', output_index: 0, content_index: 0, text: 'Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath!', sequence_number: 44 }
{ type: 'response.content_part.done', item_id: 'msg_def4b731a2654f7eab4fb2efdff217079da37154709c0f0b', output_index: 0, content_index: 0, part: { type: 'output_text', text: 'Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath! Double bubble bath!', annotations: [] }, sequence_number: 45 }
{ type: 'response.output_item.done', output_index: 0, item: { id: 'msg_def4b731a2654f7eab4fb2efdff217079da37154709c0f0b', type: 'message', role: 'assistant', status: 'completed', content: [ [Object] ] }, sequence_number: 46 }
{ type: 'response.completed', response: { object: 'response', id: 'resp_861131785bfb75f24f944aa7cbc4767b194a2ea320cff258', status: 'completed', error: null, instructions: null, model: 'Qwen/Qwen2.5-VL-7B-Instruct', temperature: 1, top_p: 1, created_at: 1751383702199, output: [ [Object] ] }, sequence_number: 47 }
```
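For reference, each of these events reaches the client as a server-sent-events frame: a `data: ` prefix, the JSON-serialized event, then a blank line (exactly what the `emitEvent` helper in the diff below writes). The first delta above, as it appears on the wire:

```
data: {"type":"response.output_text.delta","item_id":"msg_def4b731a2654f7eab4fb2efdff217079da37154709c0f0b","output_index":0,"content_index":0,"delta":"Double","sequence_number":4}
```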
1 parent 2090cf7 commit e47b3f9

File tree

3 files changed: +278, -81 lines
Lines changed: 17 additions & 0 deletions

```js
import { OpenAI } from "openai";
const openai = new OpenAI({ baseURL: "http://localhost:3000/v1", apiKey: process.env.HF_TOKEN });

const stream = await openai.responses.create({
	model: "Qwen/Qwen2.5-VL-7B-Instruct",
	input: [
		{
			role: "user",
			content: "Say 'double bubble bath' ten times fast.",
		},
	],
	stream: true,
});

for await (const event of stream) {
	console.log(event);
}
```
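If a consumer only cares about the final text, the deltas can be folded back together client-side. A minimal sketch that could replace the loop above; the event shapes come from the server diff below, while treating the `error` event as fatal is an assumption of this sketch, not something the PR prescribes:

```js
let text = "";
for await (const event of stream) {
	if (event.type === "response.output_text.delta") {
		text += event.delta; // each delta is one chunk of assistant output
	} else if (event.type === "response.output_text.done") {
		// the server re-sends the accumulated text here, so both should match
		console.log(event.text === text, text);
	} else if (event.type === "error") {
		throw new Error(event.message || "stream errored"); // assumed handling
	}
}
```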

packages/responses-server/src/routes/responses.ts

Lines changed: 201 additions & 47 deletions
```diff
@@ -5,7 +5,12 @@ import { generateUniqueId } from "../lib/generateUniqueId.js";
 import { InferenceClient } from "@huggingface/inference";
 import type { ChatCompletionInputMessage, ChatCompletionInputMessageChunkType } from "@huggingface/tasks";
 
-import { type Response as OpenAIResponse } from "openai/resources/responses/responses";
+import type {
+	Response,
+	ResponseStreamEvent,
+	ResponseOutputItem,
+	ResponseContentPartAddedEvent,
+} from "openai/resources/responses/responses";
 
 export const postCreateResponse = async (
 	req: ValidatedRequest<CreateResponseParams>,
```
```diff
@@ -33,24 +38,34 @@ export const postCreateResponse = async (
 				content:
 					typeof item.content === "string"
 						? item.content
-						: item.content.map((content) => {
-								if (content.type === "input_image") {
-									return {
-										type: "image_url" as ChatCompletionInputMessageChunkType,
-										image_url: {
-											url: content.image_url,
-										},
-									};
-								}
-								// content.type must be "input_text" at this point
-								return {
-									type: "text" as ChatCompletionInputMessageChunkType,
-									text: content.text,
-								};
-							}),
+						: item.content
+								.map((content) => {
+									switch (content.type) {
+										case "input_image":
+											return {
+												type: "image_url" as ChatCompletionInputMessageChunkType,
+												image_url: {
+													url: content.image_url,
+												},
+											};
+										case "output_text":
+											return {
+												type: "text" as ChatCompletionInputMessageChunkType,
+												text: content.text,
+											};
+										case "refusal":
+											return undefined;
+										case "input_text":
+											return {
+												type: "text" as ChatCompletionInputMessageChunkType,
+												text: content.text,
+											};
+									}
+								})
+								.filter((item) => item !== undefined),
 			}))
 		);
 	} else {
 		messages.push({ role: "user", content: req.body.input });
 	}
 
```
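To make the new mapping concrete: each Responses-style content part is converted into the equivalent chat-completion chunk, and `refusal` parts are dropped by the trailing `.filter()`. An illustrative before/after; the values are invented, the shapes follow the switch above:

```js
// Input item as received by the Responses route:
const item = {
	role: "user",
	content: [
		{ type: "input_text", text: "Describe this image." },
		{ type: "input_image", image_url: "https://example.com/cat.png" },
	],
};

// Chat-completion message produced by the mapping above:
const mapped = {
	role: "user",
	content: [
		{ type: "text", text: "Describe this image." },
		{ type: "image_url", image_url: { url: "https://example.com/cat.png" } },
	],
};
```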
```diff
@@ -57,3 +72,155 @@ export const postCreateResponse = async (
+	const payload = {
+		model: req.body.model,
+		messages: messages,
+		temperature: req.body.temperature,
+		top_p: req.body.top_p,
+		stream: req.body.stream,
+	};
+
+	const responseObject: Omit<
+		Response,
+		"incomplete_details" | "metadata" | "output_text" | "parallel_tool_calls" | "tool_choice" | "tools"
+	> = {
+		object: "response",
+		id: generateUniqueId("resp"),
+		status: "in_progress",
+		error: null,
+		instructions: req.body.instructions,
+		model: req.body.model,
+		temperature: req.body.temperature,
+		top_p: req.body.top_p,
+		created_at: new Date().getTime(),
+		output: [],
+	};
+
+	if (req.body.stream) {
+		res.setHeader("Content-Type", "text/event-stream");
+		res.setHeader("Connection", "keep-alive");
+		let sequenceNumber = 0;
+
+		// Emit events in sequence
+		const emitEvent = (event: ResponseStreamEvent) => {
+			res.write(`data: ${JSON.stringify(event)}\n\n`);
+		};
+
+		try {
+			// Response created event
+			emitEvent({
+				type: "response.created",
+				response: responseObject as Response,
+				sequence_number: sequenceNumber++,
+			});
+
+			// Response in progress event
+			emitEvent({
+				type: "response.in_progress",
+				response: responseObject as Response,
+				sequence_number: sequenceNumber++,
+			});
+
+			const stream = client.chatCompletionStream(payload);
+
+			const outputObject: ResponseOutputItem = {
+				id: generateUniqueId("msg"),
+				type: "message",
+				role: "assistant",
+				status: "in_progress",
+				content: [],
+			};
+			responseObject.output = [outputObject];
+
+			// Response output item added event
+			emitEvent({
+				type: "response.output_item.added",
+				output_index: 0,
+				item: outputObject,
+				sequence_number: sequenceNumber++,
+			});
+
+			// Response content part added event
+			const contentPart: ResponseContentPartAddedEvent["part"] = {
+				type: "output_text",
+				text: "",
+				annotations: [],
+			};
+			outputObject.content.push(contentPart);
+
+			emitEvent({
+				type: "response.content_part.added",
+				item_id: outputObject.id,
+				output_index: 0,
+				content_index: 0,
+				part: contentPart,
+				sequence_number: sequenceNumber++,
+			});
+
+			for await (const chunk of stream) {
+				if (chunk.choices[0].delta.content) {
+					contentPart.text += chunk.choices[0].delta.content;
+
+					// Response output text delta event
+					emitEvent({
+						type: "response.output_text.delta",
+						item_id: outputObject.id,
+						output_index: 0,
+						content_index: 0,
+						delta: chunk.choices[0].delta.content,
+						sequence_number: sequenceNumber++,
+					});
+				}
+			}
+
+			// Response output text done event
+			emitEvent({
+				type: "response.output_text.done",
+				item_id: outputObject.id,
+				output_index: 0,
+				content_index: 0,
+				text: contentPart.text,
+				sequence_number: sequenceNumber++,
+			});
+
+			// Response content part done event
+			emitEvent({
+				type: "response.content_part.done",
+				item_id: outputObject.id,
+				output_index: 0,
+				content_index: 0,
+				part: contentPart,
+				sequence_number: sequenceNumber++,
+			});
+
+			// Response output item done event
+			outputObject.status = "completed";
+			emitEvent({
+				type: "response.output_item.done",
+				output_index: 0,
+				item: outputObject,
+				sequence_number: sequenceNumber++,
+			});
+
+			// Response completed event
+			responseObject.status = "completed";
+			emitEvent({
+				type: "response.completed",
+				response: responseObject as Response,
+				sequence_number: sequenceNumber++,
+			});
+		} catch (streamError: any) {
+			console.error("Error in streaming chat completion:", streamError);
+
+			emitEvent({
+				type: "error",
+				code: null,
+				message: streamError.message || "An error occurred while streaming from inference server.",
+				param: null,
+				sequence_number: sequenceNumber++,
+			});
+		}
+		res.end();
+		return;
+	}
+
 	try {
 		const chatCompletionResponse = await client.chatCompletion({
 			model: req.body.model,
```
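Because the streaming branch speaks plain server-sent events, the endpoint can also be consumed without the OpenAI SDK. A minimal sketch, assuming the route is reachable at `http://localhost:3000/v1/responses` (the path the OpenAI client resolves from the example's `baseURL`) and that the bearer token is forwarded the same way the SDK sends it; the frame parsing mirrors the `data: <json>\n\n` format written by `emitEvent`:

```js
const res = await fetch("http://localhost:3000/v1/responses", {
	method: "POST",
	headers: {
		"Content-Type": "application/json",
		Authorization: `Bearer ${process.env.HF_TOKEN}`, // assumed to match what the SDK sends
	},
	body: JSON.stringify({
		model: "Qwen/Qwen2.5-VL-7B-Instruct",
		input: "Say 'double bubble bath' ten times fast.",
		stream: true,
	}),
});

// Frames are `data: <json>` separated by blank lines, as written by emitEvent.
const decoder = new TextDecoder();
let buffer = "";
for await (const chunk of res.body) {
	buffer += decoder.decode(chunk, { stream: true });
	let boundary;
	while ((boundary = buffer.indexOf("\n\n")) !== -1) {
		const frame = buffer.slice(0, boundary);
		buffer = buffer.slice(boundary + 2);
		if (frame.startsWith("data: ")) {
			const event = JSON.parse(frame.slice(6));
			console.log(event.sequence_number, event.type);
		}
	}
}
```

The string form of `input` works here because the route falls back to `messages.push({ role: "user", content: req.body.input })` for non-array inputs, as shown in the hunk above.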
```diff
@@ -62,37 +229,24 @@ export const postCreateResponse = async (
 			top_p: req.body.top_p,
 		});
 
-		const responseObject: Omit<
-			OpenAIResponse,
-			"incomplete_details" | "metadata" | "output_text" | "parallel_tool_calls" | "tool_choice" | "tools"
-		> = {
-			object: "response",
-			id: generateUniqueId("resp"),
-			status: "completed",
-			error: null,
-			instructions: req.body.instructions,
-			model: req.body.model,
-			temperature: req.body.temperature,
-			top_p: req.body.top_p,
-			created_at: chatCompletionResponse.created,
-			output: chatCompletionResponse.choices[0].message.content
-				? [
-						{
-							id: generateUniqueId("msg"),
-							type: "message",
-							role: "assistant",
-							status: "completed",
-							content: [
-								{
-									type: "output_text",
-									text: chatCompletionResponse.choices[0].message.content,
-									annotations: [],
-								},
-							],
-						},
-				  ]
-				: [],
-		};
+		responseObject.status = "completed";
+		responseObject.output = chatCompletionResponse.choices[0].message.content
+			? [
+					{
+						id: generateUniqueId("msg"),
+						type: "message",
+						role: "assistant",
+						status: "completed",
+						content: [
+							{
+								type: "output_text",
+								text: chatCompletionResponse.choices[0].message.content,
+								annotations: [],
+							},
+						],
+					},
+			  ]
+			: [];
 
 		res.json(responseObject);
 	} catch (error) {
```
