Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ class Langfuse:
base_url (Optional[str]): The Langfuse API base URL. Defaults to "https://cloud.langfuse.com". Can also be set via LANGFUSE_BASE_URL environment variable.
host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com".
timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds.
httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created.
httpx_client (Optional[httpx.Client]): Custom synchronous httpx client for making non-tracing HTTP requests. If not provided, a default client will be created.
async_httpx_client (Optional[httpx.AsyncClient]): Custom asynchronous httpx client for `client.async_api`. If not provided, a default async client will be created.
debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable.
tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable.
flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable.
Expand All @@ -179,7 +180,7 @@ class Langfuse:
)
```
should_export_span (Optional[Callable[[ReadableSpan], bool]]): Callback to decide whether to export a span. If omitted, Langfuse uses the default filter (Langfuse SDK spans, spans with `gen_ai.*` attributes, and known LLM instrumentation scopes).
additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well.
additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If `httpx_client` or `async_httpx_client` is provided, `additional_headers` must be set directly on your custom client as well.
tracer_provider(Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. This can be useful to set to have disconnected tracing between Langfuse and other OpenTelemetry-span emitting libraries. Note: To track active spans, the context is still shared between TracerProviders. This may lead to broken trace trees.

Example:
Expand Down Expand Up @@ -231,6 +232,7 @@ def __init__(
host: Optional[str] = None,
timeout: Optional[int] = None,
httpx_client: Optional[httpx.Client] = None,
async_httpx_client: Optional[httpx.AsyncClient] = None,
debug: bool = False,
tracing_enabled: Optional[bool] = True,
flush_at: Optional[int] = None,
Expand Down Expand Up @@ -332,6 +334,7 @@ def __init__(
flush_at=flush_at,
flush_interval=flush_interval,
httpx_client=httpx_client,
async_httpx_client=async_httpx_client,
media_upload_thread_count=media_upload_thread_count,
sample_rate=sample_rate,
mask=mask,
Expand Down
1 change: 1 addition & 0 deletions langfuse/_client/get_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def _create_client_from_instance(
additional_headers=instance.additional_headers,
tracer_provider=instance.tracer_provider,
httpx_client=instance.httpx_client,
async_httpx_client=instance.async_httpx_client,
)


Expand Down
13 changes: 13 additions & 0 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __new__(
flush_at: Optional[int] = None,
flush_interval: Optional[float] = None,
httpx_client: Optional[httpx.Client] = None,
async_httpx_client: Optional[httpx.AsyncClient] = None,
media_upload_thread_count: Optional[int] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
Expand Down Expand Up @@ -123,6 +124,7 @@ def __new__(
flush_at=flush_at,
flush_interval=flush_interval,
httpx_client=httpx_client,
async_httpx_client=async_httpx_client,
media_upload_thread_count=media_upload_thread_count,
sample_rate=sample_rate,
mask=mask,
Expand Down Expand Up @@ -152,6 +154,7 @@ def _initialize_instance(
flush_interval: Optional[float] = None,
media_upload_thread_count: Optional[int] = None,
httpx_client: Optional[httpx.Client] = None,
async_httpx_client: Optional[httpx.AsyncClient] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
tracing_enabled: bool = True,
Expand Down Expand Up @@ -218,6 +221,15 @@ def _initialize_instance(
client_headers = additional_headers if additional_headers else {}
self.httpx_client = httpx.Client(timeout=timeout, headers=client_headers)

if async_httpx_client is not None:
self.async_httpx_client = async_httpx_client
else:
async_client_headers = additional_headers if additional_headers else {}
self.async_httpx_client = httpx.AsyncClient(
timeout=timeout,
headers=async_client_headers,
)
Comment thread
hassiebp marked this conversation as resolved.

self.api = LangfuseAPI(
base_url=base_url,
username=self.public_key,
Expand All @@ -235,6 +247,7 @@ def _initialize_instance(
x_langfuse_sdk_name="python",
x_langfuse_sdk_version=langfuse_version,
x_langfuse_public_key=self.public_key,
httpx_client=self.async_httpx_client,
timeout=timeout,
Comment on lines +250 to 251
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve custom async client timeouts

When a caller provides async_httpx_client, this still forwards timeout to AsyncLangfuseAPI, and AsyncLangfuseAPI prefers that explicit timeout over the client’s own timeout config; since Langfuse.__init__ resolves timeout to a concrete value by default, requests from client.async_api are forced to that value instead of using the custom httpx.AsyncClient settings. This makes custom async clients partially ineffective and can cause unexpected request timeouts (for example, long-running endpoints being cut off at the SDK default).

Useful? React with 👍 / 👎.

)
score_ingestion_client = LangfuseClient(
Expand Down
24 changes: 24 additions & 0 deletions tests/test_additional_headers_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
This module tests that additional headers are properly configured in the HTTP clients.
"""

import asyncio

import httpx

from langfuse._client.client import Langfuse
Expand Down Expand Up @@ -115,6 +117,28 @@ def test_media_manager_uses_custom_httpx_client(self):
assert langfuse._resources is not None
assert langfuse._resources._media_manager._httpx_client is custom_client

def test_async_api_uses_custom_async_httpx_client(self):
"""Test that async_api reuses the configured custom async httpx client."""
custom_async_client = httpx.AsyncClient()

try:
langfuse = Langfuse(
public_key="test-public-key",
secret_key="test-secret-key",
host="https://mock-host.com",
async_httpx_client=custom_async_client,
tracing_enabled=False,
)

assert langfuse._resources is not None
assert langfuse._resources.async_httpx_client is custom_async_client
assert (
langfuse.async_api._client_wrapper.httpx_client.httpx_client
is custom_async_client
)
finally:
asyncio.run(custom_async_client.aclose())

def test_none_additional_headers_works(self):
"""Test that passing None for additional_headers works without errors."""
langfuse = Langfuse(
Expand Down
31 changes: 31 additions & 0 deletions tests/test_resource_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Test the LangfuseResourceManager and get_client() function."""

import asyncio

import httpx

from langfuse import Langfuse
from langfuse._client.get_client import get_client
from langfuse._client.resource_manager import LangfuseResourceManager
Expand Down Expand Up @@ -94,3 +98,30 @@ def should_export_b(span):

client_a.shutdown()
client_b.shutdown()


def test_get_client_preserves_custom_async_httpx_client():
"""Test that get_client() preserves the custom async httpx client."""
with LangfuseResourceManager._lock:
LangfuseResourceManager._instances.clear()

custom_async_client = httpx.AsyncClient()

try:
Langfuse(
public_key="pk-async-client",
secret_key="sk-async-client",
async_httpx_client=custom_async_client,
tracing_enabled=False,
)
retrieved_client = get_client()

assert retrieved_client._resources is not None
assert retrieved_client._resources.async_httpx_client is custom_async_client
assert (
retrieved_client.async_api._client_wrapper.httpx_client.httpx_client
is custom_async_client
)
finally:
LangfuseResourceManager.reset()
asyncio.run(custom_async_client.aclose())
Loading