Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"extends": "next/core-web-vitals"
}
100 changes: 100 additions & 0 deletions components/StreamingMessage.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* StreamingMessage — renders a single AI assistant message that arrives via
* streaming.
*
* Props:
* - lines List of output strings received from the stream
* - isStreaming Whether more tokens are expected
* - error Non-null string when the stream errored
*
* The component renders each line in order with a blinking cursor appended
* while `isStreaming` is true, so the user can see tokens arrive in real-time.
*/

import React from 'react';

// ─── Types ────────────────────────────────────────────────────────────────────

export interface StreamingMessageProps {
/** Output lines received from the SSE stream */
lines: string[];
/** Whether the stream is still open */
isStreaming: boolean;
/** Error message — if provided, shows an error badge below the content */
error?: string | null;
/** Optional CSS class applied to the wrapper div */
className?: string;
}

// ─── Component ────────────────────────────────────────────────────────────────

/**
* StreamingMessage
*
* @example
* ```tsx
* const { lines, isStreaming, error } = useStream();
*
* return (
* <StreamingMessage
* lines={lines}
* isStreaming={isStreaming}
* error={error}
* />
* );
* ```
*/
export function StreamingMessage({
lines,
isStreaming,
error = null,
className = '',
}: StreamingMessageProps): React.ReactElement {
return (
<div
className={`streaming-message ${className}`.trim()}
data-testid="streaming-message"
aria-live="polite"
aria-atomic="false"
>
{/* Render each streamed line as its own paragraph */}
<div
className="streaming-message__content"
data-testid="streaming-content"
>
{lines.map((line, index) => (
<p key={index} className="streaming-message__line">
{line}
</p>
))}

{/* Blinking cursor shown only while streaming */}
{isStreaming && (
<span
className="streaming-message__cursor"
data-testid="streaming-cursor"
aria-hidden="true"
>
</span>
)}
</div>

{/* Error badge shown when the stream errors */}
{error && (
<div
className="streaming-message__error"
data-testid="streaming-error"
role="alert"
>
<span className="streaming-message__error-icon" aria-hidden="true">
</span>
<span className="streaming-message__error-text">{error}</span>
</div>
)}
</div>
);
}

export default StreamingMessage;
222 changes: 222 additions & 0 deletions hooks/useStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/**
* useStream — Custom hook for consuming Server-Sent Events (SSE) from the
* Calliope backend agent endpoint.
*
* The backend (server/agent.py) already emits `text/event-stream` frames in
* the shape:
* data: {"type": "output", "data": "<line>"}
* data: {"type": "input_required"}
*
* This hook opens the stream, parses each frame, and exposes:
* - lines — accumulated output lines received so far
* - isStreaming — true while the stream is open
* - error — non-null if the stream errored or the server returned a
* non-2xx status
* - startStream — call this with (url, body?) to begin a new stream
* - stopStream — abort the current stream
* - reset — clear all state back to initial values
*/

import { useState, useCallback, useRef, useEffect } from 'react';

// ─── Types ────────────────────────────────────────────────────────────────────

export type SseFrameType = 'output' | 'input_required' | 'error';

export interface SseFrame {
type: SseFrameType;
data?: string;
message?: string;
}

export interface StreamState {
/** All output lines received from the stream so far */
lines: string[];
/** Whether the stream is currently open and receiving data */
isStreaming: boolean;
/** Whether the backend has requested interactive user input */
inputRequired: boolean;
/** Error message, null when there is no error */
error: string | null;
}

export interface UseStreamReturn extends StreamState {
/** Open a new SSE stream to `url`, optionally with a POST `body` */
startStream: (url: string, body?: Record<string, unknown>) => void;
/** Abort the running stream */
stopStream: () => void;
/** Reset state back to initial values */
reset: () => void;
}

// ─── Initial state ────────────────────────────────────────────────────────────

const INITIAL_STATE: StreamState = {
lines: [],
isStreaming: false,
inputRequired: false,
error: null,
};

// ─── Hook ─────────────────────────────────────────────────────────────────────

/**
* useStream
*
* @example
* ```tsx
* const { lines, isStreaming, error, startStream, stopStream } = useStream();
*
* const handleSend = () => {
* startStream(`http://localhost:${port}/?data=${encodeURIComponent(query)}`);
* };
* ```
*/
export function useStream(): UseStreamReturn {
const [state, setState] = useState<StreamState>(INITIAL_STATE);

// Keep a ref to the AbortController so we can cancel from stopStream/unmount
const abortRef = useRef<AbortController | null>(null);

// Cleanup on unmount
useEffect(() => {
return () => {
abortRef.current?.abort();
};
}, []);

const reset = useCallback(() => {
abortRef.current?.abort();
abortRef.current = null;
setState(INITIAL_STATE);
}, []);

const stopStream = useCallback(() => {
abortRef.current?.abort();
abortRef.current = null;
setState((prev) => ({ ...prev, isStreaming: false }));
}, []);

const startStream = useCallback(
(url: string, body?: Record<string, unknown>) => {
// Abort any previous stream first
abortRef.current?.abort();
const controller = new AbortController();
abortRef.current = controller;

// Reset to fresh streaming state
setState({ lines: [], isStreaming: true, inputRequired: false, error: null });

const run = async () => {
try {
const fetchOptions: RequestInit = {
signal: controller.signal,
};

if (body !== undefined) {
fetchOptions.method = 'POST';
fetchOptions.headers = { 'Content-Type': 'application/json' };
fetchOptions.body = JSON.stringify(body);
} else {
fetchOptions.method = 'GET';
}

const response = await fetch(url, fetchOptions);

if (!response.ok) {
const text = await response.text().catch(() => '');
throw new Error(
`Server responded with ${response.status}: ${text || response.statusText}`
);
}

if (!response.body) {
throw new Error('Response body is null — streaming not supported');
}

const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';

while (true) {
const { done, value } = await reader.read();

if (done) break;

buffer += decoder.decode(value, { stream: true });

// SSE frames are separated by double newlines
const parts = buffer.split('\n\n');
// The last element may be a partial frame — keep it in the buffer
buffer = parts.pop() ?? '';

for (const part of parts) {
const trimmed = part.trim();
if (!trimmed) continue;

// Extract the "data: ..." line
const dataLine = trimmed
.split('\n')
.find((l) => l.startsWith('data:'));

if (!dataLine) continue;

const jsonStr = dataLine.slice('data:'.length).trim();

let frame: SseFrame;
try {
frame = JSON.parse(jsonStr) as SseFrame;
} catch {
// Malformed JSON — skip
continue;
}

if (frame.type === 'output' && frame.data !== undefined) {
setState((prev) => ({
...prev,
lines: [...prev.lines, frame.data as string],
}));
} else if (frame.type === 'input_required') {
setState((prev) => ({
...prev,
inputRequired: true,
isStreaming: false,
}));
return; // Pause; caller should handle user input
} else if (frame.type === 'error') {
throw new Error(frame.message ?? 'Unknown streaming error');
}
}
}

// Stream finished cleanly
setState((prev) => ({ ...prev, isStreaming: false }));
} catch (err) {
if ((err as { name?: string }).name === 'AbortError') {
// User intentionally aborted — not an error
setState((prev) => ({ ...prev, isStreaming: false }));
return;
}

const message =
err instanceof Error ? err.message : 'Unexpected streaming error';
setState((prev) => ({
...prev,
isStreaming: false,
error: message,
}));
}
};

run();
},
[]
);

return {
...state,
startStream,
stopStream,
reset,
};
}
Loading
Loading