
Commit 59ad906

Authored by ikuzuki and claude
fix(agent): Langfuse 4.x flush no longer hangs — LANGFUSE_TIMEOUT + remove explicit flush (#138)
#137 re-enabled Langfuse with OTEL_BSP_EXPORT_TIMEOUT + OTEL_EXPORTER_OTLP_TIMEOUT, which are documented-ignored by the OTLP HTTP exporter (`# Not used. No way currently to pass timeout to export.`) and by Langfuse's own LangfuseSpanProcessor. Every /team request post-deploy hit Lambda's 60s timeout, forcing an emergency CLI flip of LANGFUSE_TRACING_ENABLED=false.

Deep source-code research on langfuse-python v4.0.6 identified two separate blocking surfaces:

1. OTLPSpanExporter retry loop — 6 retries with exponential backoff (~63s total). Bounded only by the exporter's own `timeout=` ctor arg, which Langfuse wires to the LANGFUSE_TIMEOUT env var (seconds, default 5). That's the knob, not the OTEL_* ones.
2. resource_manager.flush() → _score_ingestion_queue.join() — no timeout. If we emit scores (/chat does, via _emit_quality_scores) and Langfuse is slow, .join() blocks indefinitely on the handler thread. This is unfixable without the SDK's upcoming v4.1 span_exporter= kwarg.

Additionally, LANGFUSE_FLUSH_AT=1 was counterproductive: it sets max_export_batch_size=1, making BatchSpanProcessor.on_end() trigger synchronous export on the handler thread for every span — moving the blocking HTTP upload onto the user-facing request path, which is what the background thread exists to prevent.

Fix:

- lambda.tf: LANGFUSE_TIMEOUT=2 (caps retry loop); LANGFUSE_FLUSH_INTERVAL=1 (frequent background flush); remove OTEL_BSP_EXPORT_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT, LANGFUSE_FLUSH_AT. LANGFUSE_TRACING_ENABLED stays "false" in Terraform — flip to true via CLI after apply once production latency is verified.
- api.py: remove three explicit langfuse_flush() calls from /chat and /chat/sync handlers. The background batch processor drains spans + scores on its own thread; explicit flush was the path through the unbounded _score_ingestion_queue.join().

Tests: two tests that mocked langfuse_flush() were deleted — they asserted behaviour we deliberately removed. 150 tests pass. ruff, terraform validate clean.

Docs: ADR-0005 revision expanded with source-level citations (SDK line numbers) so the next person who touches this doesn't redo the research.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
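The two headline numbers in the message (a ~63s uncapped retry loop vs. a ~2s cap with LANGFUSE_TIMEOUT=2) follow from the backoff arithmetic. A toy model of that schedule, illustrative only and not the SDK's actual retry code:

```python
# Toy model of an exponential-backoff retry loop: up to 6 retries,
# backoff doubling from 1s, abandoned once the next sleep would
# overrun the deadline. Mirrors the 1+2+4+8+16+32 = 63s worst case.
def worst_case_wait(timeout_s: float, max_retries: int = 6) -> float:
    """Total seconds spent sleeping between retries before giving up."""
    waited = 0.0
    for attempt in range(max_retries):
        backoff = 2 ** attempt  # 1, 2, 4, 8, 16, 32
        if waited + backoff > timeout_s:
            break  # deadline exceeded: stop retrying
        waited += backoff
    return waited

print(worst_case_wait(63))  # → 63.0 (uncapped: the full ~63s hang)
print(worst_case_wait(2))   # → 1.0  (a 2s deadline allows one short retry)
```

This is why pinning the deadline, rather than the per-request timeout, is what actually bounds the response-thread cost.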
Parent: 602271b

5 files changed: 80 additions & 58 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Infra: agent Lambda memory bumped from 1024 MB to 3008 MB** in `infrastructure/environments/dev/lambda.tf`. CloudWatch on the already-deployed `#128` image showed cold-start at ~27 seconds — Lambda's 10 s init cap tripped, the container was force-restarted, and LWA's `/health` probe only went green ~28 s in. That's outside CloudFront's 30 s origin-response window, so browser-side cold hits presented as `FPL is unreachable` on `/team` and `Stream ended before a final report` on `/chat`. The 27 s was dominated by Python imports (LangGraph + pgvector + asyncpg + langfuse + anthropic + FastAPI) on a 1024 MB Lambda's ~0.57 vCPU; Lambda scales CPU linearly with memory, so 3008 MB (~1.7 vCPU) cuts that roughly threefold into the 8–12 s range. Max observed memory usage stays under 300 MB — the knob is bought for CPU, not RAM.

### Fixed
+- **Agent: Langfuse flush no longer hangs the response thread — correct env vars + explicit-flush removal.** Follow-up to #137 after the re-enablement caused `Task timed out after 60 seconds` on every `/team` request post-deploy, forcing an emergency kill-switch flip (`LANGFUSE_TRACING_ENABLED=false` via CLI). Root cause of the hang and the fix are now documented in the expanded ADR-0005 revision — two independent blocking surfaces in Langfuse 4.x: (1) `OTEL_BSP_EXPORT_TIMEOUT` + `OTEL_EXPORTER_OTLP_TIMEOUT` are wired but ignored by OpenTelemetry's OTLP HTTP exporter (`# Not used. No way currently to pass timeout to export.` — `opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py:164`) — the actual bound on the retry loop is Langfuse's own `LANGFUSE_TIMEOUT` env var (seconds, passed straight into the exporter constructor); (2) `resource_manager.flush()` calls `_score_ingestion_queue.join()` with no timeout, so if we emit quality scores (`_emit_quality_scores` in `api.py`) and Langfuse is slow, the handler thread blocks indefinitely. Additionally `LANGFUSE_FLUSH_AT=1` was counterproductive — it sets `max_export_batch_size=1`, which makes `BatchSpanProcessor.on_end()` trigger synchronous export on the handler thread for every single span. Fix in `infrastructure/environments/dev/lambda.tf`: set `LANGFUSE_TIMEOUT="2"` (caps the retry loop at 2s) and `LANGFUSE_FLUSH_INTERVAL="1"` (background flush every 1s), remove the ignored `OTEL_*` env vars, remove `LANGFUSE_FLUSH_AT`. Fix in `services/agent/src/fpl_agent/api.py`: remove the three explicit `langfuse_flush()` calls from `/chat` and `/chat/sync` handlers — the background batch processor drains spans and scores on its own thread, and the `_score_ingestion_queue.join()` is what was hanging indefinitely. `LANGFUSE_TRACING_ENABLED` stays at `"false"` in Terraform as a deliberate kill-switch — we flip it back on via CLI after apply once production latency is verified, promoting to `"true"` in Terraform only after that. Research went to the SDK source (langfuse-python v4.0.6) to pin down exactly which env vars each blocking surface responds to.
- **Agent: Langfuse tracing re-enabled with tight OTLP timeouts + module-scope client.** Three changes across `libs/fpl_lib/observability.py` and `infrastructure/environments/dev/lambda.tf`: (1) the `Langfuse()` client is now constructed lazily *once per cold-start* and cached at module scope — Langfuse maintainers flag per-call construction as a Lambda anti-pattern because it rebuilds the OTEL tracer provider ([discussion #7669](https://github.com/orgs/langfuse/discussions/7669)); (2) `_get_client()` short-circuits to `None` when `LANGFUSE_TRACING_ENABLED=false`, preserving the kill-switch; (3) Terraform pins `OTEL_EXPORTER_OTLP_TIMEOUT=3000`, `OTEL_BSP_EXPORT_TIMEOUT=5000`, `LANGFUSE_FLUSH_AT=1`, `LANGFUSE_FLUSH_INTERVAL=1000` so a slow or unreachable Langfuse endpoint adds at most ~5s to the response (vs. the 60s default that blew up `/team` in #133). This matches the pattern Langfuse's own docs recommend for low-traffic Lambda deployments and is what almost every real-world Lambda + Langfuse deployment on GitHub uses; ADOT Lambda Extension remains the production answer at higher RPS and is parked as a follow-up pending observed trace-drop rate. Four new unit tests pin the module-scope + kill-switch semantics (`test_client_is_constructed_once_across_calls`, `test_flush_short_circuits_when_tracing_disabled`, `test_record_llm_usage_short_circuits_when_tracing_disabled`, `test_client_construction_failure_is_swallowed`). Rationale + ADOT comparison documented in ADR-0005 revision (2026-04-20).
- **Dashboard: step pills no longer ghost-tick ahead of the actual graph position** in `web/dashboard/src/pages/chat/StepPills.tsx`. The agent graph has a conditional edge — `reflector` can route back to `planner` for another round when it decides more data is needed — so for any query that triggers a second iteration, the stream emits `planner → tool_executor → reflector → planner → tool_executor → …` The previous renderer built a permanent `seen: Set<string>` from the steps array, so once *any* event for a node arrived, that pill stayed ticked forever. On loop-back the user saw "Reviewing ✓" next to "Gathering data" *spinning*, which broke the linear mental model the pills implicitly promise. Switched to position-based rendering: `currentIdx = NODE_ORDER.indexOf(lastStep)`; pills before `currentIdx` are done, the pill *at* `currentIdx` is in-flight, pills after are pending. When the graph jumps back to an earlier step, the later pills cleanly revert to the pending dot. Once the stream ends (`active = false`), everything ≤ `currentIdx` counts as done so the final state still reads as "all four completed". Four unit tests (new `StepPills.test.tsx`) pin the behaviour, including a regression case that asserts `[planner, tool_executor, reflector, planner]` renders Planning spinning + the rest pending.
- **Dashboard: SSE parser now splits on any blank-line variant (`\r\n\r\n`, `\n\n`, or `\r\r`) and flushes the trailing buffer when the stream closes.** Fix in `web/dashboard/src/lib/agentApi.ts`. Every SSE frame the agent emits over CloudFront → Lambda Function URL ends with `\r\n\r\n` (verified via hex dump of the live stream), but the parser's `buffer.indexOf("\n\n")` never matched — `\r\n\r\n` is `\r`,`\n`,`\r`,`\n` with no adjacent LFs. The parser found *zero* frames, the `for await` loop exited without yielding, and the UI hit the `settled === false` branch and rendered "Stream ended before a final report." for every chat. Fix normalises every chunk to LF-only line endings before the frame splitter (`s.replace(/\r\n?/g, "\n")`), and flushes whatever remains in the buffer as a final frame when the reader reports `done` — some proxies close the connection without a trailing blank line, which would otherwise silently drop the terminal `result` event. Two regression tests pin both behaviours (`parses frames separated by CRLF CRLF` + `flushes a trailing frame that closes without a blank line`). Latent since #117 — masked by the Function URL 403 loop (#123 → #132) so no browser request ever reached the parser's happy path until today.
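The normalisation trick at the heart of that SSE fix can be rendered in Python (the real parser is TypeScript in `agentApi.ts`; `split_sse_frames` here is a hypothetical stand-in):

```python
# Sketch of the SSE framing fix: normalise every line-ending variant
# (\r\n, bare \r) to \n BEFORE splitting on the blank line, and keep
# the trailing partial frame so a stream that closes without a final
# blank line still yields its last event.
import re

def split_sse_frames(raw: str) -> list[str]:
    normalised = re.sub(r"\r\n?", "\n", raw)  # \r\n and lone \r become \n
    # split() keeps the final segment, which "flushes" a trailing frame
    return [frame for frame in normalised.split("\n\n") if frame.strip()]

stream = "event: step\r\ndata: planner\r\n\r\nevent: result\r\ndata: done"
print(split_sse_frames(stream))
# → ['event: step\ndata: planner', 'event: result\ndata: done']
```

A naive `raw.split("\n\n")` on the same input finds zero boundaries, which is exactly the "zero frames parsed" failure described above.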

docs/adr/0005-prompt-versioning-and-llm-observability.md

Lines changed: 26 additions & 5 deletions
@@ -110,11 +110,32 @@ Prompt version metadata flows into every Langfuse trace, enabling A/B comparison

## Revision — 2026-04-20: Langfuse SDK v4 on Lambda tuning

-Langfuse SDK v4 is an OpenTelemetry rewrite. On Lambda, the OTLP batch exporter's default timeouts (30s export + 2× 10s HTTP retries) stacked to a ~60s hang on the response thread whenever `cloud.langfuse.com` was slow or unreachable — `/team` returned HTTP 200 but took 60s to return (see CHANGELOG 2026-04-20). Root cause: Lambda freezes the execution environment on handler return, so any `flush()` call on the response thread pays the full export deadline inline.
+Langfuse SDK v4 is an OpenTelemetry rewrite. On Lambda, uploads to `cloud.langfuse.com` can stack to a 60s hang on the response thread. There are **two separate blocking surfaces** and both matter:

-Two accepted patterns:
+1. **The OTLP HTTP exporter's retry loop.** `OTLPSpanExporter.export()` retries up to 6 times with exponential backoff (1s → 2s → 4s → 8s → 16s → 32s ≈ 63s total) on connect errors. Bounded only by the exporter's own `timeout` argument — OTel's standard `OTEL_BSP_EXPORT_TIMEOUT` is wired but [ignored by design](https://github.com/open-telemetry/opentelemetry-python) (the HTTP exporter has the explicit comment `# Not used. No way currently to pass timeout to export.`). Langfuse 4.x passes its own `LANGFUSE_TIMEOUT` env var (seconds, default 5) straight into the exporter constructor; that's the only knob that caps the retry deadline.
+2. **`resource_manager.flush()`'s queue joins.** `Langfuse().flush()` calls `self.tracer_provider.force_flush()` (OTel default 30 s, and the passed `export_timeout_millis` is silently ignored), then `_score_ingestion_queue.join()` and `_media_upload_queue.join()` **with no timeout at all**. If we emit any scores (we do — `_emit_quality_scores` in `api.py`) and the Langfuse endpoint is slow, the handler thread blocks forever on `.join()`.

-1. **SDK-only config (chosen).** Module-scope `Langfuse()` client (constructed lazily on first use and cached for the warm container's lifetime) plus tight OTEL env vars — `OTEL_EXPORTER_OTLP_TIMEOUT=3000`, `OTEL_BSP_EXPORT_TIMEOUT=5000`, `LANGFUSE_FLUSH_AT=1`. Worst-case per-request latency cost: ~5s on cold start when Langfuse is unreachable. At our scale (1–2 rpm hobby traffic) this is the recommended pattern — Langfuse maintainers reiterate it in [discussion #7669](https://github.com/orgs/langfuse/discussions/7669) and it's what almost every real-world Lambda + Langfuse deployment uses.
-2. **ADOT Lambda Extension.** Runs a local OpenTelemetry Collector as an external extension process. The app exports to `localhost:4318`; the collector buffers and flushes during the extension lifecycle (after the handler returns but before the container freezes). User-facing request latency is untouched. Widely regarded as overkill below double-digit RPS, and adds Dockerfile complexity (the collector has to be baked into the container image — Layers don't work with container-image Lambdas).
+### What we tried first and why it failed

-Pattern 2 is the production answer if (a) trace-drop rate becomes visible in the Langfuse UI, or (b) the ~5s worst-case cold-start tax shows up as user-visible latency. Pattern 1 remains the default. `LANGFUSE_TRACING_ENABLED=false` is preserved as a kill-switch — a single `aws lambda update-function-configuration` call disables tracing in seconds if a future Langfuse outage overruns the 5s cap.
+The first correction set `OTEL_EXPORTER_OTLP_TIMEOUT=3000` and `OTEL_BSP_EXPORT_TIMEOUT=5000` — both ignored by Langfuse's own processor, so `/team` hung 60s after redeploy. We also set `LANGFUSE_FLUSH_AT=1` thinking it would reduce queue depth; it actually *made things worse* because `BatchSpanProcessor.on_end()` triggers synchronous export on the handler thread whenever `queue_size >= max_export_batch_size`. With `max=1`, every span paid the full retry loop inline.
+
+### What actually works (chosen)
+
+- **`LANGFUSE_TIMEOUT=2`** — seconds, caps the OTLP exporter's retry loop (per-POST socket timeout AND overall deadline). A dead endpoint now costs ~2s, not 60s.
+- **`LANGFUSE_FLUSH_INTERVAL=1`** — short background flush interval so the batch processor drains on its own thread between requests. Small tail of events may be stranded when the Lambda freezes mid-batch; acceptable at our scale.
+- **Do NOT set `LANGFUSE_FLUSH_AT=1`** (see failure mode above — leave the SDK default of 15).
+- **Remove all explicit `langfuse_flush()` calls from request handlers.** The `_score_ingestion_queue.join()` path in `resource_manager.flush()` is unbounded and the background processor covers the common case anyway.
+- **Keep `LANGFUSE_TRACING_ENABLED` as the kill-switch** — a single `aws lambda update-function-configuration` flip disables tracing in seconds.
+
+### Source citations
+
+- `langfuse/_client/client.py:269` — `timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5))`
+- `langfuse/_client/span_processor.py:108-112` — `OTLPSpanExporter(timeout=timeout)`
+- `langfuse/_client/resource_manager.py:430` — `flush()` call chain; `_score_ingestion_queue.join()` without timeout
+- `opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py:174-224` — retry loop with `_MAX_RETRYS = 6`, deadline = `time() + self._timeout`
+
+### ADOT stays deferred
+
+[AWS Distro for OpenTelemetry](https://aws-otel.github.io/) as a Lambda Extension runs a local OTEL collector as an external extension process — the app exports to `localhost:4318` and the extension flushes during the post-invocation lifecycle (before freeze), so user latency is untouched. It's the production answer at higher RPS and sidesteps every SDK-level blocking surface above. Deferred for us because (a) the SDK-level fix above is what Langfuse maintainers recommend for low-traffic Lambda, (b) ADOT on container-image Lambdas (Layers don't work) requires Dockerfile plumbing, and (c) once `LANGFUSE_TIMEOUT=2` is in place, worst case is a ~2s tax which is invisible at 1–2 rpm. Revisit if the Langfuse UI shows visible trace drop or if cold-start latency becomes user-facing.
+
+Langfuse PR [#1618](https://github.com/langfuse/langfuse-python/pull/1618) (v4.1+) adds a `span_exporter=` kwarg to `Langfuse()` letting apps inject a custom fire-and-forget or ADOT-routing exporter — cleaner escape hatch than overriding env vars. Worth revisiting after Langfuse 4.1 ships.
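The unbounded `.join()` this revision keeps pointing at is plain stdlib behaviour: `queue.Queue.join()` accepts no timeout argument, so a bounded wait has to be hand-rolled. A sketch of the difference (`join_with_timeout` is hypothetical, not a Langfuse API, and it polls a CPython-internal counter):

```python
# queue.Queue.join() blocks until every enqueued item has had
# task_done() called for it — forever, if a worker never does. A
# deadline-bounded variant must poll instead of joining.
import queue
import time

def join_with_timeout(q: queue.Queue, timeout: float) -> bool:
    """Return True if all tasks finished, False once the deadline hits."""
    deadline = time.monotonic() + timeout
    while q.unfinished_tasks:  # CPython internal, but stable in practice
        if time.monotonic() >= deadline:
            return False
        time.sleep(0.01)
    return True

q = queue.Queue()
q.put("score-event")  # enqueued, but task_done() is never called
print(join_with_timeout(q, 0.1))  # → False: bounded, unlike q.join()
```

With `q.join()` in place of the bounded wait, the snippet above never returns, which is the handler-thread hang in miniature.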

infrastructure/environments/dev/lambda.tf

Lines changed: 31 additions & 19 deletions
@@ -228,25 +228,37 @@ module "lambda_agent" {
 SQUAD_CACHE_TABLE = aws_dynamodb_table.squad_cache.name
 TEAM_FETCHER_FUNCTION_NAME = module.lambda_team_fetcher.function_name

-# Langfuse tracing is on with the SDK pinned to tight timeouts so a slow
-# or unreachable Langfuse endpoint adds at most ~5s to the response path
-# (vs. the 60s default that blew up /team in #133). Matches the pattern
-# Langfuse maintainers recommend for low-traffic Lambda deployments —
-# ADOT Lambda Extension is the production answer at higher RPS and stays
-# deferred pending observed trace-drop rate (see ADR-0005 revision).
-LANGFUSE_TRACING_ENABLED = "true"
-# OTLP exporter: give up on any single HTTP upload to cloud.langfuse.com
-# after 3s (default 10s). Below the 5s BSP cap so one retry can still fit.
-OTEL_EXPORTER_OTLP_TIMEOUT = "3000"
-# BatchSpanProcessor: hard ceiling on the entire flush including retries.
-# Lambda freezes the process the instant the handler returns, so any
-# flush() call on the response thread blocks up to this value.
-OTEL_BSP_EXPORT_TIMEOUT = "5000"
-# Emit each span immediately — we'd rather pay 1 flush per span at low
-# traffic than batch spans that could be stranded in the queue when the
-# container freezes. At 1-2 rpm the upload cost is negligible.
-LANGFUSE_FLUSH_AT = "1"
-LANGFUSE_FLUSH_INTERVAL = "1000"
+# Langfuse tracing — off by default; the kill-switch is preserved so a
+# single CLI flip can disable tracing in seconds if a future Langfuse
+# outage overruns the 2s timeout we pin below. We earned this caution:
+# #137's first attempt re-enabled tracing with OTEL_BSP_EXPORT_TIMEOUT +
+# OTEL_EXPORTER_OTLP_TIMEOUT, which are documented-ignored by
+# OpenTelemetry's OTLP HTTP exporter (`# Not used. No way currently to
+# pass timeout to export.` — opentelemetry/exporter/otlp/proto/http/
+# trace_exporter/__init__.py:164) and by Langfuse 4.x's own LangfuseSpan-
+# Processor. Tracing is flipped back on at the CLI after this applies —
+# once we've verified LANGFUSE_TIMEOUT actually bounds the retry loop in
+# production, the default here moves to "true".
+LANGFUSE_TRACING_ENABLED = "false"
+
+# LANGFUSE_TIMEOUT is passed straight to OTLPSpanExporter(timeout=…). It
+# bounds both the per-POST socket timeout AND the retry loop's deadline
+# (the OTel exporter aborts further retries once backoff_seconds >
+# deadline_sec - time()). Default is 5 seconds; we pin 2 so a dead
+# endpoint can never cost more than ~2s on the response thread.
+# See langfuse/_client/client.py:269 + _client/span_processor.py:108.
+LANGFUSE_TIMEOUT = "2"
+
+# LANGFUSE_FLUSH_INTERVAL controls the background batch-processor's
+# schedule delay (seconds). Setting it low keeps queue depth small so a
+# Lambda freeze between requests loses few spans. We do NOT set
+# LANGFUSE_FLUSH_AT=1: that puts max_export_batch_size=1 on the
+# BatchSpanProcessor, which makes on_end() call export() synchronously
+# on the handler thread every time the queue hits 1 item — which is
+# every span. That moves the blocking OTLP upload onto the user-facing
+# request path, the exact behaviour the background thread is meant to
+# prevent.
+LANGFUSE_FLUSH_INTERVAL = "1"

 # Shared-secret gate: CloudFront injects this header on every origin
 # request (terraform-managed, stored in Secrets Manager). The FastAPI

services/agent/src/fpl_agent/api.py

Lines changed: 15 additions & 12 deletions
@@ -51,9 +51,6 @@
     observe,
     propagate_attributes,
 )
-from fpl_lib.observability import (
-    flush as langfuse_flush,
-)
 from fpl_lib.secrets import resolve_secret_to_env

 logger = logging.getLogger(__name__)
@@ -446,14 +443,19 @@ async def chat_sync(
         final_state = await _run_graph(graph, req.question, req.squad)
     except Exception as exc:  # noqa: BLE001 — surface any agent failure as 500
         logger.exception("agent run failed")
-        langfuse_flush()
         raise HTTPException(status_code=500, detail=f"agent_failure: {exc}") from exc

     _emit_quality_scores(final_state)
     await budget.record_batch(final_state.get("llm_usage", []))
     response = _agent_response(final_state).model_dump(mode="json")

-    langfuse_flush()
+    # NOTE: no explicit langfuse_flush() here. Langfuse 4.x's
+    # resource_manager.flush() calls _score_ingestion_queue.join() with no
+    # timeout, so a slow or unreachable Langfuse endpoint blocks the response
+    # thread indefinitely. The SDK's background batch processor (flush
+    # interval 1s) drains spans/scores on its own thread between invocations;
+    # a small tail of events may be stranded in the queue when the Lambda
+    # container freezes, which is the accepted trade-off at our scale.
     return JSONResponse(response)
458460

459461

@@ -479,7 +481,14 @@ async def chat_stream(
     not around ``EventSourceResponse(...)`` — because the generator runs
     async after the handler returns, and contextvars don't propagate
     across the boundary unless they're entered in the generator frame.
-    Same reason ``langfuse_flush`` is in the generator's ``finally``.
+
+    Note: we do not explicitly flush Langfuse here. ``flush()`` on Langfuse
+    4.x unconditionally blocks on ``_score_ingestion_queue.join()`` with no
+    timeout, which can hang the stream indefinitely when Langfuse Cloud is
+    slow. The background batch processor (``LANGFUSE_FLUSH_INTERVAL=1``)
+    drains spans and scores between requests on its own thread — at our
+    scale (1–2 rpm) the worst case is a small tail of events stranded when
+    the container freezes, which is acceptable.
     """

     async def event_generator() -> AsyncIterator[dict[str, str]]:
@@ -505,7 +514,6 @@ async def event_generator() -> AsyncIterator[dict[str, str]]:
         except Exception as exc:  # noqa: BLE001
             logger.exception("agent stream failed")
             yield _sse("error", {"message": str(exc)})
-            langfuse_flush()
             return

         _emit_quality_scores(final_state)
@@ -516,11 +524,6 @@ async def event_generator() -> AsyncIterator[dict[str, str]]:

         yield _sse("result", _agent_response(final_state).model_dump(mode="json"))

-        # Outside propagate_attributes — flush must happen after the last
-        # event is yielded so client disconnect between steps still uploads
-        # accumulated spans.
-        langfuse_flush()
-
     return EventSourceResponse(event_generator())
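Both NOTE comments lean on the same pattern: a daemon thread drains the event queue on an interval, so the request thread never waits on the upload. A minimal sketch of that pattern under stated assumptions (illustrative, not the Langfuse batch processor itself):

```python
# Minimal background-flusher pattern: a daemon thread wakes every
# `interval` seconds, drains the queue, and hands the batch to the
# blocking upload callable — off the request path. Daemon threads die
# with the process, mirroring how a Lambda freeze can strand a tail of
# undrained events (the accepted trade-off described above).
import queue
import threading
import time

def start_background_flusher(q: queue.Queue, interval: float, upload) -> threading.Event:
    stop = threading.Event()

    def loop() -> None:
        while not stop.is_set():
            batch = []
            while not q.empty():  # single consumer, so this drain is safe
                batch.append(q.get_nowait())
            if batch:
                upload(batch)  # blocking I/O happens on THIS thread only
            stop.wait(interval)  # returns early if stop is set

    threading.Thread(target=loop, daemon=True).start()
    return stop

sent: list[str] = []
q = queue.Queue()
stop = start_background_flusher(q, 0.05, sent.extend)
q.put("span")
time.sleep(0.2)  # give the flusher a couple of cycles
stop.set()
print(sent)  # → ['span']
```

The request thread only ever pays the cost of `q.put()`; whether the upload takes 2 ms or 63 s is the background thread's problem.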
