diff --git a/test/req_llm/stream_response_test.exs b/test/req_llm/stream_response_test.exs index e9416582..74477949 100644 --- a/test/req_llm/stream_response_test.exs +++ b/test/req_llm/stream_response_test.exs @@ -71,6 +71,10 @@ defmodule ReqLLM.StreamResponseTest do alias ReqLLM.{Context, Response, StreamChunk, StreamResponse, ToolCall} describe "struct validation and defaults" do + test "exposes schema metadata" do + refute is_nil(StreamResponse.schema()) + end + test "creates stream response with required fields" do context = Context.new([Context.system("Test")]) model = %LLMDB.Model{provider: :test, id: "test-model"} @@ -269,13 +273,27 @@ defmodule ReqLLM.StreamResponseTest do # Table-driven tests for finish reason scenarios finish_reason_tests = [ {:stop, :stop}, + {:completed, :stop}, {:length, :length}, {:tool_use, :tool_calls}, + {:tool_calls, :tool_calls}, + {:end_turn, :stop}, + {:max_tokens, :length}, + {:max_output_tokens, :length}, + {:content_filter, :content_filter}, + {:error, :error}, {:cancelled, :cancelled}, {:incomplete, :incomplete}, {"stop", :stop}, + {"completed", :stop}, {"length", :length}, + {"tool_calls", :tool_calls}, {"tool_use", :tool_calls}, + {"max_tokens", :length}, + {"max_output_tokens", :length}, + {"content_filter", :content_filter}, + {"end_turn", :stop}, + {"error", :error}, {"cancelled", :cancelled}, {"incomplete", :incomplete}, {"not_real_reason", :unknown} @@ -313,6 +331,37 @@ defmodule ReqLLM.StreamResponseTest do end end + describe "tool call helpers" do + test "filters tool call chunks from the stream" do + chunks = [ + StreamChunk.text("hello"), + StreamChunk.tool_call("search", %{query: "weather"}) + ] + + stream_response = create_stream_response(stream: chunks) + + assert [ + %ReqLLM.StreamChunk{ + type: :tool_call, + name: "search", + arguments: %{query: "weather"} + } + ] = StreamResponse.tool_calls(stream_response) |> Enum.to_list() + end + + test "extracts summarized tool calls" do + chunks = [ + StreamChunk.tool_call("search", %{query: "weather"}, %{id: "call_123"}), + StreamChunk.meta(%{finish_reason: :tool_use}) + ] + + stream_response = create_stream_response(stream: chunks) + + assert [%{id: "call_123", name: "search", arguments: %{query: "weather"}}] = + StreamResponse.extract_tool_calls(stream_response) + end + end + describe "to_response/1 backward compatibility" do test "converts simple streaming response to legacy Response" do chunks = text_chunks(["Hello", " world!"]) diff --git a/test/req_llm/streaming/finch_client_test.exs b/test/req_llm/streaming/finch_client_test.exs index 4bc5e5c0..b661b8d0 100644 --- a/test/req_llm/streaming/finch_client_test.exs +++ b/test/req_llm/streaming/finch_client_test.exs @@ -1,10 +1,74 @@ defmodule ReqLLM.Streaming.FinchClientTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false alias ReqLLM.Context alias ReqLLM.Streaming.FinchClient alias ReqLLM.Streaming.Fixtures.HTTPContext + defmodule StreamingRouter do + use Plug.Router + + plug(:match) + plug(:dispatch) + + post "/stream" do + conn = + conn + |> Plug.Conn.put_resp_content_type("text/event-stream") + |> Plug.Conn.send_chunked(200) + + {:ok, conn} = + Plug.Conn.chunk(conn, "data: {\"choices\": [{\"delta\": {\"content\": \"hello\"}}]}\n\n") + + {:ok, conn} = Plug.Conn.chunk(conn, "data: [DONE]\n\n") + conn + end + end + + setup do + adapter_config = Application.get_env(:req_llm, :finch_request_adapter) + finch_config = Application.get_env(:req_llm, :finch) + fixtures_mode = System.get_env("REQ_LLM_FIXTURES_MODE") + openai_api_key = System.get_env("OPENAI_API_KEY") + + System.put_env("REQ_LLM_FIXTURES_MODE", "replay") + System.put_env("OPENAI_API_KEY", "test-streaming-key") + + on_exit(fn -> + restore_app_env(:finch_request_adapter, adapter_config) + restore_app_env(:finch, finch_config) + restore_system_env("REQ_LLM_FIXTURES_MODE", fixtures_mode) + restore_system_env("OPENAI_API_KEY", openai_api_key) + end) + + :ok + end + + defp restore_app_env(key, nil), do: Application.delete_env(:req_llm, key) + defp restore_app_env(key, value), do: Application.put_env(:req_llm, key, value) + defp restore_system_env(key, nil), do: System.delete_env(key) + defp restore_system_env(key, value), do: System.put_env(key, value) + + defp reserve_port do + {:ok, socket} = :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true]) + {:ok, port} = :inet.port(socket) + :ok = :gen_tcp.close(socket) + port + end + + defp wait_until(fun, attempts \\ 20) + + defp wait_until(fun, attempts) when attempts > 0 do + if fun.() do + true + else + Process.sleep(25) + wait_until(fun, attempts - 1) + end + end + + defp wait_until(_fun, 0), do: false + describe "HTTPContext" do test "creates new context with basic info" do headers = %{ @@ -129,6 +193,61 @@ defmodule ReqLLM.Streaming.FinchClientTest do end end + defmodule EventStreamServer do + use GenServer + + def start_link do + GenServer.start_link(__MODULE__, []) + end + + def events(pid) do + GenServer.call(pid, :events) + end + + def init(_), do: {:ok, []} + + def handle_call(:events, _from, state) do + {:reply, Enum.reverse(state), state} + end + + def handle_call({:http_event, event}, _from, state) do + {:reply, :ok, [event | state]} + end + end + + defmodule ErrorProvider do + def attach_stream(_model, _context, _opts, _finch_name), do: {:error, :boom} + end + + defmodule LargeBodyProvider do + def attach_stream(_model, _context, _opts, _finch_name) do + {:ok, + Finch.build( + :post, + "https://example.com/stream", + [{"content-type", "application/json"}], + String.duplicate("x", 70_000) + )} + end + end + + defmodule LiveStreamProvider do + def attach_stream(_model, _context, opts, _finch_name) do + body = + opts + |> Keyword.get(:request_body, %{"thinking" => %{"type" => "enabled"}}) + |> Jason.encode!() + + {:ok, + Finch.build( + :post, + Keyword.fetch!(opts, :stream_url), + [{"content-type", "application/json"}], + body + )} + end + end + test "returns error when provider module doesn't exist" do {:ok, stream_server} = MockStreamServer.start_link() {:ok, context} = Context.normalize("Test") @@ -164,10 +283,173 @@ defmodule ReqLLM.Streaming.FinchClientTest do assert is_pid(task_pid) assert %HTTPContext{} = http_context assert is_map(canonical_json) - assert http_context.url == "https://api.openai.com/v1/chat/completions" + assert String.starts_with?(http_context.url, "https://") + assert String.ends_with?(http_context.url, "/chat/completions") assert http_context.method == :post assert is_map(http_context.req_headers) end + + test "returns provider_build_failed when provider attach_stream returns an error" do + {:ok, stream_server} = MockStreamServer.start_link() + {:ok, context} = Context.normalize("Test") + + assert {:error, {:provider_build_failed, :boom}} = + FinchClient.start_stream( + ErrorProvider, + %LLMDB.Model{provider: :test, id: "test"}, + context, + [], + stream_server + ) + end + + test "replays streaming fixtures without building a live request" do + {:ok, stream_server} = EventStreamServer.start_link() + {:ok, model} = ReqLLM.model("openrouter:google/gemini-3-flash-preview") + {:ok, context} = Context.normalize("Hello") + + assert {:ok, task_pid, http_context, canonical_json} = + FinchClient.start_stream( + ReqLLM.Providers.OpenRouter, + model, + context, + [fixture: "streaming"], + stream_server + ) + + assert is_pid(task_pid) + assert http_context.url =~ "fixture://" + assert http_context.status == 200 + assert canonical_json["model"] == "google/gemini-3-flash-preview" + + Process.sleep(50) + + assert Enum.any?(EventStreamServer.events(stream_server), &match?({:status, 200}, &1)) + end + + test "allows large request bodies when finch pool config is missing" do + Application.put_env(:req_llm, :finch, []) + + {:ok, stream_server} = MockStreamServer.start_link() + {:ok, context} = Context.normalize("Test") + + assert {:ok, task_pid, http_context, canonical_json} = + FinchClient.start_stream( + LargeBodyProvider, + %LLMDB.Model{provider: :test, id: "test"}, + context, + [receive_timeout: 10, max_retries: 0], + stream_server + ) + + assert is_pid(task_pid) + assert %HTTPContext{} = http_context + assert canonical_json[:raw_body] != nil + end + + test "allows large request bodies when finch config cannot be parsed" do + Application.put_env(:req_llm, :finch, :invalid_config) + + {:ok, stream_server} = MockStreamServer.start_link() + {:ok, context} = Context.normalize("Test") + + assert {:ok, task_pid, _http_context, _canonical_json} = + FinchClient.start_stream( + LargeBodyProvider, + %LLMDB.Model{provider: :test, id: "test"}, + context, + [receive_timeout: 10, max_retries: 0], + stream_server + ) + + assert is_pid(task_pid) + end + + test "forwards successful HTTP streaming events through the stream server" do + port = reserve_port() + start_supervised!({Bandit, plug: StreamingRouter, port: port}) + + {:ok, stream_server} = EventStreamServer.start_link() + {:ok, context} = Context.normalize("Test") + model = %LLMDB.Model{provider: :test, id: "test"} + stream_url = "http://127.0.0.1:#{port}/stream" + + assert {:ok, task_pid, http_context, canonical_json} = + FinchClient.start_stream( + LiveStreamProvider, + model, + context, + [stream_url: stream_url], + stream_server + ) + + assert is_pid(task_pid) + assert %HTTPContext{} = http_context + assert canonical_json["thinking"]["type"] == "enabled" + + assert wait_until(fn -> + events = EventStreamServer.events(stream_server) + + Enum.any?(events, &match?({:status, 200}, &1)) and + Enum.any?(events, &match?({:data, _}, &1)) + end) + + monitor_ref = Process.monitor(task_pid) + assert_receive {:DOWN, ^monitor_ref, :process, ^task_pid, _}, 1_000 + + events = EventStreamServer.events(stream_server) + + assert Enum.any?(events, &match?({:status, 200}, &1)) + assert Enum.any?(events, &match?({:headers, _}, &1)) + assert Enum.any?(events, &match?({:data, _}, &1)) + end + + test "forwards retry stream errors through the stream server" do + {:ok, stream_server} = EventStreamServer.start_link() + {:ok, context} = Context.normalize("Test") + model = %LLMDB.Model{provider: :test, id: "test"} + + assert {:ok, task_pid, _http_context, _canonical_json} = + FinchClient.start_stream( + LiveStreamProvider, + model, + context, + [stream_url: "http://127.0.0.1:1/stream", max_retries: 0, receive_timeout: 10], + stream_server + ) + + assert is_pid(task_pid) + + assert wait_until(fn -> + Enum.any?(EventStreamServer.events(stream_server), &match?({:error, _}, &1)) + end) + end + + test "captures finch process exits as stream server errors" do + port = reserve_port() + start_supervised!({Bandit, plug: StreamingRouter, port: port}) + + {:ok, stream_server} = EventStreamServer.start_link() + {:ok, context} = Context.normalize("Test") + model = %LLMDB.Model{provider: :test, id: "test"} + stream_url = "http://127.0.0.1:#{port}/stream" + + assert {:ok, task_pid, _http_context, _canonical_json} = + FinchClient.start_stream( + LiveStreamProvider, + model, + context, + [stream_url: stream_url, receive_timeout: 10], + stream_server, + ReqLLM.MissingFinch + ) + + assert is_pid(task_pid) + + assert wait_until(fn -> + Enum.any?(EventStreamServer.events(stream_server), &match?({:error, _}, &1)) + end) + end end describe "provider URL and endpoint mapping" do @@ -328,7 +610,7 @@ defmodule ReqLLM.Streaming.FinchClientTest do receive do {:DOWN, ^ref, :process, ^task_pid, reason} -> - assert reason == :killed + assert reason in [:killed, :noproc] after 1000 -> flunk("Task did not terminate") end diff --git a/test/req_llm/streaming/fixtures_test.exs b/test/req_llm/streaming/fixtures_test.exs new file mode 100644 index 00000000..0dc76632 --- /dev/null +++ b/test/req_llm/streaming/fixtures_test.exs @@ -0,0 +1,103 @@ +defmodule ReqLLM.Streaming.FixturesTest do + use ExUnit.Case, async: true + + alias ReqLLM.Streaming.Fixtures + alias ReqLLM.Streaming.Fixtures.HTTPContext + + describe "HTTPContext.from_finch_request/1" do + test "omits default ports and preserves non-default ports" do + default_https = Finch.build(:get, "https://api.example.com/v1/chat") + custom_https = Finch.build(:get, "https://api.example.com:8443/v1/chat") + default_http = Finch.build(:get, "http://api.example.com/v1/chat") + custom_http = Finch.build(:get, "http://api.example.com:8080/v1/chat") + + assert HTTPContext.from_finch_request(default_https).url == + "https://api.example.com/v1/chat" + + assert HTTPContext.from_finch_request(custom_https).url == + "https://api.example.com:8443/v1/chat" + + assert HTTPContext.from_finch_request(default_http).url == "http://api.example.com/v1/chat" + + assert HTTPContext.from_finch_request(custom_http).url == + "http://api.example.com:8080/v1/chat" + end + + test "normalizes binary, atom, and unknown methods" do + assert HTTPContext.from_finch_request( + Finch.build(:get, "https://api.example.com") + |> Map.put(:method, "PUT") + ).method == + :put + + assert HTTPContext.from_finch_request( + Finch.build(:get, "https://api.example.com") + |> Map.put(:method, "PATCH") + ).method == + :patch + + assert HTTPContext.from_finch_request( + Finch.build(:get, "https://api.example.com") + |> Map.put(:method, "DELETE") + ).method == + :delete + + assert HTTPContext.from_finch_request( + Finch.build(:get, "https://api.example.com") + |> Map.put(:method, "HEAD") + ).method == + :head + + assert HTTPContext.from_finch_request( + Finch.build(:get, "https://api.example.com") + |> Map.put(:method, "OPTIONS") + ).method == + :options + + assert HTTPContext.from_finch_request( + Finch.build(:get, "https://api.example.com") + |> Map.put(:method, :post) + ).method == + :post + + assert HTTPContext.from_finch_request( + Finch.build(:get, "https://api.example.com") + |> Map.put(:method, 123) + ).method == + :unknown + end + + test "passes through unsupported header shapes unchanged" do + context = HTTPContext.new("https://api.example.com", :post, :invalid_headers) + assert context.req_headers == :invalid_headers + end + end + + describe "canonical_json_from_finch_request/1" do + test "handles nil, invalid JSON, streaming, and unknown bodies" do + assert Fixtures.canonical_json_from_finch_request( + Finch.build(:post, "https://api.example.com") + |> Map.put(:body, nil) + ) == + %{} + + assert Fixtures.canonical_json_from_finch_request( + Finch.build(:post, "https://api.example.com") + |> Map.put(:body, "{\"oops\"") + ) == %{ + raw_body: "{\"oops\"" + } + + assert Fixtures.canonical_json_from_finch_request( + Finch.build(:post, "https://api.example.com") + |> Map.put(:body, {:stream, Stream.iterate(0, &(&1 + 1))}) + ) == %{streaming_body: true} + + assert Fixtures.canonical_json_from_finch_request( + Finch.build(:post, "https://api.example.com") + |> Map.put(:body, %{unexpected: true}) + ) == + %{unknown_body: "%{unexpected: true}"} + end + end +end diff --git a/test/req_llm/streaming/sse_test.exs b/test/req_llm/streaming/sse_test.exs index 9680277b..0921eb78 100644 --- a/test/req_llm/streaming/sse_test.exs +++ b/test/req_llm/streaming/sse_test.exs @@ -76,6 +76,22 @@ defmodule ReqLLM.Streaming.SSETest do assert [%{id: "123", event: "delta", data: ~s({"text": "hi"})}] = events assert remaining == "" end + + test "parses complete JSON array chunks as SSE-style events" do + chunk = ~s([{"text":"hello"},{"text":"world"}]) + {events, remaining} = SSE.accumulate_and_parse(chunk, "") + + assert events == [%{data: %{"text" => "hello"}}, %{data: %{"text" => "world"}}] + assert remaining == "" + end + + test "extracts complete objects from partially invalid JSON arrays" do + chunk = ~s([{"text":"hello"},{"text":}]) + {events, remaining} = SSE.accumulate_and_parse(chunk, "") + + assert events == [%{data: %{"text" => "hello"}}] + assert remaining != "" + end end describe "process_sse_event/1" do diff --git a/test/req_llm/streaming/web_socket_client_test.exs b/test/req_llm/streaming/web_socket_client_test.exs new file mode 100644 index 00000000..b805e47c --- /dev/null +++ b/test/req_llm/streaming/web_socket_client_test.exs @@ -0,0 +1,152 @@ +defmodule ReqLLM.Streaming.WebSocketClientTest do + use ExUnit.Case, async: false + + alias ReqLLM.Context + alias ReqLLM.Streaming.Fixtures.HTTPContext + alias ReqLLM.Streaming.WebSocketClient + + setup do + fixtures_mode = System.get_env("REQ_LLM_FIXTURES_MODE") + System.put_env("REQ_LLM_FIXTURES_MODE", "replay") + + on_exit(fn -> + restore_system_env("REQ_LLM_FIXTURES_MODE", fixtures_mode) + end) + + :ok + end + + defp restore_system_env(key, nil), do: System.delete_env(key) + defp restore_system_env(key, value), do: System.put_env(key, value) + + defmodule EventStreamServer do + use GenServer + + def start_link do + GenServer.start_link(__MODULE__, []) + end + + def events(pid) do + GenServer.call(pid, :events) + end + + def init(_), do: {:ok, []} + + def handle_call(:events, _from, state) do + {:reply, Enum.reverse(state), state} + end + + def handle_call({:http_event, event}, _from, state) do + {:reply, :ok, [event | state]} + end + end + + defmodule ErrorProvider do + def attach_websocket_stream(_model, _context, _opts), do: {:error, :boom} + end + + defmodule MissingWebSocketProvider do + end + + defmodule RaisingProvider do + def attach_websocket_stream(_model, _context, _opts), do: raise("boom") + end + + defmodule SuccessfulProvider do + def attach_websocket_stream(_model, _context, _opts) do + {:ok, + %{ + url: "ws://127.0.0.1:1/socket", + headers: [{"authorization", "Bearer secret"}], + initial_messages: [Jason.encode!(%{"type" => "response.create"})], + canonical_json: %{"type" => "response.create"} + }} + end + end + + test "returns a request error when provider does not implement websocket streaming" do + {:ok, stream_server} = EventStreamServer.start_link() + {:ok, context} = Context.normalize("Hello") + + assert {:error, %ReqLLM.Error.API.Request{}} = + WebSocketClient.start_stream( + MissingWebSocketProvider, + %LLMDB.Model{provider: :test, id: "test"}, + context, + [], + stream_server + ) + end + + test "wraps provider websocket build errors" do + {:ok, stream_server} = EventStreamServer.start_link() + {:ok, context} = Context.normalize("Hello") + + assert {:error, {:provider_build_failed, :boom}} = + WebSocketClient.start_stream( + ErrorProvider, + %LLMDB.Model{provider: :test, id: "test"}, + context, + [], + stream_server + ) + end + + test "wraps provider websocket exceptions" do + {:ok, stream_server} = EventStreamServer.start_link() + {:ok, context} = Context.normalize("Hello") + + assert {:error, {:build_request_failed, %RuntimeError{message: "boom"}}} = + WebSocketClient.start_stream( + RaisingProvider, + %LLMDB.Model{provider: :test, id: "test"}, + context, + [], + stream_server + ) + end + + test "replays websocket fixtures through the stream server" do + {:ok, stream_server} = EventStreamServer.start_link() + {:ok, model} = ReqLLM.model("openrouter:google/gemini-3-flash-preview") + {:ok, context} = Context.normalize("Hello") + + assert {:ok, task_pid, http_context, canonical_json} = + WebSocketClient.start_stream( + ReqLLM.Providers.OpenRouter, + model, + context, + [fixture: "streaming"], + stream_server + ) + + assert is_pid(task_pid) + assert %HTTPContext{} = http_context + assert http_context.url =~ "openrouter.ai" + assert http_context.status == 200 + assert canonical_json["model"] == "google/gemini-3-flash-preview" + + Process.sleep(50) + + assert Enum.any?(EventStreamServer.events(stream_server), &match?({:status, 200}, &1)) + end + + test "starts websocket streaming tasks with provider-built config" do + {:ok, stream_server} = EventStreamServer.start_link() + {:ok, context} = Context.normalize("Hello") + + assert {:ok, task_pid, http_context, canonical_json} = + WebSocketClient.start_stream( + SuccessfulProvider, + %LLMDB.Model{provider: :test, id: "test"}, + context, + [connect_timeout: 10, receive_timeout: 10], + stream_server + ) + + assert is_pid(task_pid) + assert %HTTPContext{} = http_context + assert http_context.url == "ws://127.0.0.1:1/socket" + assert canonical_json == %{"type" => "response.create"} + end +end diff --git a/test/req_llm/streaming_test.exs b/test/req_llm/streaming_test.exs new file mode 100644 index 00000000..6966d54a --- /dev/null +++ b/test/req_llm/streaming_test.exs @@ -0,0 +1,30 @@ +defmodule ReqLLM.StreamingTest do + use ExUnit.Case, async: true + + alias ReqLLM.{Context, Streaming} + + defmodule FailingHttpProvider do + def attach_stream(_model, _context, _opts, _finch_name), do: {:error, :boom} + end + + defmodule FailingWebSocketProvider do + def stream_transport(_model, _opts), do: :websocket + def attach_websocket_stream(_model, _context, _opts), do: {:error, :boom} + end + + test "wraps HTTP transport startup failures" do + {:ok, context} = Context.normalize("Hello") + model = %LLMDB.Model{provider: :test, id: "test"} + + assert {:error, {:http_streaming_failed, {:provider_build_failed, :boom}}} = + Streaming.start_stream(FailingHttpProvider, model, context, []) + end + + test "wraps websocket transport startup failures" do + {:ok, context} = Context.normalize("Hello") + model = %LLMDB.Model{provider: :test, id: "test"} + + assert {:error, {:websocket_streaming_failed, {:provider_build_failed, :boom}}} = + Streaming.start_stream(FailingWebSocketProvider, model, context, []) + end +end