Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class Langfuse:
sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
blocked_instrumentation_scopes (Optional[List[str]]): List of instrumentation scope names to block from being exported to Langfuse. Spans from these scopes will be filtered out before being sent to the API. Useful for filtering out spans from specific libraries or frameworks. For exported spans, you can see the instrumentation scope name in the span metadata in Langfuse (`metadata.scope.name`)
additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers.

Example:
```python
Expand Down Expand Up @@ -163,6 +164,7 @@ def __init__(
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
blocked_instrumentation_scopes: Optional[List[str]] = None,
additional_headers: Optional[Dict[str, str]] = None,
):
self._host = host or os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
self._environment = environment or os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
Expand Down Expand Up @@ -225,6 +227,7 @@ def __init__(
mask=mask,
tracing_enabled=self._tracing_enabled,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
additional_headers=additional_headers,
)
self._mask = self._resources.mask

Expand Down
16 changes: 15 additions & 1 deletion langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def __new__(
mask: Optional[MaskFunction] = None,
tracing_enabled: Optional[bool] = None,
blocked_instrumentation_scopes: Optional[List[str]] = None,
additional_headers: Optional[Dict[str, str]] = None,
) -> "LangfuseResourceManager":
if public_key in cls._instances:
return cls._instances[public_key]
Expand All @@ -119,6 +120,7 @@ def __new__(
if tracing_enabled is not None
else True,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
additional_headers=additional_headers,
)

cls._instances[public_key] = instance
Expand All @@ -142,6 +144,7 @@ def _initialize_instance(
mask: Optional[MaskFunction] = None,
tracing_enabled: bool = True,
blocked_instrumentation_scopes: Optional[List[str]] = None,
additional_headers: Optional[Dict[str, str]] = None,
):
self.public_key = public_key
self.secret_key = secret_key
Expand All @@ -163,6 +166,7 @@ def _initialize_instance(
flush_at=flush_at,
flush_interval=flush_interval,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
additional_headers=additional_headers,
)
tracer_provider.add_span_processor(langfuse_processor)

Expand All @@ -179,7 +183,17 @@ def _initialize_instance(
## use connection pools with limited capacity. Creating multiple instances
## could exhaust the OS's maximum number of available TCP sockets (file descriptors),
## leading to connection errors.
self.httpx_client = httpx_client or httpx.Client(timeout=timeout)
if httpx_client is not None:
self.httpx_client = httpx_client
# If additional_headers are provided and httpx_client is provided,
# we merge the headers into the existing client
if additional_headers:
merged_headers = {**(httpx_client.headers or {}), **additional_headers}
self.httpx_client.headers = merged_headers
else:
# Create a new httpx client with additional_headers if provided
client_headers = additional_headers if additional_headers else {}
self.httpx_client = httpx.Client(timeout=timeout, headers=client_headers)
self.api = FernLangfuse(
base_url=host,
username=self.public_key,
Expand Down
21 changes: 14 additions & 7 deletions langfuse/_client/span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import base64
import os
from typing import List, Optional
from typing import Dict, List, Optional

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import ReadableSpan
Expand Down Expand Up @@ -56,6 +56,7 @@ def __init__(
flush_at: Optional[int] = None,
flush_interval: Optional[float] = None,
blocked_instrumentation_scopes: Optional[List[str]] = None,
additional_headers: Optional[Dict[str, str]] = None,
):
self.public_key = public_key
self.blocked_instrumentation_scopes = (
Expand All @@ -78,14 +79,20 @@ def __init__(
f"{public_key}:{secret_key}".encode("utf-8")
).decode("ascii")

# Prepare default headers
default_headers = {
"Authorization": basic_auth_header,
"x_langfuse_sdk_name": "python",
"x_langfuse_sdk_version": langfuse_version,
"x_langfuse_public_key": public_key,
}

# Merge additional headers if provided
headers = {**default_headers, **(additional_headers or {})}

langfuse_span_exporter = OTLPSpanExporter(
endpoint=f"{host}/api/public/otel/v1/traces",
headers={
"Authorization": basic_auth_header,
"x_langfuse_sdk_name": "python",
"x_langfuse_sdk_version": langfuse_version,
"x_langfuse_public_key": public_key,
},
headers=headers,
timeout=timeout,
)

Expand Down