Skip to content

Commit

Permalink
fix: streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxiao committed Feb 22, 2025
1 parent 86c9ccb commit a85eb82
Showing 1 changed file with 10 additions and 27 deletions.
37 changes: 10 additions & 27 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,20 @@ async function processQueue(streamingState: StreamingState, res: Response, reque
while (streamingState.queue.length > 0) {
const current = streamingState.queue[0];

// Clear any previous state
streamingState.remainingContent = ''; // Add this line

// Reset streaming state for new content
streamingState.currentlyStreaming = true;
streamingState.remainingContent = current.content;
streamingState.isEmitting = true;

try {
// Add a check to prevent duplicate streaming
if (streamingState.currentGenerator) {
streamingState.currentGenerator = null; // Add this line
}

for await (const word of streamTextNaturally(current.content, streamingState)) {
const chunk: ChatCompletionChunk = {
id: requestId,
Expand All @@ -429,27 +437,7 @@ async function processQueue(streamingState: StreamingState, res: Response, reque
}]
};
res.write(`data: ${JSON.stringify(chunk)}\n\n`);

// Small delay between words
await new Promise(resolve => setTimeout(resolve, 30));
}

// Add newline after content
const newlineChunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created,
model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: {content: '\n'},
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(newlineChunk)}\n\n`);

} catch (error) {
console.error('Error in streaming:', error);
} finally {
Expand All @@ -459,15 +447,11 @@ async function processQueue(streamingState: StreamingState, res: Response, reque
streamingState.remainingContent = '';
streamingState.queue.shift();
current.resolve();

// Small delay between queue items
await new Promise(resolve => setTimeout(resolve, 50));
}
}

streamingState.processingQueue = false;
}

app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
// Check authentication only if secret is set
if (secret) {
Expand Down Expand Up @@ -551,10 +535,9 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
const actionListener = async (step: StepAction) => {
// Add content to queue for both thinking steps and final answer
if (step.think) {
const content = step.think;
// if not ends with a space, add one
const content = step.think + ' ';
await new Promise<void>(resolve => {
streamingState.currentlyStreaming = false;

streamingState.queue.push({
content,
resolve
Expand Down

0 comments on commit a85eb82

Please sign in to comment.