-
Notifications
You must be signed in to change notification settings - Fork 265
Expand file tree
/
Copy pathspan_processor.py
More file actions
170 lines (140 loc) · 6.66 KB
/
span_processor.py
File metadata and controls
170 lines (140 loc) · 6.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
"""Span processor for Langfuse OpenTelemetry integration.
This module defines the LangfuseSpanProcessor class, which extends OpenTelemetry's
BatchSpanProcessor with Langfuse-specific functionality. It handles exporting
spans to the Langfuse API with proper authentication and filtering.
Key features:
- HTTP-based span export to Langfuse API
- Basic authentication with Langfuse API keys
- Configurable batch processing behavior
- Project-scoped span filtering to prevent cross-project data leakage
"""
import base64
import logging
import os
from typing import Dict, List, Optional

from opentelemetry.context import Context
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.processor.baggage import BaggageSpanProcessor, ALLOW_ALL_BAGGAGE_KEYS
from opentelemetry.sdk.trace import ReadableSpan, Span
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from langfuse._client.constants import LANGFUSE_TRACER_NAME
from langfuse._client.environment_variables import (
    LANGFUSE_FLUSH_AT,
    LANGFUSE_FLUSH_INTERVAL,
)
from langfuse._client.utils import span_formatter
from langfuse.logger import langfuse_logger
from langfuse.version import __version__ as langfuse_version
class LangfuseSpanProcessor(BatchSpanProcessor):
    """OpenTelemetry span processor that exports spans to the Langfuse API.

    This processor extends OpenTelemetry's BatchSpanProcessor with Langfuse-specific functionality:
    1. Project-scoped span filtering to prevent cross-project data leakage
    2. Instrumentation scope filtering to block spans from specific libraries/frameworks
    3. Configurable batch processing parameters for optimal performance
    4. HTTP-based span export to the Langfuse OTLP endpoint
    5. Debug logging for span processing operations
    6. Authentication with Langfuse API using Basic Auth
    7. Baggage forwarding: spans are passed through a BaggageSpanProcessor
       (allowing all baggage keys) when they start

    The processor is designed to efficiently handle large volumes of spans with
    minimal overhead, while ensuring spans are only sent to the correct project.
    It integrates with OpenTelemetry's standard span lifecycle, adding Langfuse-specific
    filtering and export capabilities.
    """

    def __init__(
        self,
        *,
        public_key: str,
        secret_key: str,
        host: str,
        timeout: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        blocked_instrumentation_scopes: Optional[List[str]] = None,
        additional_headers: Optional[Dict[str, str]] = None,
    ):
        """Create the processor together with its OTLP/HTTP span exporter.

        Args:
            public_key: Langfuse public key. Used for Basic Auth, sent as the
                ``x_langfuse_public_key`` header, and compared against the
                ``public_key`` attribute of Langfuse spans' instrumentation scope.
            secret_key: Langfuse secret key, used for Basic Auth.
            host: Base URL of the Langfuse server. Any trailing ``/`` is removed
                before ``/api/public/otel/v1/traces`` is appended.
            timeout: Export timeout in seconds. It is passed to the exporter
                as-is and converted to milliseconds for BatchSpanProcessor.
                A falsy value passes ``None`` so BatchSpanProcessor applies its
                own default.
            flush_at: Maximum export batch size. When falsy, the value of the
                environment variable named by ``LANGFUSE_FLUSH_AT`` is used
                (parsed with ``int``), if set.
            flush_interval: Delay between scheduled exports, in seconds. When
                falsy, the value of the environment variable named by
                ``LANGFUSE_FLUSH_INTERVAL`` is used (parsed with ``float``), if set.
            blocked_instrumentation_scopes: Instrumentation scope names whose
                spans are never exported.
            additional_headers: Extra HTTP headers sent with every export. They
                override the default headers on key collisions.
        """
        self.public_key = public_key
        self.blocked_instrumentation_scopes = (
            blocked_instrumentation_scopes
            if blocked_instrumentation_scopes is not None
            else []
        )

        # Baggage processor so baggage keys are attached to spans on start;
        # ALLOW_ALL_BAGGAGE_KEYS lets every key through.
        self._baggage_processor = BaggageSpanProcessor(ALLOW_ALL_BAGGAGE_KEYS)

        # Explicit arguments win; the environment is consulted only when the
        # argument is falsy. The parentheses are essential:
        # `a or b if c else None` parses as `(a or b) if c else None`, which
        # silently dropped explicit arguments whenever the env var was unset.
        env_flush_at = os.environ.get(LANGFUSE_FLUSH_AT, None)
        flush_at = flush_at or (
            int(env_flush_at) if env_flush_at is not None else None
        )

        env_flush_interval = os.environ.get(LANGFUSE_FLUSH_INTERVAL, None)
        flush_interval = flush_interval or (
            float(env_flush_interval) if env_flush_interval is not None else None
        )

        basic_auth_header = "Basic " + base64.b64encode(
            f"{public_key}:{secret_key}".encode("utf-8")
        ).decode("ascii")

        # Prepare default headers
        default_headers = {
            "Authorization": basic_auth_header,
            "x_langfuse_sdk_name": "python",
            "x_langfuse_sdk_version": langfuse_version,
            "x_langfuse_public_key": public_key,
        }

        # Merge additional headers if provided (caller-supplied keys win).
        headers = {**default_headers, **(additional_headers or {})}

        # Strip a trailing slash so the endpoint never contains "//api/...".
        base_url = host.rstrip("/")
        langfuse_span_exporter = OTLPSpanExporter(
            endpoint=f"{base_url}/api/public/otel/v1/traces",
            headers=headers,
            timeout=timeout,
        )

        super().__init__(
            span_exporter=langfuse_span_exporter,
            export_timeout_millis=timeout * 1_000 if timeout else None,
            max_export_batch_size=flush_at,
            schedule_delay_millis=flush_interval * 1_000
            if flush_interval is not None
            else None,
        )

    def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
        """Forward a starting span to the baggage processor, then to the parent.

        Args:
            span: The span that is starting.
            parent_context: Context the span was started in, if any.
        """
        # Forward to the baggage processor first so baggage is attached early.
        self._baggage_processor.on_start(span, parent_context)
        # SpanProcessor.on_start accepts parent_context directly. The previous
        # `except TypeError` fallback could mask a TypeError raised inside the
        # parent and call it a second time.
        super().on_start(span, parent_context=parent_context)

    def on_end(self, span: ReadableSpan) -> None:
        """Filter a finished span and hand it to BatchSpanProcessor if allowed.

        A span is dropped when it is a Langfuse span whose scope ``public_key``
        differs from this processor's key, or when its instrumentation scope is
        blocked.

        Args:
            span: The span that has ended.
        """
        # Only export spans that belong to the scoped project.
        # This is important to not send spans to wrong project in multi-project setups.
        if self._is_langfuse_span(span) and not self._is_langfuse_project_span(span):
            langfuse_logger.debug(
                f"Security: Span rejected - belongs to project '{self._get_scope_public_key(span)}' but processor is for '{self.public_key}'. "
                f"This prevents cross-project data leakage in multi-project environments."
            )
            return

        # Do not export spans from blocked instrumentation scopes.
        if self._is_blocked_instrumentation_scope(span):
            return

        # The f-string would otherwise call span_formatter for every span even
        # when debug output is disabled; only build it when it will be emitted.
        if langfuse_logger.isEnabledFor(logging.DEBUG):
            langfuse_logger.debug(
                f"Trace: Processing span name='{span.name}' | Full details:\n{span_formatter(span)}"
            )

        super().on_end(span)

    @staticmethod
    def _is_langfuse_span(span: ReadableSpan) -> bool:
        """Return True if ``span`` was created by the Langfuse tracer."""
        return (
            span.instrumentation_scope is not None
            and span.instrumentation_scope.name == LANGFUSE_TRACER_NAME
        )

    @staticmethod
    def _get_scope_public_key(span: ReadableSpan) -> object:
        """Return the ``public_key`` attribute of the span's instrumentation scope.

        Returns None when the span has no instrumentation scope, the scope has
        no (or empty) attributes, or the attribute is absent.
        """
        scope = span.instrumentation_scope
        if scope is None or not scope.attributes:
            return None
        return scope.attributes.get("public_key", None)

    def _is_blocked_instrumentation_scope(self, span: ReadableSpan) -> bool:
        """Return True if the span's instrumentation scope name is blocked."""
        return (
            span.instrumentation_scope is not None
            and span.instrumentation_scope.name in self.blocked_instrumentation_scopes
        )

    def _is_langfuse_project_span(self, span: ReadableSpan) -> bool:
        """Return True if ``span`` is a Langfuse span scoped to this processor's key."""
        if not LangfuseSpanProcessor._is_langfuse_span(span):
            return False
        # _is_langfuse_span guarantees instrumentation_scope is not None here.
        return self._get_scope_public_key(span) == self.public_key