Skip to content

Commit 74c5693

Browse files
committed
push
1 parent 3053e2d commit 74c5693

3 files changed

Lines changed: 41 additions & 15 deletions

File tree

langfuse/otel/__init__.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import httpx
88
from opentelemetry import trace
99
from opentelemetry import trace as otel_trace_api
10+
from opentelemetry.sdk.trace import TracerProvider
1011
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
1112

1213
from langfuse.api.client import AsyncFernLangfuse, FernLangfuse
@@ -43,15 +44,15 @@ def __init__(
4344
httpx_client: Optional[httpx.Client] = None,
4445
debug: bool = False,
4546
tracing_enabled: Optional[bool] = True,
46-
# threads: Optional[int] = None,
47-
# flush_at: Optional[int] = None,
48-
# flush_interval: Optional[float] = None,
49-
# max_retries: Optional[int] = None,
50-
# sample_rate: Optional[float] = None,
51-
# mask: Optional[MaskFunction] = None,
52-
# sdk_integration: Optional[str] = "default",
47+
flush_at: Optional[int] = None,
48+
flush_interval: Optional[float] = None,
5349
environment: Optional[str] = None,
5450
release: Optional[str] = None,
51+
# sample_rate: Optional[float] = None, # TODO: Implement sampling
52+
# mask: Optional[MaskFunction] = None, # TODO: implement masking
53+
# sdk_integration: Optional[str] = "default", -> TO BE DEPRECATED
54+
# threads: Optional[int] = None, -> TO BE DEPRECATED
55+
# max_retries: Optional[int] = None, -> TO BE DEPRECATED
5556
):
5657
debug = debug if debug else (os.getenv(LANGFUSE_DEBUG, "False") == "True")
5758

@@ -115,6 +116,8 @@ def __init__(
115116
timeout=timeout,
116117
environment=environment,
117118
release=release,
119+
flush_at=flush_at,
120+
flush_interval=flush_interval,
118121
).tracer
119122
if self.tracing_enabled
120123
else otel_trace_api.NoOpTracer()
@@ -271,7 +274,7 @@ def _create_span_with_parent_context(
271274
) as span:
272275
yield span
273276

274-
@staticmethod
277+
@staticmethod # TODO: reconsider marking methods as static as changing object method later is breaking change
275278
def get_current_span():
276279
return otel_trace_api.get_current_span()
277280

@@ -496,8 +499,16 @@ def update_finished_span():
496499

497500
@staticmethod
498501
def flush():
499-
pass
502+
tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())
503+
if isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider):
504+
return
505+
506+
tracer_provider.force_flush()
500507

501508
@staticmethod
502509
def shutdown():
503-
pass
510+
tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())
511+
if isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider):
512+
return
513+
514+
tracer_provider.force_flush()

langfuse/otel/_span_processor/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ def __init__(
1919
secret_key: str,
2020
host: str,
2121
timeout: Optional[int] = None,
22+
flush_at: Optional[int] = None,
23+
flush_interval: Optional[float] = None,
2224
):
2325
self.public_key = public_key
2426

@@ -36,7 +38,12 @@ def __init__(
3638
timeout=timeout,
3739
)
3840

39-
super().__init__(span_exporter=langfuse_span_exporter)
41+
super().__init__(
42+
span_exporter=langfuse_span_exporter,
43+
export_timeout_millis=timeout * 1_000 if timeout else None,
44+
max_export_batch_size=flush_at,
45+
schedule_delay_millis=flush_interval,
46+
)
4047

4148
def on_end(self, span: ReadableSpan) -> None:
4249
# Only export spans that belong to the scoped project

langfuse/otel/tracer.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
class LangfuseTracer:
2222
"""Singleton that provides access to the OTEL tracer."""
2323

24-
_instances: Dict[str, Optional["LangfuseTracer"]] = {}
24+
_instances: Dict[str, "LangfuseTracer"] = {}
2525
_lock = threading.Lock()
2626

2727
def __new__(
@@ -30,10 +30,12 @@ def __new__(
3030
public_key: str,
3131
secret_key: str,
3232
host: str,
33-
timeout: Optional[int] = None,
3433
environment: Optional[str] = None,
3534
release: Optional[str] = None,
36-
):
35+
timeout: Optional[int] = None,
36+
flush_at: Optional[int] = None,
37+
flush_interval: Optional[float] = None,
38+
) -> "LangfuseTracer":
3739
if public_key in cls._instances:
3840
return cls._instances[public_key]
3941

@@ -48,6 +50,8 @@ def __new__(
4850
timeout=timeout,
4951
environment=environment,
5052
release=release,
53+
flush_at=flush_at,
54+
flush_interval=flush_interval,
5155
)
5256

5357
cls._instances[public_key] = instance
@@ -60,9 +64,11 @@ def _initialize_instance(
6064
public_key: str,
6165
secret_key: str,
6266
host: str,
63-
timeout: Optional[int] = None,
6467
environment: Optional[str] = None,
6568
release: Optional[str] = None,
69+
timeout: Optional[int] = None,
70+
flush_at: Optional[int] = None,
71+
flush_interval: Optional[float] = None,
6672
):
6773
tracer_provider = _init_tracer_provider(
6874
environment=environment, release=release
@@ -73,6 +79,8 @@ def _initialize_instance(
7379
secret_key=secret_key,
7480
host=host,
7581
timeout=timeout,
82+
flush_at=flush_at,
83+
flush_interval=flush_interval,
7684
)
7785
tracer_provider.add_span_processor(langfuse_processor)
7886

0 commit comments

Comments
 (0)