Skip to content

Commit d0e23a3

Browse files
committed
feat(temporal): add Temporal plugin for observability
Introduces langfuse.temporal.LangfusePlugin, a Temporal SimplePlugin that enriches the OTel spans produced by temporalio.contrib.opentelemetry with Langfuse session/user/tag/metadata attributes and optional payload captures. The plugin defaults to metadata-only capture, stays sandbox/replay safe, and composes with framework plugins (OpenAIAgentsPlugin, PydanticAIPlugin) rather than replacing them. https://claude.ai/code/session_013bfLpjTrf8qPkTftZHb9Hf
1 parent 3a65ce8 commit d0e23a3

13 files changed

Lines changed: 1530 additions & 1 deletion

File tree

langfuse/temporal/__init__.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""Langfuse integration for Temporal (``temporalio``).
2+
3+
This package provides a Temporal plugin that emits OpenTelemetry spans for
4+
Temporal client/workflow/activity operations and routes them to Langfuse
5+
with Langfuse-specific attributes (session id, user id, tags, metadata,
6+
and optional input/output captures).
7+
8+
Public API::
9+
10+
from langfuse.temporal import LangfusePlugin, LangfusePluginConfig
11+
12+
Usage (minimal)::
13+
14+
from temporalio.client import Client
15+
from langfuse.temporal import LangfusePlugin
16+
17+
client = await Client.connect(
18+
"localhost:7233",
19+
plugins=[LangfusePlugin()],
20+
)
21+
22+
The plugin is worker-capable, so when attached to the ``Client`` it is
23+
automatically carried over to any ``Worker`` constructed from that client.
24+
Do not re-attach it on the worker — see the guide for details.
25+
26+
Framework presets::
27+
28+
from langfuse.temporal.presets import (
29+
langfuse_openai_agents_plugins,
30+
langfuse_pydantic_ai_plugins,
31+
)
32+
33+
Installation::
34+
35+
pip install "langfuse[temporal]"
36+
37+
See the Langfuse docs for the full configuration surface, sandbox /
38+
replay caveats, and framework-specific guides.
39+
"""
40+
41+
from __future__ import annotations
42+
43+
from .attributes import (
44+
LANGFUSE_INPUT,
45+
LANGFUSE_METADATA_PREFIX,
46+
LANGFUSE_OUTPUT,
47+
LANGFUSE_SESSION_ID,
48+
LANGFUSE_TAGS,
49+
LANGFUSE_USER_ID,
50+
)
51+
from .config import (
52+
CaptureConfig,
53+
FactoryContext,
54+
LangfusePluginConfig,
55+
TracingConfig,
56+
UIEnrichmentConfig,
57+
)
58+
from .plugin import PLUGIN_NAME, LangfusePlugin
59+
60+
__all__ = [
61+
"LangfusePlugin",
62+
"LangfusePluginConfig",
63+
"CaptureConfig",
64+
"TracingConfig",
65+
"UIEnrichmentConfig",
66+
"FactoryContext",
67+
"PLUGIN_NAME",
68+
"LANGFUSE_SESSION_ID",
69+
"LANGFUSE_USER_ID",
70+
"LANGFUSE_TAGS",
71+
"LANGFUSE_INPUT",
72+
"LANGFUSE_OUTPUT",
73+
"LANGFUSE_METADATA_PREFIX",
74+
]

langfuse/temporal/attributes.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""OpenTelemetry span attribute keys used by the Langfuse Temporal plugin.
2+
3+
These are the canonical attribute keys that the plugin writes onto the
4+
OpenTelemetry spans produced by ``temporalio.contrib.opentelemetry``. They are
5+
consumed by Langfuse's OTel ingestion path, which maps selected attributes
6+
onto Langfuse trace / observation fields (``session_id``, ``user_id``,
7+
``tags``, ``metadata``, ``input``, ``output``, etc.).
8+
9+
Keeping them centralized prevents typos and makes it easy to audit exactly
10+
which attributes the plugin can emit.
11+
"""
12+
13+
from __future__ import annotations
14+
15+
# Langfuse core correlation attributes. These mirror the attribute keys used
16+
# by the Langfuse OTel SDK and must stay compatible with Langfuse ingestion.
17+
LANGFUSE_SESSION_ID = "langfuse.session.id"
18+
LANGFUSE_USER_ID = "langfuse.user.id"
19+
LANGFUSE_TAGS = "langfuse.tags"
20+
LANGFUSE_METADATA_PREFIX = "langfuse.metadata."
21+
LANGFUSE_ENVIRONMENT = "langfuse.environment"
22+
LANGFUSE_RELEASE = "langfuse.release"
23+
LANGFUSE_VERSION = "langfuse.version"
24+
25+
# Optional payload capture. These are only written when the plugin is
26+
# explicitly configured to capture payloads.
27+
LANGFUSE_INPUT = "langfuse.observation.input"
28+
LANGFUSE_OUTPUT = "langfuse.observation.output"
29+
30+
# Temporal-specific metadata. These are ingested by Langfuse as generic span
31+
# attributes (and surfaced as observation metadata) and are always safe to
32+
# emit because they are identifiers, not payload bodies.
33+
TEMPORAL_WORKFLOW_ID = "temporal.workflow.id"
34+
TEMPORAL_RUN_ID = "temporal.workflow.run_id"
35+
TEMPORAL_WORKFLOW_TYPE = "temporal.workflow.type"
36+
TEMPORAL_NAMESPACE = "temporal.namespace"
37+
TEMPORAL_TASK_QUEUE = "temporal.task_queue"
38+
TEMPORAL_ACTIVITY_ID = "temporal.activity.id"
39+
TEMPORAL_ACTIVITY_TYPE = "temporal.activity.type"
40+
TEMPORAL_ATTEMPT = "temporal.attempt"
41+
TEMPORAL_PARENT_WORKFLOW_ID = "temporal.parent.workflow_id"
42+
TEMPORAL_PARENT_RUN_ID = "temporal.parent.run_id"
43+
TEMPORAL_IS_LOCAL_ACTIVITY = "temporal.activity.is_local"
44+
TEMPORAL_IS_REPLAYING = "temporal.workflow.is_replaying"
45+
46+
47+
__all__ = [
48+
"LANGFUSE_SESSION_ID",
49+
"LANGFUSE_USER_ID",
50+
"LANGFUSE_TAGS",
51+
"LANGFUSE_METADATA_PREFIX",
52+
"LANGFUSE_ENVIRONMENT",
53+
"LANGFUSE_RELEASE",
54+
"LANGFUSE_VERSION",
55+
"LANGFUSE_INPUT",
56+
"LANGFUSE_OUTPUT",
57+
"TEMPORAL_WORKFLOW_ID",
58+
"TEMPORAL_RUN_ID",
59+
"TEMPORAL_WORKFLOW_TYPE",
60+
"TEMPORAL_NAMESPACE",
61+
"TEMPORAL_TASK_QUEUE",
62+
"TEMPORAL_ACTIVITY_ID",
63+
"TEMPORAL_ACTIVITY_TYPE",
64+
"TEMPORAL_ATTEMPT",
65+
"TEMPORAL_PARENT_WORKFLOW_ID",
66+
"TEMPORAL_PARENT_RUN_ID",
67+
"TEMPORAL_IS_LOCAL_ACTIVITY",
68+
"TEMPORAL_IS_REPLAYING",
69+
]

langfuse/temporal/config.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
"""Configuration surface for the Langfuse Temporal plugin.
2+
3+
Organized into logical groups so the main plugin constructor does not have
4+
to explode into a giant positional API. Each knob has a conservative default
5+
so that installing the plugin with no arguments gives you safe, metadata-only
6+
Temporal tracing routed to Langfuse.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
from dataclasses import dataclass, field
12+
from typing import Any, Callable, List, Mapping, Optional, Sequence
13+
14+
from .redaction import RedactCallback
15+
16+
# Factory signatures. They are called at span-creation time inside the
17+
# Langfuse enrichment interceptor and must be side-effect free: they run
18+
# inside the Temporal workflow sandbox for workflow spans and must not touch
19+
# the network, wall-clock time, or mutable globals.
20+
IdFactory = Callable[["FactoryContext"], Optional[str]]
21+
TagFactory = Callable[["FactoryContext"], Optional[Sequence[str]]]
22+
MetadataFactory = Callable[["FactoryContext"], Optional[Mapping[str, Any]]]
23+
24+
25+
@dataclass
26+
class FactoryContext:
27+
"""Deterministic context passed to user-provided factory callbacks.
28+
29+
``kind`` is one of ``"client_start_workflow"``, ``"workflow"``,
30+
``"activity"``. ``info`` carries whatever Temporal info object is
31+
available on the current side (the client input, ``workflow.Info``, or
32+
``activity.Info``). ``input`` is the raw Temporal args tuple; it is made
33+
available so factories can, e.g., pull a user_id off a request object,
34+
but factories must not mutate it.
35+
"""
36+
37+
kind: str
38+
info: Any = None
39+
input: Any = None
40+
41+
42+
@dataclass
43+
class CaptureConfig:
44+
"""Controls for payload capture on Temporal spans.
45+
46+
All flags default to ``False`` so that installing the plugin never
47+
accidentally exports sensitive workflow/activity payloads to Langfuse.
48+
When a flag is enabled, the corresponding payload is serialized, passed
49+
through :attr:`redact`, truncated to :attr:`size_limit_bytes`, and
50+
attached to the relevant span.
51+
"""
52+
53+
capture_workflow_inputs: bool = False
54+
capture_workflow_outputs: bool = False
55+
capture_activity_inputs: bool = False
56+
capture_activity_outputs: bool = False
57+
58+
size_limit_bytes: Optional[int] = 32 * 1024
59+
redact: Optional[RedactCallback] = None
60+
61+
# Allow/deny lists by Temporal name. A workflow or activity name appears
62+
# on the denylist wins: it is never captured. When an allowlist is
63+
# non-empty, only names on the allowlist are captured.
64+
workflow_allowlist: Optional[Sequence[str]] = None
65+
workflow_denylist: Optional[Sequence[str]] = None
66+
activity_allowlist: Optional[Sequence[str]] = None
67+
activity_denylist: Optional[Sequence[str]] = None
68+
69+
def should_capture_workflow(self, workflow_type: Optional[str]) -> bool:
70+
return _should_capture(
71+
workflow_type, self.workflow_allowlist, self.workflow_denylist
72+
)
73+
74+
def should_capture_activity(self, activity_type: Optional[str]) -> bool:
75+
return _should_capture(
76+
activity_type, self.activity_allowlist, self.activity_denylist
77+
)
78+
79+
80+
def _should_capture(
81+
name: Optional[str],
82+
allowlist: Optional[Sequence[str]],
83+
denylist: Optional[Sequence[str]],
84+
) -> bool:
85+
if name is None:
86+
# With no name to match against, fall back to the user's allowlist
87+
# intent: if they specified an allowlist we default-deny, otherwise
88+
# default-allow.
89+
return not allowlist
90+
if denylist and name in denylist:
91+
return False
92+
if allowlist:
93+
return name in allowlist
94+
return True
95+
96+
97+
@dataclass
98+
class TracingConfig:
99+
"""Controls for which Temporal surfaces produce spans.
100+
101+
The base plugin always instruments client ``start_workflow`` and worker
102+
``execute_workflow`` / ``execute_activity``. The remaining surfaces
103+
(signals, queries, updates, local activities) are opt-in because they
104+
can be extremely high-volume in production and often duplicate info
105+
already present on the parent workflow run.
106+
"""
107+
108+
add_temporal_spans: bool = True
109+
trace_signals: bool = False
110+
trace_queries: bool = False
111+
trace_updates: bool = True
112+
trace_local_activities: bool = True
113+
114+
115+
@dataclass
116+
class UIEnrichmentConfig:
117+
"""Controls for correlating Temporal UI fields with Langfuse.
118+
119+
``memo_trace_id`` asks the plugin to add the Langfuse ``trace_id`` to
120+
the Temporal workflow memo so operators can jump from Temporal UI to
121+
the Langfuse trace. ``search_attribute_key``, when set, does the same
122+
thing via a custom search attribute (which must already be registered
123+
in the target namespace).
124+
"""
125+
126+
memo_trace_id: bool = False
127+
search_attribute_key: Optional[str] = None
128+
129+
130+
@dataclass
131+
class LangfusePluginConfig:
132+
"""Full configuration for :class:`langfuse.temporal.LangfusePlugin`.
133+
134+
Using a dataclass keeps the plugin constructor readable and lets
135+
framework presets build a config once and reuse it.
136+
"""
137+
138+
# Tracing ownership.
139+
tracer_provider: Optional[Any] = None
140+
use_existing_otel: bool = True
141+
142+
# Langfuse client. When omitted, the plugin uses :func:`langfuse.get_client`
143+
# at worker startup. Workflow code never touches this object — it is
144+
# used only for flushing at worker shutdown.
145+
langfuse_client: Optional[Any] = None
146+
flush_on_shutdown: bool = True
147+
148+
# Tracing scope.
149+
tracing: TracingConfig = field(default_factory=TracingConfig)
150+
151+
# Privacy.
152+
capture: CaptureConfig = field(default_factory=CaptureConfig)
153+
154+
# Correlation.
155+
session_id_factory: Optional[IdFactory] = None
156+
user_id_factory: Optional[IdFactory] = None
157+
tags_factory: Optional[TagFactory] = None
158+
metadata_factory: Optional[MetadataFactory] = None
159+
160+
# Static defaults applied to every span (cheap, always safe).
161+
static_tags: Sequence[str] = field(default_factory=list)
162+
static_metadata: Mapping[str, Any] = field(default_factory=dict)
163+
environment: Optional[str] = None
164+
release: Optional[str] = None
165+
version: Optional[str] = None
166+
167+
# UI enrichment.
168+
ui: UIEnrichmentConfig = field(default_factory=UIEnrichmentConfig)
169+
170+
# Deployment mode. When ``True`` the plugin installs tracing/context
171+
# propagation but does not require/initialize a Langfuse exporter,
172+
# which is the right shape for starter-only processes.
173+
context_only: bool = False
174+
175+
def resolved_static_tags(self) -> List[str]:
176+
return list(self.static_tags)
177+
178+
179+
__all__ = [
180+
"CaptureConfig",
181+
"FactoryContext",
182+
"IdFactory",
183+
"LangfusePluginConfig",
184+
"MetadataFactory",
185+
"TagFactory",
186+
"TracingConfig",
187+
"UIEnrichmentConfig",
188+
]

0 commit comments

Comments
 (0)