Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
from langfuse._utils.parse_error import handle_fern_exception
from langfuse._utils.prompt_cache import PromptCache
from langfuse.api.resources.commons.errors.error import Error
from langfuse.api.resources.commons.errors.not_found_error import NotFoundError
from langfuse.api.resources.ingestion.types.score_body import ScoreBody
from langfuse.api.resources.prompts.types import (
CreatePromptRequest_Chat,
Expand Down Expand Up @@ -3597,6 +3598,14 @@ def fetch_prompts() -> Any:

return prompt

except NotFoundError as not_found_error:
langfuse_logger.warning(
f"Prompt '{cache_key}' not found during refresh, evicting from cache."
)
if self._resources is not None:
self._resources.prompt_cache.delete(cache_key)
raise not_found_error

except Exception as e:
langfuse_logger.error(
f"Error while fetching prompt '{cache_key}': {str(e)}"
Expand Down
3 changes: 3 additions & 0 deletions langfuse/_utils/prompt_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ def set(self, key: str, value: PromptClient, ttl_seconds: Optional[int]) -> None

self._cache[key] = PromptCacheItem(value, ttl_seconds)

def delete(self, key: str) -> None:
self._cache.pop(key, None)

def invalidate(self, prompt_name: str) -> None:
"""Invalidate all cached prompts with the given prompt name."""
for key in list(self._cache):
Expand Down
73 changes: 73 additions & 0 deletions tests/test_prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
from langfuse._client.client import Langfuse
from langfuse._utils.prompt_cache import (
DEFAULT_PROMPT_CACHE_TTL_SECONDS,
PromptCache,
PromptCacheItem,
)
from langfuse.api.resources.commons.errors.not_found_error import NotFoundError
from langfuse.api.resources.prompts import Prompt_Chat, Prompt_Text
from langfuse.model import ChatPromptClient, TextPromptClient
from tests.utils import create_uuid, get_api
Expand Down Expand Up @@ -679,9 +681,15 @@ def test_prompt_end_to_end():

@pytest.fixture
def langfuse():
from langfuse._client.resource_manager import LangfuseResourceManager

Comment thread
Betensis marked this conversation as resolved.
langfuse_instance = Langfuse()
langfuse_instance.api = Mock()

if langfuse_instance._resources is None:
langfuse_instance._resources = Mock(spec=LangfuseResourceManager)
langfuse_instance._resources.prompt_cache = PromptCache()

return langfuse_instance


Expand Down Expand Up @@ -1157,6 +1165,71 @@ def test_get_expired_prompt_when_failing_fetch(mock_time, langfuse: Langfuse):
assert result_call_2 == prompt_client


@patch.object(PromptCacheItem, "get_epoch_seconds")
def test_evict_prompt_cache_entry_when_refresh_returns_not_found(
mock_time, langfuse: Langfuse
) -> None:
mock_time.return_value = 0

prompt_name = "test_evict_prompt_cache_entry_when_refresh_returns_not_found"
ttl_seconds = 5
fallback_prompt = "fallback text prompt"

prompt = Prompt_Text(
name=prompt_name,
version=1,
prompt="Make me laugh",
labels=[],
type="text",
config={},
tags=[],
)
prompt_client = TextPromptClient(prompt)
cache_key = PromptCache.generate_cache_key(prompt_name, version=None, label=None)

mock_server_call = langfuse.api.prompts.get
mock_server_call.return_value = prompt

initial_result = langfuse.get_prompt(
prompt_name,
cache_ttl_seconds=ttl_seconds,
max_retries=0,
)
assert initial_result == prompt_client
assert langfuse._resources.prompt_cache.get(cache_key) is not None

# Expire cache entry and trigger background refresh
mock_time.return_value = ttl_seconds + 1

def raise_not_found(*_args: object, **_kwargs: object) -> None:
raise NotFoundError({"message": "Prompt not found"})

mock_server_call.side_effect = raise_not_found

stale_result = langfuse.get_prompt(
prompt_name,
cache_ttl_seconds=ttl_seconds,
max_retries=0,
)
assert stale_result == prompt_client

while True:
if langfuse._resources.prompt_cache._task_manager.active_tasks() == 0:
break
sleep(0.1)

assert langfuse._resources.prompt_cache.get(cache_key) is None

fallback_result = langfuse.get_prompt(
prompt_name,
cache_ttl_seconds=ttl_seconds,
fallback=fallback_prompt,
max_retries=0,
)
assert fallback_result.is_fallback
assert fallback_result.prompt == fallback_prompt


# Should fetch new prompt if version changes
def test_get_fresh_prompt_when_version_changes(langfuse: Langfuse):
prompt_name = "test_get_fresh_prompt_when_version_changes"
Expand Down
Loading