2 changes: 1 addition & 1 deletion langfuse/_client/client.py

@@ -159,7 +159,7 @@ def __init__(
         media_upload_thread_count: Optional[int] = None,
         sample_rate: Optional[float] = None,
         mask: Optional[MaskFunction] = None,
-    ):
+    ) -> None:
         self._host = host or os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
         self._environment = environment or os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
         self._mask = mask
2 changes: 1 addition & 1 deletion langfuse/_client/datasets.py

@@ -91,7 +91,7 @@ def run(
         run_name: str,
         run_metadata: Optional[Any] = None,
         run_description: Optional[str] = None,
-    ):
+    ) -> Any:
         """Create a context manager for the dataset item run that links the execution to a Langfuse trace.

         This method is a context manager that creates a trace for the dataset run and yields a span
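For orientation, the context manager that docstring describes is used roughly as follows. This is a minimal sketch, not code from this PR: the dataset name, run name, and the my_app function are illustrative assumptions, and the exact client calls should be checked against the SDK docs.

from langfuse import Langfuse


def my_app(question):  # hypothetical stand-in for the application under evaluation
    return f"answer to: {question}"


langfuse = Langfuse()  # assumes credentials are set via environment variables
dataset = langfuse.get_dataset("my-eval-dataset")  # dataset name is illustrative

for item in dataset.items:
    # item.run(...) yields a span whose trace is linked to the dataset run
    with item.run(run_name="experiment-v1") as root_span:
        output = my_app(item.input)
        root_span.update_trace(output=output)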
2 changes: 1 addition & 1 deletion langfuse/_client/utils.py

@@ -11,7 +11,7 @@
 from opentelemetry.sdk.trace import ReadableSpan


-def span_formatter(span: ReadableSpan):
+def span_formatter(span: ReadableSpan) -> str:
     parent_id = (
         otel_trace_api.format_span_id(span.parent.span_id) if span.parent else None
     )
4 changes: 2 additions & 2 deletions langfuse/_utils/__init__.py

@@ -9,13 +9,13 @@
 log = logging.getLogger("langfuse")


-def _get_timestamp():
+def _get_timestamp() -> datetime:
     return datetime.now(timezone.utc)


 def _create_prompt_context(
     prompt: typing.Optional[PromptClient] = None,
-):
+) -> typing.Dict[str, typing.Optional[str]]:
     if prompt is not None and not prompt.is_fallback:
         return {"prompt_version": prompt.version, "prompt_name": prompt.name}
3 changes: 2 additions & 1 deletion langfuse/_utils/environment.py

@@ -1,6 +1,7 @@
 """@private"""

 import os
+from typing import Optional

 common_release_envs = [
     # Render
@@ -26,7 +27,7 @@
 ]


-def get_common_release_envs():
+def get_common_release_envs() -> Optional[str]:
     for env in common_release_envs:
         if env in os.environ:
             return os.environ[env]
15 changes: 9 additions & 6 deletions langfuse/_utils/error_logging.py

@@ -1,15 +1,15 @@
 import functools
 import logging
-from typing import List, Optional
+from typing import Any, Callable, List, Optional

 logger = logging.getLogger("langfuse")


-def catch_and_log_errors(func):
+def catch_and_log_errors(func: Callable[..., Any]) -> Callable[..., Any]:
     """Catch all exceptions and log them. Do NOT re-raise the exception."""

     @functools.wraps(func)
-    def wrapper(*args, **kwargs):
+    def wrapper(*args: Any, **kwargs: Any) -> Any:
Inline review comment (Contributor):
logic: Missing return value in wrapper function when exception occurs. Should return None explicitly.

         try:
             return func(*args, **kwargs)
         except Exception as e:
@@ -18,14 +18,17 @@ def wrapper(*args, **kwargs):
     return wrapper


-def auto_decorate_methods_with(decorator, exclude: Optional[List[str]] = []):
+def auto_decorate_methods_with(
+    decorator: Callable[[Any], Any], exclude: Optional[List[str]] = None
+) -> Callable[[Any], Any]:
     """Class decorator to automatically apply a given decorator to all
     methods of a class.
     """

-    def class_decorator(cls):
+    def class_decorator(cls: Any) -> Any:
+        exclude_list = exclude or []
         for attr_name, attr_value in cls.__dict__.items():
-            if attr_name in exclude:
+            if attr_name in exclude_list:
                 continue
             if callable(attr_value):
                 # Wrap callable attributes (methods) with the decorator
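The reviewer's comment above asks for an explicit return on the exception path. A minimal sketch of that suggestion — the body of the except branch is assumed, since the hunk truncates right after it:

import functools
import logging
from typing import Any, Callable

logger = logging.getLogger("langfuse")


def catch_and_log_errors(func: Callable[..., Any]) -> Callable[..., Any]:
    """Catch all exceptions and log them. Do NOT re-raise the exception."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Assumed logging call; the diff cuts off before the original body.
            logger.error("Exception in %s: %s", func.__name__, e)
            return None  # explicit None makes the no-result path visible to readers and type checkers

    return wrapper

Note that this file's diff also swaps the mutable default exclude=[] for None, resolved to a fresh list inside class_decorator — the classic fix for Python's shared-default-argument pitfall.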
20 changes: 10 additions & 10 deletions langfuse/_utils/prompt_cache.py

@@ -5,7 +5,7 @@
 from datetime import datetime
 from queue import Empty, Queue
 from threading import Thread
-from typing import Dict, List, Optional, Set
+from typing import Any, Dict, List, Optional, Set

 from langfuse.model import PromptClient

@@ -39,7 +39,7 @@ def __init__(self, queue: Queue, identifier: int):
         self._queue = queue
         self._identifier = identifier

-    def run(self):
+    def run(self) -> None:
         while self.running:
             try:
                 task = self._queue.get(timeout=1)
@@ -58,7 +58,7 @@ def run(self):
             except Empty:
                 pass

-    def pause(self):
+    def pause(self) -> None:
         """Pause the consumer."""
         self.running = False

@@ -83,7 +83,7 @@ def __init__(self, threads: int = 1):

         atexit.register(self.shutdown)

-    def add_task(self, key: str, task):
+    def add_task(self, key: str, task: Any) -> None:
         if key not in self._processing_keys:
             self._log.debug(f"Adding prompt cache refresh task for key: {key}")
             self._processing_keys.add(key)
@@ -97,8 +97,8 @@ def add_task(self, key: str, task):
     def active_tasks(self) -> int:
         return len(self._processing_keys)

-    def _wrap_task(self, key: str, task):
-        def wrapped():
+    def _wrap_task(self, key: str, task: Any) -> Any:
Inline review comment (Contributor):
logic: The return type Any is too permissive and doesn't match the actual Callable[[], None] return type of this function

+        def wrapped() -> None:
             self._log.debug(f"Refreshing prompt cache for key: {key}")
             try:
                 task()
@@ -108,7 +108,7 @@ def wrapped():

         return wrapped

-    def shutdown(self):
+    def shutdown(self) -> None:
         self._log.debug(
             f"Shutting down prompt refresh task manager, {len(self._consumers)} consumers,..."
         )
@@ -146,19 +146,19 @@ def __init__(
     def get(self, key: str) -> Optional[PromptCacheItem]:
         return self._cache.get(key, None)

-    def set(self, key: str, value: PromptClient, ttl_seconds: Optional[int]):
+    def set(self, key: str, value: PromptClient, ttl_seconds: Optional[int]) -> None:
         if ttl_seconds is None:
             ttl_seconds = DEFAULT_PROMPT_CACHE_TTL_SECONDS

         self._cache[key] = PromptCacheItem(value, ttl_seconds)

-    def invalidate(self, prompt_name: str):
+    def invalidate(self, prompt_name: str) -> None:
         """Invalidate all cached prompts with the given prompt name."""
         for key in list(self._cache):
             if key.startswith(prompt_name):
                 del self._cache[key]

-    def add_refresh_prompt_task(self, key: str, fetch_func):
+    def add_refresh_prompt_task(self, key: str, fetch_func: Any) -> None:
         self._log.debug(f"Submitting refresh task for key: {key}")
         self._task_manager.add_task(key, fetch_func)
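The reviewer's comment here suggests Callable[[], None] rather than Any as the return type of _wrap_task. A standalone sketch of that tighter signature — typing the task parameter as a zero-argument callable is also an inference, and the except body is assumed since the hunk truncates after task():

import logging
from typing import Callable

log = logging.getLogger("langfuse")


def wrap_task(key: str, task: Callable[[], None]) -> Callable[[], None]:
    # Module-level stand-in for PromptCache._wrap_task, shown only to
    # illustrate the precise return annotation the reviewer asks for.
    def wrapped() -> None:
        log.debug(f"Refreshing prompt cache for key: {key}")
        try:
            task()
        except Exception as e:  # assumed: the original logs and swallows errors
            log.warning(f"Failed to refresh prompt cache for key {key}: {e}")

    return wrapped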
12 changes: 7 additions & 5 deletions langfuse/_utils/serializer.py

@@ -21,24 +21,26 @@
 try:
     from langchain.load.serializable import Serializable
 except ImportError:
-    # If Serializable is not available, set it to NoneType
-    Serializable = type(None)
+    # If Serializable is not available, set it to a placeholder type
+    class Serializable:
+        pass


 # Attempt to import numpy
 try:
     import numpy as np
 except ImportError:
-    np = None
+    np = None  # type: ignore

 logger = getLogger(__name__)


 class EventSerializer(JSONEncoder):
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super().__init__(*args, **kwargs)
         self.seen = set()  # Track seen objects to detect circular references

-    def default(self, obj: Any):
+    def default(self, obj: Any) -> Any:
         try:
             if isinstance(obj, (datetime)):
                 # Timezone-awareness check
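The switch from Serializable = type(None) to an empty placeholder class is worth a second look: aliasing NoneType means any isinstance check against Serializable matches plain None values. A small illustration — the downstream isinstance check is an assumption, not shown in this diff:

# Old fallback: Serializable aliases NoneType, so None itself matches.
OldFallback = type(None)
assert isinstance(None, OldFallback)  # surprising match


# New fallback: an empty placeholder class no real object is an instance of.
class Serializable:
    pass


assert not isinstance(None, Serializable)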
26 changes: 16 additions & 10 deletions langfuse/langchain/CallbackHandler.py

@@ -186,10 +186,10 @@ def on_chain_start(
     def _register_langfuse_prompt(
         self,
         *,
-        run_id,
+        run_id: UUID,
         parent_run_id: Optional[UUID],
         metadata: Optional[Dict[str, Any]],
-    ):
+    ) -> None:
         """We need to register any passed Langfuse prompt to the parent_run_id so that we can link following generations with that prompt.

         If parent_run_id is None, we are at the root of a trace and should not attempt to register the prompt, as there will be no LLM invocation following it.
@@ -209,7 +209,7 @@ def _register_langfuse_prompt(
             registered_prompt = self.prompt_to_parent_run_map[parent_run_id]
             self.prompt_to_parent_run_map[run_id] = registered_prompt

-    def _deregister_langfuse_prompt(self, run_id: Optional[UUID]):
+    def _deregister_langfuse_prompt(self, run_id: Optional[UUID]) -> None:
         if run_id in self.prompt_to_parent_run_map:
             del self.prompt_to_parent_run_map[run_id]

@@ -751,7 +751,7 @@ def _log_debug_event(
     )


-def _extract_raw_response(last_response):
+def _extract_raw_response(last_response: Any) -> Any:
     """Extract the response from the last response of the LLM call."""
     # We return the text of the response if not empty
     if last_response.text is not None and last_response.text.strip() != "":
@@ -764,11 +764,13 @@ def _extract_raw_response(last_response):
     return ""


-def _flatten_comprehension(matrix):
+def _flatten_comprehension(matrix: Any) -> List[Any]:
     return [item for row in matrix for item in row]


-def _parse_usage_model(usage: typing.Union[pydantic.BaseModel, dict]):
+def _parse_usage_model(
+    usage: typing.Union[pydantic.BaseModel, dict],
+) -> typing.Optional[typing.Dict[str, typing.Any]]:
     # maintains a list of key translations. For each key, the usage model is checked
     # and a new object will be created with the new key if the key exists in the usage model
     # All non matched keys will remain on the object.
@@ -891,7 +893,7 @@ def _parse_usage_model(usage: typing.Union[pydantic.BaseModel, dict]):
     return usage_model if usage_model else None


-def _parse_usage(response: LLMResult):
+def _parse_usage(response: LLMResult) -> typing.Optional[typing.Dict[str, typing.Any]]:
     # langchain-anthropic uses the usage field
     llm_usage_keys = ["token_usage", "usage"]
     llm_usage = None
@@ -938,7 +940,7 @@ def _parse_usage(response: LLMResult):
     return llm_usage


-def _parse_model(response: LLMResult):
+def _parse_model(response: LLMResult) -> typing.Optional[str]:
     # langchain-anthropic uses the usage field
     llm_model_keys = ["model_name"]
     llm_model = None
@@ -951,14 +953,18 @@ def _parse_model(response: LLMResult):
     return llm_model


-def _parse_model_name_from_metadata(metadata: Optional[Dict[str, Any]]):
+def _parse_model_name_from_metadata(
+    metadata: Optional[Dict[str, Any]],
+) -> typing.Optional[str]:
     if metadata is None or not isinstance(metadata, dict):
         return None

     return metadata.get("ls_model_name", None)


-def _strip_langfuse_keys_from_dict(metadata: Optional[Dict[str, Any]]):
+def _strip_langfuse_keys_from_dict(
+    metadata: Optional[Dict[str, Any]],
+) -> Optional[Dict[str, Any]]:
     if metadata is None or not isinstance(metadata, dict):
         return metadata
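The comment block in _parse_usage_model describes a key-translation pass: known usage keys are renamed, and all unmatched keys remain on the object. A minimal sketch of that idea with a hypothetical mapping — the real translation list in CallbackHandler.py is much longer and is not reproduced here:

from typing import Any, Dict

# Illustrative subset only, not the actual mapping from the source file.
USAGE_KEY_TRANSLATIONS = {
    "prompt_tokens": "input",
    "completion_tokens": "output",
    "total_tokens": "total",
}


def translate_usage_keys(usage: Dict[str, Any]) -> Dict[str, Any]:
    """Rename known usage keys; unmatched keys pass through unchanged."""
    return {USAGE_KEY_TRANSLATIONS.get(key, key): value for key, value in usage.items()}


print(translate_usage_keys({"prompt_tokens": 12, "total_tokens": 20, "model": "gpt-4"}))
# {'input': 12, 'total': 20, 'model': 'gpt-4'}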