Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions test/req_llm/stream_response_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ defmodule ReqLLM.StreamResponseTest do
alias ReqLLM.{Context, Response, StreamChunk, StreamResponse, ToolCall}

describe "struct validation and defaults" do
# Sanity check that the struct publishes its NimbleOptions-style schema.
test "exposes schema metadata" do
  schema = StreamResponse.schema()
  assert not is_nil(schema)
end

test "creates stream response with required fields" do
context = Context.new([Context.system("Test")])
model = %LLMDB.Model{provider: :test, id: "test-model"}
Expand Down Expand Up @@ -269,13 +273,27 @@ defmodule ReqLLM.StreamResponseTest do
# Table-driven tests for finish reason scenarios
finish_reason_tests = [
{:stop, :stop},
{:completed, :stop},
{:length, :length},
{:tool_use, :tool_calls},
{:tool_calls, :tool_calls},
{:end_turn, :stop},
{:max_tokens, :length},
{:max_output_tokens, :length},
{:content_filter, :content_filter},
{:error, :error},
{:cancelled, :cancelled},
{:incomplete, :incomplete},
{"stop", :stop},
{"completed", :stop},
{"length", :length},
{"tool_calls", :tool_calls},
{"tool_use", :tool_calls},
{"max_tokens", :length},
{"max_output_tokens", :length},
{"content_filter", :content_filter},
{"end_turn", :stop},
{"error", :error},
{"cancelled", :cancelled},
{"incomplete", :incomplete},
{"not_real_reason", :unknown}
Expand Down Expand Up @@ -313,6 +331,37 @@ defmodule ReqLLM.StreamResponseTest do
end
end

describe "tool call helpers" do
  test "filters tool call chunks from the stream" do
    # Mix a text chunk with a tool-call chunk; only the latter should survive.
    stream_response =
      create_stream_response(
        stream: [
          StreamChunk.text("hello"),
          StreamChunk.tool_call("search", %{query: "weather"})
        ]
      )

    filtered = stream_response |> StreamResponse.tool_calls() |> Enum.to_list()

    assert [
             %ReqLLM.StreamChunk{
               type: :tool_call,
               name: "search",
               arguments: %{query: "weather"}
             }
           ] = filtered
  end

  test "extracts summarized tool calls" do
    # A tool-call chunk plus terminating meta chunk should collapse into one
    # summary map keyed by id/name/arguments.
    stream_response =
      create_stream_response(
        stream: [
          StreamChunk.tool_call("search", %{query: "weather"}, %{id: "call_123"}),
          StreamChunk.meta(%{finish_reason: :tool_use})
        ]
      )

    assert [%{id: "call_123", name: "search", arguments: %{query: "weather"}}] =
             StreamResponse.extract_tool_calls(stream_response)
  end
end

describe "to_response/1 backward compatibility" do
test "converts simple streaming response to legacy Response" do
chunks = text_chunks(["Hello", " world!"])
Expand Down
288 changes: 285 additions & 3 deletions test/req_llm/streaming/finch_client_test.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,74 @@
defmodule ReqLLM.Streaming.FinchClientTest do
use ExUnit.Case, async: true
use ExUnit.Case, async: false

alias ReqLLM.Context
alias ReqLLM.Streaming.FinchClient
alias ReqLLM.Streaming.Fixtures.HTTPContext

# Minimal SSE endpoint used by the live-streaming tests: emits one content
# delta followed by the OpenAI-style [DONE] terminator.
defmodule StreamingRouter do
  use Plug.Router

  plug(:match)
  plug(:dispatch)

  post "/stream" do
    sse_frames = [
      "data: {\"choices\": [{\"delta\": {\"content\": \"hello\"}}]}\n\n",
      "data: [DONE]\n\n"
    ]

    conn =
      conn
      |> Plug.Conn.put_resp_content_type("text/event-stream")
      |> Plug.Conn.send_chunked(200)

    # Each chunk must succeed; the match on {:ok, _} asserts that.
    Enum.reduce(sse_frames, conn, fn frame, acc ->
      {:ok, acc} = Plug.Conn.chunk(acc, frame)
      acc
    end)
  end
end

# Snapshot every piece of global state this suite mutates (app env and OS
# env), force deterministic fixture-replay mode with a dummy API key, and
# guarantee restoration after each test. This global mutation is why the
# module runs with async: false.
setup do
  adapter_config = Application.get_env(:req_llm, :finch_request_adapter)
  finch_config = Application.get_env(:req_llm, :finch)
  fixtures_mode = System.get_env("REQ_LLM_FIXTURES_MODE")
  openai_api_key = System.get_env("OPENAI_API_KEY")

  System.put_env("REQ_LLM_FIXTURES_MODE", "replay")
  System.put_env("OPENAI_API_KEY", "test-streaming-key")

  on_exit(fn ->
    # Helpers treat nil as "key was unset" and delete rather than write nil.
    restore_app_env(:finch_request_adapter, adapter_config)
    restore_app_env(:finch, finch_config)
    restore_system_env("REQ_LLM_FIXTURES_MODE", fixtures_mode)
    restore_system_env("OPENAI_API_KEY", openai_api_key)
  end)

  :ok
end

# Restore a previously captured :req_llm app-env entry; a nil snapshot means
# the key was originally absent, so it is deleted rather than set to nil.
defp restore_app_env(key, value) do
  if is_nil(value) do
    Application.delete_env(:req_llm, key)
  else
    Application.put_env(:req_llm, key, value)
  end
end

# Same contract as restore_app_env/2, but for OS environment variables.
defp restore_system_env(key, value) do
  if is_nil(value) do
    System.delete_env(key)
  else
    System.put_env(key, value)
  end
end

# Ask the OS for an ephemeral port, then release it so Bandit can bind it.
# NOTE(review): there is a small race window between close and rebind;
# acceptable for tests, as the port is unlikely to be reused that fast.
defp reserve_port do
  listen_opts = [:binary, active: false, reuseaddr: true]
  {:ok, listener} = :gen_tcp.listen(0, listen_opts)
  {:ok, free_port} = :inet.port(listener)
  :ok = :gen_tcp.close(listener)
  free_port
end

# Polls `fun` until it returns a truthy value or `attempts` runs out.
# Returns true on success, false on timeout. The poll interval is now a
# defaulted parameter instead of a hard-coded 25 ms, so slow-CI tests can
# back off further without touching this helper; existing 1- and 2-arity
# callers are unaffected.
defp wait_until(fun, attempts \\ 20, interval_ms \\ 25)

defp wait_until(fun, attempts, interval_ms) when attempts > 0 do
  if fun.() do
    true
  else
    Process.sleep(interval_ms)
    wait_until(fun, attempts - 1, interval_ms)
  end
end

defp wait_until(_fun, 0, _interval_ms), do: false

describe "HTTPContext" do
test "creates new context with basic info" do
headers = %{
Expand Down Expand Up @@ -129,6 +193,61 @@ defmodule ReqLLM.Streaming.FinchClientTest do
end
end

# Test double for the stream server: records every {:http_event, _} call it
# receives so tests can later assert on the ordered event list.
defmodule EventStreamServer do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, [])

  # Returns all recorded events in arrival order.
  def events(server), do: GenServer.call(server, :events)

  @impl true
  def init(_init_arg), do: {:ok, []}

  @impl true
  def handle_call(:events, _from, recorded) do
    # Events are stored newest-first; reverse to present arrival order.
    {:reply, Enum.reverse(recorded), recorded}
  end

  def handle_call({:http_event, event}, _from, recorded) do
    {:reply, :ok, [event | recorded]}
  end
end

# Provider stub whose attach_stream/4 always fails, exercising the
# provider_build_failed error path in FinchClient.start_stream/5.
defmodule ErrorProvider do
  def attach_stream(_model, _context, _opts, _finch_name) do
    {:error, :boom}
  end
end

# Provider stub that builds a request whose 70_000-byte body is larger than a
# typical 64 KiB window, used to probe the finch pool-config size checks.
defmodule LargeBodyProvider do
  @body_bytes 70_000

  def attach_stream(_model, _context, _opts, _finch_name) do
    request =
      Finch.build(
        :post,
        "https://example.com/stream",
        [{"content-type", "application/json"}],
        String.duplicate("x", @body_bytes)
      )

    {:ok, request}
  end
end

# Provider stub that targets whatever :stream_url the test supplies, sending
# either the test-provided :request_body or a default "thinking" payload.
defmodule LiveStreamProvider do
  def attach_stream(_model, _context, opts, _finch_name) do
    default_body = %{"thinking" => %{"type" => "enabled"}}
    target_url = Keyword.fetch!(opts, :stream_url)

    encoded_body =
      opts
      |> Keyword.get(:request_body, default_body)
      |> Jason.encode!()

    request =
      Finch.build(
        :post,
        target_url,
        [{"content-type", "application/json"}],
        encoded_body
      )

    {:ok, request}
  end
end

test "returns error when provider module doesn't exist" do
{:ok, stream_server} = MockStreamServer.start_link()
{:ok, context} = Context.normalize("Test")
Expand Down Expand Up @@ -164,10 +283,173 @@ defmodule ReqLLM.Streaming.FinchClientTest do
assert is_pid(task_pid)
assert %HTTPContext{} = http_context
assert is_map(canonical_json)
assert http_context.url == "https://api.openai.com/v1/chat/completions"
assert String.starts_with?(http_context.url, "https://")
assert String.ends_with?(http_context.url, "/chat/completions")
assert http_context.method == :post
assert is_map(http_context.req_headers)
end

test "returns provider_build_failed when provider attach_stream returns an error" do
  {:ok, stream_server} = MockStreamServer.start_link()
  {:ok, context} = Context.normalize("Test")
  model = %LLMDB.Model{provider: :test, id: "test"}

  # ErrorProvider always returns {:error, :boom}; FinchClient must wrap it.
  result = FinchClient.start_stream(ErrorProvider, model, context, [], stream_server)

  assert {:error, {:provider_build_failed, :boom}} = result
end

test "replays streaming fixtures without building a live request" do
  {:ok, stream_server} = EventStreamServer.start_link()
  {:ok, model} = ReqLLM.model("openrouter:google/gemini-3-flash-preview")
  {:ok, context} = Context.normalize("Hello")

  assert {:ok, task_pid, http_context, canonical_json} =
           FinchClient.start_stream(
             ReqLLM.Providers.OpenRouter,
             model,
             context,
             [fixture: "streaming"],
             stream_server
           )

  assert is_pid(task_pid)
  # Fixture replay short-circuits the HTTP layer, so the URL is synthetic.
  assert http_context.url =~ "fixture://"
  assert http_context.status == 200
  assert canonical_json["model"] == "google/gemini-3-flash-preview"

  # The replay task emits events asynchronously. Poll with wait_until/1
  # (as the other streaming tests in this module do) instead of a fixed
  # Process.sleep(50), which is flaky on slow CI machines.
  assert wait_until(fn ->
           Enum.any?(EventStreamServer.events(stream_server), &match?({:status, 200}, &1))
         end)
end

test "allows large request bodies when finch pool config is missing" do
  # Empty pool config: the size check has nothing to enforce, so the
  # oversized body from LargeBodyProvider must be accepted.
  Application.put_env(:req_llm, :finch, [])

  {:ok, stream_server} = MockStreamServer.start_link()
  {:ok, context} = Context.normalize("Test")
  model = %LLMDB.Model{provider: :test, id: "test"}
  opts = [receive_timeout: 10, max_retries: 0]

  assert {:ok, task_pid, http_context, canonical_json} =
           FinchClient.start_stream(LargeBodyProvider, model, context, opts, stream_server)

  assert is_pid(task_pid)
  assert %HTTPContext{} = http_context
  refute is_nil(canonical_json[:raw_body])
end

test "allows large request bodies when finch config cannot be parsed" do
  # Garbage config must degrade gracefully (skip the size check), not crash.
  Application.put_env(:req_llm, :finch, :invalid_config)

  {:ok, stream_server} = MockStreamServer.start_link()
  {:ok, context} = Context.normalize("Test")
  model = %LLMDB.Model{provider: :test, id: "test"}
  opts = [receive_timeout: 10, max_retries: 0]

  assert {:ok, task_pid, _http_context, _canonical_json} =
           FinchClient.start_stream(LargeBodyProvider, model, context, opts, stream_server)

  assert is_pid(task_pid)
end

# End-to-end happy path: a real Bandit server serves StreamingRouter's SSE
# response, and every HTTP event (:status, :headers, :data) must be relayed
# to the stream server.
test "forwards successful HTTP streaming events through the stream server" do
  port = reserve_port()
  start_supervised!({Bandit, plug: StreamingRouter, port: port})

  {:ok, stream_server} = EventStreamServer.start_link()
  {:ok, context} = Context.normalize("Test")
  model = %LLMDB.Model{provider: :test, id: "test"}
  stream_url = "http://127.0.0.1:#{port}/stream"

  assert {:ok, task_pid, http_context, canonical_json} =
           FinchClient.start_stream(
             LiveStreamProvider,
             model,
             context,
             [stream_url: stream_url],
             stream_server
           )

  assert is_pid(task_pid)
  assert %HTTPContext{} = http_context
  # LiveStreamProvider encodes this default body when :request_body is absent.
  assert canonical_json["thinking"]["type"] == "enabled"

  # Poll until both a 200 status and at least one data chunk have arrived;
  # the streaming task delivers events asynchronously.
  assert wait_until(fn ->
           events = EventStreamServer.events(stream_server)

           Enum.any?(events, &match?({:status, 200}, &1)) and
             Enum.any?(events, &match?({:data, _}, &1))
         end)

  # The task should terminate on its own once the server finishes the stream.
  monitor_ref = Process.monitor(task_pid)
  assert_receive {:DOWN, ^monitor_ref, :process, ^task_pid, _}, 1_000

  events = EventStreamServer.events(stream_server)

  assert Enum.any?(events, &match?({:status, 200}, &1))
  assert Enum.any?(events, &match?({:headers, _}, &1))
  assert Enum.any?(events, &match?({:data, _}, &1))
end

test "forwards retry stream errors through the stream server" do
  {:ok, stream_server} = EventStreamServer.start_link()
  {:ok, context} = Context.normalize("Test")
  model = %LLMDB.Model{provider: :test, id: "test"}
  # Port 1 refuses connections, so the request fails fast with no retries.
  opts = [stream_url: "http://127.0.0.1:1/stream", max_retries: 0, receive_timeout: 10]

  assert {:ok, task_pid, _http_context, _canonical_json} =
           FinchClient.start_stream(LiveStreamProvider, model, context, opts, stream_server)

  assert is_pid(task_pid)

  error_recorded? = fn ->
    stream_server
    |> EventStreamServer.events()
    |> Enum.any?(&match?({:error, _}, &1))
  end

  assert wait_until(error_recorded?)
end

# Passing ReqLLM.MissingFinch — a Finch pool name that was never started —
# makes the request process exit. That exit must surface as an {:error, _}
# event on the stream server rather than crashing the caller.
test "captures finch process exits as stream server errors" do
  port = reserve_port()
  start_supervised!({Bandit, plug: StreamingRouter, port: port})

  {:ok, stream_server} = EventStreamServer.start_link()
  {:ok, context} = Context.normalize("Test")
  model = %LLMDB.Model{provider: :test, id: "test"}
  stream_url = "http://127.0.0.1:#{port}/stream"

  assert {:ok, task_pid, _http_context, _canonical_json} =
           FinchClient.start_stream(
             LiveStreamProvider,
             model,
             context,
             [stream_url: stream_url, receive_timeout: 10],
             stream_server,
             ReqLLM.MissingFinch
           )

  assert is_pid(task_pid)

  # Poll until the exit has been translated into an {:error, _} event.
  assert wait_until(fn ->
    Enum.any?(EventStreamServer.events(stream_server), &match?({:error, _}, &1))
  end)
end
end

describe "provider URL and endpoint mapping" do
Expand Down Expand Up @@ -328,7 +610,7 @@ defmodule ReqLLM.Streaming.FinchClientTest do

receive do
{:DOWN, ^ref, :process, ^task_pid, reason} ->
assert reason == :killed
assert reason in [:killed, :noproc]
after
1000 -> flunk("Task did not terminate")
end
Expand Down
Loading
Loading