fix(langchain): preserve LangGraph control flow traces#1632
fix(langchain): preserve LangGraph control flow traces#1632
Conversation
e0e81a6 to
19a231e
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e0e81a67d5
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
|
@claude review |
There was a problem hiding this comment.
Additional findings (outside current diff — PR may have been updated during review):
-
🔴
langfuse/langchain/CallbackHandler.py:666-681— Whenon_chain_startfails internally (its exception is caught and swallowed), no observation is registered, so_detach_observation()returnsNonein bothon_chain_endandon_chain_error. In that case the_clear_resume_trace_context()call — which is guarded insideif span is not None/if observation is not None— is skipped entirely, while thefinallyblock only calls_exit_propagation_context()and_reset(). A stale_resume_trace_contextfrom a prior LangGraph interrupt therefore persists and is silently consumed by the very next root-level invocation, stitching an unrelated run onto the interrupted trace. The fix is to callself._clear_resume_trace_context()unconditionally in thefinallyblock of both methods (guarded only byparent_run_id is None).Extended reasoning...
What the bug is and how it manifests
The PR introduces
_resume_trace_contextto stitch a LangGraph-resumed root run back onto the original interrupted trace._persist_resume_trace_context()saves the context when a control-flow error bubbles up, and_consume_root_trace_context()(called inon_chain_start,on_tool_start, etc.) reads and clears it. However, two terminal callbacks —on_chain_endandon_chain_error— only call_clear_resume_trace_context()inside theirif span/observation is not Noneguard. Thefinallyblock in both methods calls_reset()(which clears only_child_to_parent_run_id_map) and_exit_propagation_context(), but never clears_resume_trace_context.The specific code path that triggers it
- A prior control-flow interrupt (e.g.
DummyControlFlowError) causeson_chain_errorto call_persist_resume_trace_context(), setting_resume_trace_context. - The next invocation calls
on_chain_start. Insideon_chain_start,propagate_attributes().__enter__()(or any code beforestart_observation) raises an exception. The outerexcept Exceptioninon_chain_startcatches and swallows it — no observation is registered. - LangChain's runtime (which manages its own run lifecycle independently) later fires
on_chain_endoron_chain_errorfor that run. - In
on_chain_end:_detach_observation(run_id)returnsNone→span is None→_clear_resume_trace_context()is skipped →finallycalls only_exit_propagation_context()and_reset(). _resume_trace_contextis still populated and will be consumed by the subsequent fresh root-level invocation, silently stitching it onto the interrupted trace.
Why existing code doesn't prevent it
_reset()(called infinally) only clears_child_to_parent_run_id_map— there is no mention of_resume_trace_contextthere._consume_root_trace_context()would clear it, but only if a successfulon_chain_startruns to the point wherestart_observationis invoked. The only defensive clears are inside theif observation/span is not Noneguards, which are bypassed when startup failed.Impact
A completely unrelated root-level invocation (e.g. a new user query) gets attached to a previous, interrupted trace instead of starting a fresh one. This produces invisible, incorrect trace stitching in the Langfuse UI — the new run appears as a child of the old interrupted trace — a silent data-correctness bug with no error signal to the user.
How to fix it
Add
self._clear_resume_trace_context()to thefinallyblock of bothon_chain_endandon_chain_error, guarded byparent_run_id is None:finally: if parent_run_id is None: self._clear_resume_trace_context() # ADD THIS self._exit_propagation_context() self._reset()
This mirrors the same safe-guard already present in
on_llm_end(line added in this PR:self._clear_resume_trace_context()beforeself._reset()).Step-by-step proof
- Call
handler.on_chain_error(DummyControlFlowError(...), run_id=R1)for a root run (parent_run_id=None)._persist_resume_trace_context()setshandler._resume_trace_context = {"trace_id": "T1", ...}. _reset()fires infinally— clears_child_to_parent_run_id_maponly._resume_trace_contextis still{"trace_id": "T1", ...}.handler.on_chain_start({"name": "LangGraph"}, ..., run_id=R2)is called. Supposepropagate_attributes().__enter__()raises inside the try block. Exception is caught; no observation is stored for R2.- LangChain calls
handler.on_chain_end({}, run_id=R2, parent_run_id=None)._detach_observation(R2)returnsNone. Theif span is not Noneblock is skipped._clear_resume_trace_context()is never called.finallycalls_reset()— still doesn't touch_resume_trace_context. handler.on_chain_start({"name": "LangGraph"}, ..., run_id=R3)fires for a completely fresh root invocation._consume_root_trace_context()returns{"trace_id": "T1", ...}and clears it. R3's span is now created under trace T1 — incorrectly stitched onto the interrupted run.
- A prior control-flow interrupt (e.g.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 19a231e570
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ad2e408fdf
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fccbffcb3e
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 12bc17b58b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
All previously flagged critical issues have been addressed — the fix is correct and the new tests cover the key scenarios well.
Extended reasoning...
Overview
This PR touches two files: the LangChain callback handler (~1300 LOC) and its unit tests (~830 LOC added). It adds interrupt/resume trace stitching for LangGraph control-flow exceptions and extends the existing DEFAULT-level classification from chain errors to tool, retriever, and LLM error handlers.
Security risks
None. The change is purely observability instrumentation with no auth, crypto, or permission logic.
Level of scrutiny
The PR is moderately complex — it introduces new stateful fields, multiple interacting helper methods, and a non-trivial cleanup strategy — but the test coverage is thorough. Tests cover: multi-thread keyed resume contexts, non-resume Command passthrough, span creation failure recovery, bounded dict eviction, and concurrent root run key preservation. The two remaining inline comments are both nits (performance of the O(N²) _reset cleanup path for large traces, and a test-isolation gap if an assertion in the last test fails without a finally guard), neither of which affects correctness for typical usage.
Other factors
Every issue raised in previous review rounds — matching all Command types, missing resume key seeding in on_tool_start/on_retriever_start, missing cleanup finally blocks, span creation atomicity, unbounded dict, and coupled imports — has been addressed in the current revision. The logic is sound and the inline nit comments are already attached to the relevant lines.
| with patch("builtins.__import__", side_effect=import_without_langgraph_command): | ||
| reloaded_module = importlib.reload(callback_handler_module) | ||
| assert reloaded_module.LANGGRAPH_COMMAND_TYPE is None | ||
| assert any( | ||
| exception_type.__name__ == "GraphBubbleUp" | ||
| for exception_type in reloaded_module.CONTROL_FLOW_EXCEPTION_TYPES | ||
| ) | ||
|
|
||
| importlib.reload(callback_handler_module) |
There was a problem hiding this comment.
🟡 The final importlib.reload(callback_handler_module) restore call in test_graphbubbleup_import_is_independent_from_command_import is not inside a finally block, so if either assertion fails the module remains with LANGGRAPH_COMMAND_TYPE = None. Although this test is currently last in the file under default ordering (limiting immediate impact), test randomization, --lf reruns, or future test additions after it would cause subsequent tests calling _is_langgraph_resume() to silently return False, breaking resume-stitching. Wrap the restore reload in a try/finally.
Extended reasoning...
What the bug is and how it manifests
The test test_graphbubbleup_import_is_independent_from_command_import (lines 814–833) reloads callback_handler_module inside a with patch(...) block, which causes the module to be imported without langgraph.types, setting LANGGRAPH_COMMAND_TYPE = None. The restore call importlib.reload(callback_handler_module) on the final line is outside the with block but is NOT guarded by a finally clause.
The specific code path that triggers it
If either assertion — assert reloaded_module.LANGGRAPH_COMMAND_TYPE is None or assert any(exception_type.name == 'GraphBubbleUp' ...) — raises an AssertionError, Python unwinds out of the with block (which correctly restores builtins.import), but execution never reaches the final importlib.reload. The module object pointed to by the module-level variable callback_handler_module retains LANGGRAPH_COMMAND_TYPE = None for the remainder of the process.
Why existing code does not prevent it
importlib.reload modifies the module object in-place. The with patch(...) context manager only restores builtins.import; it does not undo the reload's side effects on the module object. There is no try/finally to guarantee the restore reload runs even on assertion failure.
Addressing the refutation
The refutation correctly observes that test_graphbubbleup_import_is_independent_from_command_import is currently the last test defined in the file, which under pytest's default top-to-bottom collection order means no subsequent same-file test would be affected by a failed assertion. This limits the practical impact in the default configuration. However, three scenarios break this assumption: (1) pytest-randomly or pytest-random-order plugins shuffle test order; (2) pytest --lf reruns only the failed test first in a new session, and its broken module state contaminates all other tests collected afterward; (3) any developer adding a new test after this one inherits the silent failure. The refutation's claim of a 'duplicate of bug_004' is not borne out by the synthesis — only bug_001 appears here, and the underlying correctness issue is genuine.
What the impact would be if triggered
Every subsequent test on the same callback_handler_module that calls handler._is_langgraph_resume() would find LANGGRAPH_COMMAND_TYPE is None and always return False. Tests such as test_control_flow_resume_uses_thread_keyed_explicit_resume_context and test_control_flow_resume_ignores_non_resume_commands would fail with misleading assertion errors unrelated to the feature they are testing, making diagnosis difficult.
Step-by-step proof
- pytest-randomly shuffles order; test_graphbubbleup_import_is_independent_from_command_import runs before test_control_flow_resume_uses_thread_keyed_explicit_resume_context.
- The with patch(...) block reloads the module, setting callback_handler_module.LANGGRAPH_COMMAND_TYPE = None.
- An assertion fails; AssertionError propagates; the with block exits cleanly (builtins.import restored), but the final importlib.reload is skipped.
- callback_handler_module.LANGGRAPH_COMMAND_TYPE remains None.
- The next test creates CallbackHandler(), calls on_chain_start with Command(resume=...); _is_langgraph_resume returns False because LANGGRAPH_COMMAND_TYPE is None.
- Resume context is never consumed; the resumed span starts on a disconnected trace; the assertion resumed_span.parent.span_id == initial_span.context.span_id fails spuriously.
How to fix
Wrap the restore in a try/finally so it runs regardless of assertion outcome:
try:
with patch('builtins.__import__', side_effect=import_without_langgraph_command):
reloaded_module = importlib.reload(callback_handler_module)
assert reloaded_module.LANGGRAPH_COMMAND_TYPE is None
assert any(
exception_type.__name__ == 'GraphBubbleUp'
for exception_type in reloaded_module.CONTROL_FLOW_EXCEPTION_TYPES
)
finally:
importlib.reload(callback_handler_module)
| if observation is not None: | ||
| level, status_message = self._get_error_level_and_status_message(error) | ||
| observation.update( | ||
| level="ERROR", | ||
| status_message=str(error), | ||
| level=cast( | ||
| Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]], | ||
| level, | ||
| ), | ||
| status_message=status_message, | ||
| input=kwargs.get("inputs"), | ||
| cost_details={"total": 0}, | ||
| ).end() | ||
|
|
||
| if parent_run_id is None and level == "DEFAULT": | ||
| self._persist_resume_trace_context( | ||
| run_id=run_id, observation=observation | ||
| ) | ||
| elif parent_run_id is None: | ||
| self._clear_root_run_resume_key(run_id) | ||
|
|
There was a problem hiding this comment.
🟡 In on_chain_error, _persist_resume_trace_context is correctly called before observation.end() (line 868), so the resume key is saved even if end() throws. In the three non-chain handlers (on_retriever_error, on_tool_error, on_llm_error), observation.update().end() is chained and _persist_resume_trace_context is called afterward; if end() throws, the except block is entered without saving the resume key, and the subsequent finally: _reset(run_id) destroys the entire _RootRunState including its resume_key, losing it permanently. Fix by moving _persist_resume_trace_context before the .end() call in these three handlers, mirroring on_chain_error.
Extended reasoning...
What the bug is and how it manifests
In on_chain_error, observation.update() and observation.end() are separate calls, with _persist_resume_trace_context invoked between them (line 868 before line 875). Even if observation.end() raises, the resume key has already been transferred into _pending_resume_trace_contexts. The three non-chain handlers (on_retriever_error, on_tool_error, on_llm_error) use observation.update(...).end() as a single chained expression, then call _persist_resume_trace_context afterward. If the chained .end() raises, Python immediately jumps to the except block and _persist_resume_trace_context is never reached.
The specific code path that triggers it
Three conditions must hold simultaneously: (1) the error handler is invoked for a root-level run (parent_run_id is None); (2) the exception is a LangGraph control-flow type so level == "DEFAULT"; (3) observation.update().end() raises internally. When all three hold, the resume key stored in root_run_state.resume_key is never popped into _pending_resume_trace_contexts.
Why existing code does not prevent it
The finally: if parent_run_id is None: self._reset(run_id) block then executes unconditionally. _reset calls self._root_run_states.pop(run_state.root_run_id, None), which removes the entire _RootRunState object. Since _persist_resume_trace_context was never called, root_run_state.resume_key still holds the thread's resume key — but the object is now discarded. The key never reaches _pending_resume_trace_contexts.
Step-by-step proof
- on_tool_start(run_id=A, parent_run_id=None, metadata={"thread_id": "t1"}) creates _RootRunState(resume_key="t1") in _root_run_states[A].
- Tool raises GraphBubbleUp; on_tool_error fires with run_id=A, parent_run_id=None.
- _get_error_level_and_status_message returns ("DEFAULT", ...); code enters the if observation is not None branch.
- observation.update(...).end() raises (SDK internal error, OTel resource exhaustion, etc.).
- except Exception catches; execution jumps to finally. _persist_resume_trace_context at line 473 was never reached; _pending_resume_trace_contexts has no entry for "t1".
- finally: _reset(A) pops _root_run_states[A] — the _RootRunState with resume_key="t1" is discarded.
- Next Command(resume=...) invocation on thread "t1": _take_root_trace_context returns None. A disconnected trace is created instead of stitching to the interrupted one.
How to fix
Split observation.update().end() into separate calls and move _persist_resume_trace_context / _clear_root_run_resume_key before the .end() call in all three non-chain handlers, mirroring on_chain_error. The observation.trace_id and observation.id fields are assigned at span creation time, not at end time, so calling persist before end() is safe.
Summary
This updates the LangChain callback handler to treat LangGraph control-flow exceptions as expected behavior instead of hard errors, and to preserve trace continuity across interrupt/resume cycles.
What changed
CONTROL_FLOW_EXCEPTION_TYPESasDEFAULTinstead ofERRORinon_tool_error,on_retriever_error,on_llm_error, andon_chain_errorRoot cause
LangGraph uses
GraphBubbleUpsubclasses for expected control flow such as interrupts and handoffs. The callback handler only recognized that pattern inon_chain_error, while other error callbacks always marked those paths as failures. Separately, the handler reset root callback state after a bubbled interrupt, so the next resumed root run started a fresh trace.Impact
Verification
python -m py_compile langfuse/langchain/CallbackHandler.py tests/unit/test_langchain.pypytest,langchain, orlangchain_openaiinstalled outside the project-managed environmentgit commitrequired--no-verifybecause the local git hook referencespre-commit, which is not available in this shellContext
This addresses the behavior investigated in LFE-8001 and LFE-8520.
Disclaimer: Experimental PR review
Greptile Summary
This PR fixes two related LangGraph callback handler issues: it extends the
CONTROL_FLOW_EXCEPTION_TYPESclassification fromon_chain_errorto all four error callbacks (on_tool_error,on_retriever_error,on_llm_error), and adds_resume_trace_contextto stitch a resumed graph run back onto the original interrupted trace._persist_resume_trace_context/_consume_root_trace_contextpairing is logically sound: the resume context is consumed exactly once by the first root-level chain start that follows an interrupt, and_reset()(which only clears_child_to_parent_run_id_map) does not wipe it._persist_resume_trace_contextis invoked beforeobservation.end()inon_chain_errorbut after in the three other error handlers — harmless since span IDs are assigned at creation time, but slightly confusing.on_tool_error/on_retriever_error/on_llm_errordo not call_exit_propagation_context()or_reset()for root-level control-flow errors (consistent with pre-existing behaviour for those callbacks, but now those paths also write_resume_trace_context).Confidence Score: 5/5
Safe to merge — core logic is correct, findings are minor edge-case style issues that don't affect the primary LangGraph interrupt/resume path.
The resume trace-stitching logic is sound:
_reset()doesn't clear_resume_trace_context,_consume_root_trace_context()is called only at root-level span creation, and the context is consumed exactly once. The two remaining comments are P2: one is a cosmetic ordering inconsistency (harmless), and the other covers an edge case (root-level tool/LLM errors) that does not arise in typical LangGraph usage. No data loss, no security issues, tests cover both new behaviours.No files require special attention for merge.
Important Files Changed
_resume_trace_contextstate to stitch LangGraph resume runs back to the original trace, and introduces_get_error_level_and_status_messageto consistently classify control-flow exceptions as DEFAULT across all four error callbacks. Minor inconsistency:_persist_resume_trace_contextis called beforeobservation.end()inon_chain_errorbut after in the other three handlers; and tool/retriever/LLM error paths don't call_exit_propagation_context/_reset()for root-level control-flow errors.CONTROL_FLOW_EXCEPTION_TYPESinfinallyblocks and correctly importotel_context.Sequence Diagram
sequenceDiagram participant LG as LangGraph participant CB as CallbackHandler participant LF as Langfuse Trace Note over LG,LF: First invocation — interrupted LG->>CB: on_chain_start(run_id=A) CB->>LF: start_observation(trace_context=None) → Span A (new trace T1) LG->>CB: on_chain_error(GraphBubbleUp, run_id=A) CB->>CB: _get_error_level_and_status_message → (DEFAULT, msg) CB->>CB: _persist_resume_trace_context({trace_id: T1, parent_span_id: A}) CB->>LF: observation.end(level=DEFAULT) CB->>CB: _exit_propagation_context() + _reset() Note over LG,LF: Second invocation — resume LG->>CB: on_chain_start(run_id=B) CB->>CB: _consume_root_trace_context() → {trace_id: T1, parent_span_id: A} CB->>LF: start_observation(trace_context={T1, A}) → Span B (child of A, same trace T1) LG->>CB: on_chain_end(run_id=B) CB->>CB: _clear_resume_trace_context() CB->>LF: span.end() Note over LG,LF: Third invocation — fresh LG->>CB: on_chain_start(run_id=C) CB->>CB: _consume_root_trace_context() → None (cleared) CB->>LF: start_observation(trace_context=None) → Span C (new trace T2) LG->>CB: on_chain_end(run_id=C) CB->>LF: span.end()Prompt To Fix All With AI
Reviews (2): Last reviewed commit: "test(langchain): isolate resume trace as..." | Re-trigger Greptile