fix(openai): preserve native v1 stream contract #1627

Merged

hassiebp merged 6 commits into main from
hassieb/lfe-8788-bug-langfuseresponsegeneratorasync-object-has-no-attribute
Apr 21, 2026

Conversation

@hassiebp (Contributor) commented Apr 15, 2026

Summary

Fix OpenAI v1 streaming instrumentation to preserve the original openai.Stream and openai.AsyncStream objects instead of replacing them with Langfuse wrapper types.

This keeps the native stream contract intact for _iterator, .response, close() / aclose(), async for, and manual __anext__() while still collecting streamed chunks for Langfuse generation updates.

Changes

  • instrument native OpenAI v1 stream objects in place by wrapping their internal iterator
  • keep the existing Langfuse wrapper classes as a fallback for non-OpenAI generator-style streams
  • add regression tests for sync and async native stream behavior, including async for and __anext__()
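A minimal sketch of the in-place approach described above, under the assumption that the native stream drives all iteration through an internal `_iterator` attribute. `FakeStream`, `instrument_stream`, `on_chunk`, and `on_end` are illustrative names, not the actual Langfuse implementation:

```python
# Hypothetical sketch: wrap a stream's internal iterator in place so the
# original object (and its type, .response, close(), etc.) is preserved.
from typing import Any, Iterator, List


class FakeStream:
    """Stand-in for openai.Stream: iteration is driven by self._iterator."""

    def __init__(self, chunks):
        self._iterator = iter(chunks)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iterator)


def instrument_stream(stream: FakeStream, on_chunk, on_end) -> FakeStream:
    """Replace stream._iterator with a traced generator, in place."""
    raw_iterator = stream._iterator
    items: List[Any] = []
    is_finalized = False

    def finalize_once():
        nonlocal is_finalized
        if not is_finalized:
            is_finalized = True
            on_end(items)

    def traced_iterator() -> Iterator[Any]:
        try:
            for chunk in raw_iterator:
                items.append(chunk)  # collect for the generation update
                on_chunk(chunk)
                yield chunk
        finally:
            finalize_once()  # fires on exhaustion or generator close

    stream._iterator = traced_iterator()
    return stream  # same object: isinstance checks and attributes survive
```

Because the original object is returned unchanged, callers that depend on the native stream type keep working; only the hidden iterator is swapped.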

Verification

  • `uv run --frozen ruff check langfuse/openai.py tests/unit/test_openai.py`
  • `uv run --frozen pytest tests/unit/test_openai.py`

Refs LFE-8788.

Disclaimer: Experimental PR review

Greptile Summary

This PR fixes OpenAI v1 streaming instrumentation by patching _iterator and close/aclose on the native openai.Stream and openai.AsyncStream objects in place, rather than replacing them with Langfuse wrapper types. The approach is sound: _iterator replacement correctly intercepts all iteration paths (__iter__, __next__, __anext__, async for) as confirmed by the SDK's implementation, and the is_finalized flag reliably prevents double-finalization.

Confidence Score: 5/5

Safe to merge — core patching logic is correct, double-finalization is guarded, and tests cover all three iteration patterns.

All remaining findings are P2 style suggestions. The _finalize_stream_response_async wrapper is harmless. No P0/P1 logic bugs found; the _iterator replacement approach is confirmed sound by the OpenAI SDK's implementation.
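The async side of the same pattern, sketched under the same assumptions (an `AsyncStream`-like object whose `__anext__` and `async for` both go through `_iterator`). All names here are illustrative:

```python
# Hypothetical async counterpart: swap _iterator for a traced async
# generator in place, so async for and manual __anext__() both work.
import asyncio
from typing import Any, AsyncIterator, List


class FakeAsyncStream:
    """Stand-in for openai.AsyncStream: iteration via self._iterator."""

    def __init__(self, chunks):
        async def gen():
            for c in chunks:
                yield c

        self._iterator = gen()

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self._iterator.__anext__()


def instrument_async_stream(stream: FakeAsyncStream, on_end) -> FakeAsyncStream:
    """Replace stream._iterator with a traced async generator, in place."""
    raw_iterator = stream._iterator
    items: List[Any] = []
    is_finalized = False

    def finalize_once():
        nonlocal is_finalized
        if not is_finalized:
            is_finalized = True
            on_end(items)

    async def traced() -> AsyncIterator[Any]:
        try:
            async for chunk in raw_iterator:
                items.append(chunk)
                yield chunk
        finally:
            finalize_once()  # fires on exhaustion or aclose()

    stream._iterator = traced()
    return stream
```

Since both `async for` and a manual `await stream.__anext__()` funnel through `_iterator`, one replacement covers every iteration path.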

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| langfuse/openai.py | Adds `_instrument_openai_stream` / `_instrument_openai_async_stream` to monkey-patch `_iterator` and `close`/`aclose` on native `openai.Stream` objects, preserving type identity while still collecting chunks for Langfuse; logic is sound, but `_finalize_stream_response_async` is a no-op async wrapper. |
| tests/unit/test_openai.py | Adds three regression tests covering a sync native stream, an async native stream, and manual `__anext__` usage; `DummyOpenAIStream` / `DummyOpenAIAsyncStream` correctly bypass `super().__init__` and set `_iterator` directly, which is appropriate for unit-level testing. |
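The dummy-stream testing trick described above can be sketched as follows. `BaseStream` and `DummyStream` are illustrative stand-ins, not the PR's actual test classes:

```python
# Sketch: subclass a heavyweight stream type but skip its __init__ and
# set _iterator directly, so no HTTP machinery is needed in a unit test.
class BaseStream:
    """Stand-in for openai.Stream: the real __init__ needs an HTTP response."""

    def __init__(self):
        raise RuntimeError("requires a real httpx.Response")

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iterator)


class DummyStream(BaseStream):
    def __init__(self, chunks):
        # Deliberately do NOT call super().__init__(): only the iteration
        # contract is under test, so _iterator is set directly.
        self._iterator = iter(chunks)
```

The subclass still passes `isinstance` checks against the native type, which is exactly what the in-place instrumentation is meant to preserve.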

Sequence Diagram

```mermaid
sequenceDiagram
    participant User
    participant Stream as openai.Stream
    participant Traced as traced_iterator (generator)
    participant Raw as raw_iterator
    participant Finalize as finalize_once
    participant Gen as LangfuseGeneration

    Note over User,Gen: _instrument_openai_stream patches _iterator and close in place

    User->>Stream: for chunk in stream (or next())
    Stream->>Traced: __next__() via self._iterator
    Traced->>Raw: next item
    Raw-->>Traced: chunk
    Traced->>Traced: items.append(chunk), set completion_start_time
    Traced-->>Stream: yield chunk
    Stream-->>User: chunk

    Note over Traced: On exhaustion or close()

    Traced->>Finalize: finally block (is_finalized guard)
    Finalize->>Gen: _finalize_stream_response → generation.end()

    alt User calls stream.close() explicitly
        User->>Stream: close() [patched to traced_close]
        Stream->>Stream: original_close() (closes HTTP response)
        Stream->>Finalize: finally in traced_close
        Finalize->>Finalize: is_finalized=True → no-op if already finalized
    end
```
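The explicit-close branch of the diagram can be sketched as below. `patch_close`, `traced_close`, and `finalize_once` are illustrative names under the same assumptions as before, not the actual implementation:

```python
# Hypothetical sketch of the patched close() path: call the original
# close() to release the HTTP response, then finalize at most once.
class MiniStream:
    """Stand-in for a stream object exposing close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def patch_close(stream, finalize_once):
    """Replace stream.close with a wrapper that also finalizes."""
    original_close = stream.close

    def traced_close():
        try:
            original_close()  # close the underlying HTTP response first
        finally:
            finalize_once()  # guarded internally: no-op if already finalized

    stream.close = traced_close
```

The `try`/`finally` ordering matters: the network resource is released even if finalization raises, and the `is_finalized` guard (owned by the caller here) prevents double-ending the generation when iteration already finalized.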

Reviews (1): Last reviewed commit: "fix(openai): preserve native v1 stream c..."

@github-actions

@claude review

Comment thread langfuse/openai.py
Comment thread langfuse/openai.py
Comment thread langfuse/openai.py Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b5a7071be7

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread langfuse/openai.py
Comment thread langfuse/openai.py Outdated
Comment thread langfuse/openai.py
Comment thread langfuse/openai.py
@hassiebp hassiebp merged commit 7aa632f into main Apr 21, 2026
18 checks passed
@hassiebp hassiebp deleted the hassieb/lfe-8788-bug-langfuseresponsegeneratorasync-object-has-no-attribute branch April 21, 2026 11:27