From 403a7bf929b7928a6918a83f85a47b6325bd5090 Mon Sep 17 00:00:00 2001 From: Valentin Stoykov Date: Wed, 17 Jun 2026 11:44:00 +0100 Subject: [PATCH 1/2] fix(openai): capture trailing streamed usage chunk after finish_reason MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Some OpenAI-compatible providers — notably Azure OpenAI and gateways like LiteLLM — stream token usage in a SEPARATE chunk that arrives AFTER the finish_reason chunk and just before [DONE], when stream_options.include_usage is set: data: {"choices":[{"finish_reason":"stop","index":0,"delta":{}}]} data: {"choices":[{"index":0,"delta":{}}],"usage":{...}} data: [DONE] default_decode_stream_event flags the finish_reason chunk terminal?: true, which finalizes the stream there. A consumer that reads Response.usage right after the stream "completes" then races the still-in-flight usage chunk, so input/output tokens (and any cost derived from them) come back as zero. Non-streaming responses carry usage in the single body and are unaffected. ChatAPI.decode_stream_event/2 strips the terminal flag off normal-completion finish_reason chunks so the stream finalizes on [DONE] (or connection close) instead — by which point the usage chunk has been accumulated. Inline error chunks (finish_reason: :error, or any chunk carrying an :error key) keep their terminal flag so failures still surface immediately. [DONE] and empty-choices usage chunks have no :finish_reason key and are untouched. Adds regression tests for the chunk ordering, the combined finish_reason+usage chunk, empty-choices usage, and error-chunk termination. --- lib/req_llm/providers/openai/chat_api.ex | 28 ++++- .../openai_chat_streaming_usage_test.exs | 107 ++++++++++++++++++ 2 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 test/providers/openai_chat_streaming_usage_test.exs diff --git a/lib/req_llm/providers/openai/chat_api.ex b/lib/req_llm/providers/openai/chat_api.ex index 61b01e93..5d0b9aa2 100644 --- a/lib/req_llm/providers/openai/chat_api.ex +++ b/lib/req_llm/providers/openai/chat_api.ex @@ -70,7 +70,33 @@ defmodule ReqLLM.Providers.OpenAI.ChatAPI do @impl true def decode_stream_event(event, model) do - ReqLLM.Provider.Defaults.default_decode_stream_event(event, model) + event + |> ReqLLM.Provider.Defaults.default_decode_stream_event(model) + |> drop_finish_reason_terminal() + end + + # Azure and OpenAI-compatible gateways (e.g. LiteLLM) send the usage chunk + # after the finish_reason chunk, just before [DONE]. The default decoder marks + # finish_reason terminal, so the stream ends before that usage chunk lands and + # Response.usage comes back zero. Drop terminal? on normal finish_reason chunks + # so the stream finalizes on [DONE] (or connection close) once usage is in. + # Error chunks (finish_reason: :error, or an :error key) stay terminal so + # failures still fail fast; [DONE] and empty-choices usage chunks have no + # finish_reason and are left untouched. + defp drop_finish_reason_terminal(chunks) do + Enum.map(chunks, fn + %ReqLLM.StreamChunk{ + type: :meta, + metadata: %{terminal?: true, finish_reason: reason} = meta + } = chunk + when reason != :error -> + if Map.has_key?(meta, :error), + do: chunk, + else: %{chunk | metadata: Map.delete(meta, :terminal?)} + + chunk -> + chunk + end) end # ======================================================================== diff --git a/test/providers/openai_chat_streaming_usage_test.exs b/test/providers/openai_chat_streaming_usage_test.exs new file mode 100644 index 00000000..93b754c9 --- /dev/null +++ b/test/providers/openai_chat_streaming_usage_test.exs @@ -0,0 +1,107 @@ +defmodule ReqLLM.Providers.OpenAI.ChatStreamingUsageTest do + @moduledoc """ + Regression test for the Azure OpenAI / LiteLLM trailing-usage ordering. + + Azure (and OpenAI-compatible gateways like LiteLLM) stream the token `usage` + in a SEPARATE chunk that arrives AFTER the `finish_reason` chunk and just + before `[DONE]`: + + data: {"choices":[{"finish_reason":"stop","index":0,"delta":{}}]} + data: {"choices":[{"index":0,"delta":{}}],"usage":{...}} + data: [DONE] + + If the finish_reason chunk is flagged `terminal?`, the stream halts there and + the consumer reads `Response.usage` before the trailing usage chunk has been + merged — so token counts (and the cost derived from them) come back as zero. + `ChatAPI.decode_stream_event/2` strips the terminal flag off finish_reason + chunks so the stream finalizes on `[DONE]` instead. + """ + use ExUnit.Case, async: true + + alias ReqLLM.Providers.OpenAI.ChatAPI + alias ReqLLM.StreamChunk + + @model %LLMDB.Model{provider: :openai, id: "gpt-4o"} + + test "finish_reason chunk is not terminal (so the trailing usage chunk is not raced)" do + finish_event = %{ + data: %{ + "choices" => [%{"finish_reason" => "stop", "index" => 0, "delta" => %{}}] + } + } + + chunks = ChatAPI.decode_stream_event(finish_event, @model) + meta = Enum.find(chunks, &match?(%StreamChunk{type: :meta}, &1)) + + assert meta, "expected a meta chunk carrying the finish_reason" + assert meta.metadata[:finish_reason] == :stop + refute Map.get(meta.metadata, :terminal?), "finish_reason must not terminate the stream" + end + + test "the trailing usage chunk (non-empty choices, no finish_reason) yields usage" do + usage_event = %{ + data: %{ + "choices" => [%{"index" => 0, "delta" => %{}}], + "usage" => %{ + "prompt_tokens" => 12, + "completion_tokens" => 8, + "total_tokens" => 20 + } + } + } + + [meta] = ChatAPI.decode_stream_event(usage_event, @model) + assert meta.type == :meta + assert meta.metadata[:usage][:input_tokens] == 12 + assert meta.metadata[:usage][:output_tokens] == 8 + end + + test "[DONE] remains terminal" do + [meta] = ChatAPI.decode_stream_event(%{data: "[DONE]"}, @model) + assert meta.metadata[:terminal?] == true + end + + test "an inline error chunk STAYS terminal (must fail fast, not wait for [DONE])" do + # OpenAI-compatible gateways (incl. LiteLLM/Azure) report mid-stream + # failures as `data: {"error": {...}}` with an HTTP 200. These must remain + # terminal so the stream errors immediately instead of hanging. + error_event = %{data: %{"error" => %{"message" => "boom"}}} + + [meta] = ChatAPI.decode_stream_event(error_event, @model) + assert meta.metadata[:finish_reason] == :error + assert meta.metadata[:terminal?] == true + end + + test "an empty-choices usage chunk keeps its own terminal flag" do + # Some servers send the final usage with `choices: []`; the default decoder + # marks that terminal. It has no :finish_reason key, so it must pass through + # untouched. + usage_event = %{ + data: %{ + "choices" => [], + "usage" => %{"prompt_tokens" => 5, "completion_tokens" => 2, "total_tokens" => 7} + } + } + + [meta] = ChatAPI.decode_stream_event(usage_event, @model) + assert meta.metadata[:terminal?] == true + assert meta.metadata[:usage][:input_tokens] == 5 + end + + test "finish_reason + usage in a single event still yields usage" do + # If a gateway combines both into one chunk (non-empty choices), usage is + # captured and the finish_reason chunk is no longer terminal (stream + # finalizes on [DONE] / close). + combined = %{ + data: %{ + "choices" => [%{"finish_reason" => "stop", "index" => 0, "delta" => %{}}], + "usage" => %{"prompt_tokens" => 11, "completion_tokens" => 4, "total_tokens" => 15} + } + } + + chunks = ChatAPI.decode_stream_event(combined, @model) + usage_meta = Enum.find(chunks, &(is_map(&1.metadata) and Map.has_key?(&1.metadata, :usage))) + assert usage_meta.metadata[:usage][:input_tokens] == 11 + refute Enum.any?(chunks, &(&1.metadata[:finish_reason] == :stop and &1.metadata[:terminal?])) + end +end From 0d340dc78562babc74a4106c88dda3da53545acb Mon Sep 17 00:00:00 2001 From: Mike Hostetler <84222+mikehostetler@users.noreply.github.com> Date: Sun, 21 Jun 2026 10:29:07 -0500 Subject: [PATCH 2/2] test(openai): cover trailing stream usage in responses --- lib/req_llm/providers/openai/chat_api.ex | 28 +---- .../openai_chat_streaming_usage_test.exs | 112 ++++++++++++++++-- 2 files changed, 100 insertions(+), 40 deletions(-) diff --git a/lib/req_llm/providers/openai/chat_api.ex b/lib/req_llm/providers/openai/chat_api.ex index 5d0b9aa2..61b01e93 100644 --- a/lib/req_llm/providers/openai/chat_api.ex +++ b/lib/req_llm/providers/openai/chat_api.ex @@ -70,33 +70,7 @@ defmodule ReqLLM.Providers.OpenAI.ChatAPI do @impl true def decode_stream_event(event, model) do - event - |> ReqLLM.Provider.Defaults.default_decode_stream_event(model) - |> drop_finish_reason_terminal() - end - - # Azure and OpenAI-compatible gateways (e.g. LiteLLM) send the usage chunk - # after the finish_reason chunk, just before [DONE]. The default decoder marks - # finish_reason terminal, so the stream ends before that usage chunk lands and - # Response.usage comes back zero. Drop terminal? on normal finish_reason chunks - # so the stream finalizes on [DONE] (or connection close) once usage is in. - # Error chunks (finish_reason: :error, or an :error key) stay terminal so - # failures still fail fast; [DONE] and empty-choices usage chunks have no - # finish_reason and are left untouched. - defp drop_finish_reason_terminal(chunks) do - Enum.map(chunks, fn - %ReqLLM.StreamChunk{ - type: :meta, - metadata: %{terminal?: true, finish_reason: reason} = meta - } = chunk - when reason != :error -> - if Map.has_key?(meta, :error), - do: chunk, - else: %{chunk | metadata: Map.delete(meta, :terminal?)} - - chunk -> - chunk - end) + ReqLLM.Provider.Defaults.default_decode_stream_event(event, model) end # ======================================================================== diff --git a/test/providers/openai_chat_streaming_usage_test.exs b/test/providers/openai_chat_streaming_usage_test.exs index 93b754c9..7321732e 100644 --- a/test/providers/openai_chat_streaming_usage_test.exs +++ b/test/providers/openai_chat_streaming_usage_test.exs @@ -13,17 +13,19 @@ defmodule ReqLLM.Providers.OpenAI.ChatStreamingUsageTest do If the finish_reason chunk is flagged `terminal?`, the stream halts there and the consumer reads `Response.usage` before the trailing usage chunk has been merged — so token counts (and the cost derived from them) come back as zero. - `ChatAPI.decode_stream_event/2` strips the terminal flag off finish_reason - chunks so the stream finalizes on `[DONE]` instead. + Chat-completion finish_reason chunks must remain non-terminal so the stream + finalizes on `[DONE]` after the trailing usage chunk is accumulated. """ use ExUnit.Case, async: true + alias ReqLLM.{Context, Response, StreamResponse, StreamServer} alias ReqLLM.Providers.OpenAI.ChatAPI alias ReqLLM.StreamChunk + alias ReqLLM.StreamResponse.MetadataHandle @model %LLMDB.Model{provider: :openai, id: "gpt-4o"} - test "finish_reason chunk is not terminal (so the trailing usage chunk is not raced)" do + test "finish_reason chunk is not terminal" do finish_event = %{ data: %{ "choices" => [%{"finish_reason" => "stop", "index" => 0, "delta" => %{}}] @@ -61,10 +63,7 @@ defmodule ReqLLM.Providers.OpenAI.ChatStreamingUsageTest do assert meta.metadata[:terminal?] == true end - test "an inline error chunk STAYS terminal (must fail fast, not wait for [DONE])" do - # OpenAI-compatible gateways (incl. LiteLLM/Azure) report mid-stream - # failures as `data: {"error": {...}}` with an HTTP 200. These must remain - # terminal so the stream errors immediately instead of hanging. + test "an inline error chunk stays terminal" do error_event = %{data: %{"error" => %{"message" => "boom"}}} [meta] = ChatAPI.decode_stream_event(error_event, @model) @@ -73,9 +72,6 @@ defmodule ReqLLM.Providers.OpenAI.ChatStreamingUsageTest do end test "an empty-choices usage chunk keeps its own terminal flag" do - # Some servers send the final usage with `choices: []`; the default decoder - # marks that terminal. It has no :finish_reason key, so it must pass through - # untouched. usage_event = %{ data: %{ "choices" => [], @@ -89,9 +85,6 @@ defmodule ReqLLM.Providers.OpenAI.ChatStreamingUsageTest do end test "finish_reason + usage in a single event still yields usage" do - # If a gateway combines both into one chunk (non-empty choices), usage is - # captured and the finish_reason chunk is no longer terminal (stream - # finalizes on [DONE] / close). combined = %{ data: %{ "choices" => [%{"finish_reason" => "stop", "index" => 0, "delta" => %{}}], @@ -104,4 +97,97 @@ defmodule ReqLLM.Providers.OpenAI.ChatStreamingUsageTest do assert usage_meta.metadata[:usage][:input_tokens] == 11 refute Enum.any?(chunks, &(&1.metadata[:finish_reason] == :stop and &1.metadata[:terminal?])) end + + test "to_response keeps usage that arrives after finish_reason" do + {:ok, server} = StreamServer.start_link(provider_mod: ChatAPI, model: @model) + stream_response = stream_response_for(server) + + await_metadata_waiter(server) + + send_sse(server, %{"choices" => [%{"delta" => %{"content" => "Done"}}]}) + + send_sse(server, %{ + "choices" => [%{"finish_reason" => "stop", "index" => 0, "delta" => %{}}] + }) + + send_sse(server, %{ + "choices" => [%{"index" => 0, "delta" => %{}}], + "usage" => %{"prompt_tokens" => 12, "completion_tokens" => 8, "total_tokens" => 20} + }) + + send_done(server) + StreamServer.http_event(server, :done) + + assert {:ok, response} = StreamResponse.to_response(stream_response) + assert Response.text(response) == "Done" + assert Response.finish_reason(response) == :stop + + usage = Response.usage(response) + assert usage.input_tokens == 12 + assert usage.output_tokens == 8 + assert usage.total_tokens == 20 + end + + defp stream_response_for(server) do + {:ok, metadata_handle} = + MetadataHandle.start_link(fn -> + case StreamServer.await_metadata(server, 500) do + {:ok, metadata} -> metadata + {:error, reason} -> %{error: reason} + end + end) + + %StreamResponse{ + stream: stream_from(server), + metadata_handle: metadata_handle, + cancel: fn -> StreamServer.cancel(server) end, + model: @model, + context: Context.normalize!("Say done") + } + end + + defp stream_from(server) do + Stream.resource( + fn -> false end, + fn + true -> + {:halt, true} + + false -> + case StreamServer.next(server, 500) do + {:ok, chunk} -> {[chunk], false} + :halt -> {:halt, true} + {:error, reason} -> raise "stream failed: #{inspect(reason)}" + end + end, + fn exhausted? -> + if not exhausted? do + StreamServer.cancel(server) + end + end + ) + end + + defp send_sse(server, data) do + StreamServer.http_event(server, {:data, "data: #{Jason.encode!(data)}\n\n"}) + end + + defp send_done(server) do + StreamServer.http_event(server, {:data, "data: [DONE]\n\n"}) + end + + defp await_metadata_waiter(server, attempts \\ 50) + + defp await_metadata_waiter(server, attempts) when attempts > 0 do + state = :sys.get_state(server) + + if Enum.any?(state.waiting_callers, &(&1.type == :metadata)) do + :ok + else + Process.sleep(10) + await_metadata_waiter(server, attempts - 1) + end + end + + defp await_metadata_waiter(_server, 0), do: flunk("metadata waiter was not registered") end