diff --git a/langfuse/__init__.py b/langfuse/__init__.py
index d33febca7..c5f5b9d30 100644
--- a/langfuse/__init__.py
+++ b/langfuse/__init__.py
@@ -8,6 +8,7 @@
     EvaluatorStats,
     MapperFunction,
 )
+from langfuse.ci import RegressionError, RunnerContext
 from langfuse.experiment import Evaluation
 
 from ._client import client as _client_module
@@ -63,6 +64,8 @@
     "EvaluatorStats",
     "BatchEvaluationResumeToken",
     "BatchEvaluationResult",
+    "RunnerContext",
+    "RegressionError",
     "__version__",
     "is_default_export_span",
     "is_langfuse_span",
diff --git a/langfuse/ci.py b/langfuse/ci.py
new file mode 100644
index 000000000..3d10af156
--- /dev/null
+++ b/langfuse/ci.py
@@ -0,0 +1,211 @@
+"""CI/CD helpers for running Langfuse experiments in GitHub Actions.
+
+Designed to be used in conjunction with the ``langfuse/experiment-action``
+GitHub Action (https://github.com/langfuse/experiment-action). The action
+constructs a :class:`RunnerContext` pre-populated with dataset, run name, and
+GitHub-sourced metadata, then calls the user's ``experiment(context)``
+function.
+"""
+
+from datetime import datetime
+from typing import TYPE_CHECKING, Dict, List, Optional
+
+from langfuse.batch_evaluation import CompositeEvaluatorFunction
+from langfuse.experiment import (
+    EvaluatorFunction,
+    ExperimentData,
+    ExperimentResult,
+    RunEvaluatorFunction,
+    TaskFunction,
+)
+
+if TYPE_CHECKING:
+    from langfuse._client.client import Langfuse
+
+
+class RunnerContext:
+    """Wraps :meth:`Langfuse.run_experiment` with CI-injected defaults.
+
+    Intended for use with the ``langfuse/experiment-action`` GitHub Action
+    (https://github.com/langfuse/experiment-action). The action builds a
+    ``RunnerContext`` before invoking the user's ``experiment(context)``
+    function. Defaults set here (dataset, name, run name, metadata tags) are
+    applied when the user omits them on the :meth:`run_experiment` call;
+    users can override any default by passing the corresponding argument
+    explicitly.
+    """
+
+    def __init__(
+        self,
+        *,
+        client: "Langfuse",
+        data: Optional[ExperimentData] = None,
+        dataset_version: Optional[datetime] = None,
+        name: Optional[str] = None,
+        run_name: Optional[str] = None,
+        metadata: Optional[Dict[str, str]] = None,
+    ):
+        """Build a ``RunnerContext`` populated with defaults for ``run_experiment``.
+
+        Typically called by the ``langfuse/experiment-action`` GitHub Action,
+        not by end users directly. Every field except ``client`` is optional:
+        fields left as ``None`` simply mean the corresponding argument must be
+        supplied on the :meth:`run_experiment` call.
+
+        Args:
+            client: Initialized Langfuse SDK client used to execute the
+                experiment. The action creates this from the
+                ``langfuse_public_key`` / ``langfuse_secret_key`` /
+                ``langfuse_base_url`` inputs.
+            data: Default dataset items to run the experiment on. Accepts
+                either ``List[LocalExperimentItem]`` or ``List[DatasetItem]``.
+                Injected by the action when ``dataset_name`` is configured.
+                If ``None``, the user must pass ``data=`` to
+                :meth:`run_experiment`.
+            dataset_version: Optional pinned dataset version. Injected by the
+                action when ``dataset_version`` is configured.
+            name: Default human-readable experiment name (e.g. the action's
+                ``experiment_name`` input). If ``None``, the user must pass
+                ``name=`` to :meth:`run_experiment`.
+            run_name: Default exact run name. The action typically derives
+                this from the commit SHA / PR number so that reruns produce
+                distinct runs in Langfuse.
+            metadata: Default metadata attached to every experiment trace and
+                the dataset run. The action injects GitHub-sourced tags (SHA,
+                PR link, workflow run link, branch, GH user, etc.). Merged
+                with any ``metadata`` passed to :meth:`run_experiment`, with
+                user-supplied keys winning on collision.
+        """
+        self.client = client
+        self.data = data
+        self.dataset_version = dataset_version
+        self.name = name
+        self.run_name = run_name
+        self.metadata = metadata
+
+    def run_experiment(
+        self,
+        *,
+        name: Optional[str] = None,
+        run_name: Optional[str] = None,
+        description: Optional[str] = None,
+        data: Optional[ExperimentData] = None,
+        task: TaskFunction,
+        evaluators: List[EvaluatorFunction] = [],
+        composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
+        run_evaluators: List[RunEvaluatorFunction] = [],
+        max_concurrency: int = 50,
+        metadata: Optional[Dict[str, str]] = None,
+        _dataset_version: Optional[datetime] = None,
+    ) -> ExperimentResult:
+        """Run an experiment, filling omitted arguments from context defaults.
+
+        ``name``, ``run_name``, ``data`` and ``_dataset_version`` fall back to
+        the values stored on this context when passed as ``None``. All other
+        arguments are forwarded unchanged to the client's ``run_experiment``.
+
+        Args:
+            name: Experiment name; falls back to ``self.name``.
+            run_name: Exact run name; falls back to ``self.run_name``.
+            description: Forwarded unchanged.
+            data: Items to run on; falls back to ``self.data``.
+            task: Forwarded unchanged.
+            evaluators: Forwarded unchanged. The shared ``[]`` default is
+                never mutated here.
+            composite_evaluator: Forwarded unchanged.
+            run_evaluators: Forwarded unchanged. The shared ``[]`` default
+                is never mutated here.
+            max_concurrency: Forwarded unchanged.
+            metadata: Merged over ``self.metadata``; keys given here win on
+                collision. Stays ``None`` when neither side sets metadata.
+            _dataset_version: Pinned dataset version; falls back to
+                ``self.dataset_version``.
+
+        Returns:
+            The result returned by the client's ``run_experiment``.
+
+        Raises:
+            ValueError: If ``name`` or ``data`` is ``None`` both here and on
+                the context.
+        """
+        resolved_name = name if name is not None else self.name
+        if resolved_name is None:
+            raise ValueError(
+                "`name` must be provided either on the RunnerContext or the run_experiment call"
+            )
+
+        resolved_data = data if data is not None else self.data
+        if resolved_data is None:
+            raise ValueError(
+                "`data` must be provided either on the RunnerContext or the run_experiment call"
+            )
+
+        resolved_run_name = run_name if run_name is not None else self.run_name
+        resolved_dataset_version = (
+            _dataset_version if _dataset_version is not None else self.dataset_version
+        )
+
+        merged_metadata: Optional[Dict[str, str]]
+        if self.metadata is None and metadata is None:
+            merged_metadata = None
+        else:
+            merged_metadata = {**(self.metadata or {}), **(metadata or {})}
+
+        return self.client.run_experiment(
+            name=resolved_name,
+            run_name=resolved_run_name,
+            description=description,
+            data=resolved_data,
+            task=task,
+            evaluators=evaluators,
+            composite_evaluator=composite_evaluator,
+            run_evaluators=run_evaluators,
+            max_concurrency=max_concurrency,
+            metadata=merged_metadata,
+            _dataset_version=resolved_dataset_version,
+        )
+
+
+class RegressionError(Exception):
+    """Raised by a user's ``experiment`` function to signal a CI gate failure.
+
+    The GitHub action catches this exception and, when ``should_fail_on_error``
+    is enabled, fails the workflow run and renders a callout in the PR comment
+    using ``metric``/``value``/``threshold`` if supplied, otherwise ``str(exc)``.
+    """
+
+    def __init__(
+        self,
+        *,
+        result: ExperimentResult,
+        metric: Optional[str] = None,
+        value: Optional[float] = None,
+        threshold: Optional[float] = None,
+        message: Optional[str] = None,
+    ):
+        """Store the regression details and build the exception message.
+
+        Args:
+            result: Experiment result that triggered the failure.
+            metric: Name of the metric that regressed, if any.
+            value: Observed value of ``metric``, if known.
+            threshold: Threshold that ``value`` was compared against, if known.
+            message: Explicit message; takes precedence over the generated one.
+        """
+        self.result = result
+        self.metric = metric
+        self.value = value
+        self.threshold = threshold
+        if message is not None:
+            formatted = message
+        elif metric is not None:
+            # Only mention the parts that were supplied so the message never
+            # contains a literal "None".
+            formatted = f"Regression on `{metric}`"
+            if value is not None:
+                formatted += f": {value}"
+            if threshold is not None:
+                formatted += f" (threshold {threshold})"
+        else:
+            formatted = "Experiment regression detected"
+        super().__init__(formatted)
diff --git a/tests/unit/test_ci.py b/tests/unit/test_ci.py
new file mode 100644
index 000000000..b9b1e986c
--- /dev/null
+++ b/tests/unit/test_ci.py
@@ -0,0 +1,233 @@
+"""Tests for ``langfuse.ci`` — ``RunnerContext`` and ``RegressionError``."""
+
+import inspect
+import typing
+from datetime import datetime
+from typing import Dict, Optional
+from unittest.mock import MagicMock
+
+import pytest
+
+from langfuse import RegressionError, RunnerContext
+from langfuse._client.client import Langfuse
+
+
+def _noop_task(*, item, **kwargs):  # pragma: no cover - never invoked via mock
+    return None
+
+
+def _make_ctx(**kwargs) -> RunnerContext:
+    client = MagicMock(spec=Langfuse)
+    client.run_experiment.return_value = "result-sentinel"
+    return RunnerContext(client=client, **kwargs)
+
+
+class TestRunnerContextDefaults:
+    def test_context_defaults_flow_through(self):
+        ctx_data = [{"input": "a"}]
+        ctx_version = datetime(2026, 1, 1)
+        ctx = _make_ctx(
+            data=ctx_data,
+            dataset_version=ctx_version,
+            name="ctx-name",
+            run_name="ctx-run",
+            metadata={"sha": "abc123"},
+        )
+
+        result = ctx.run_experiment(task=_noop_task)
+
+        assert result == "result-sentinel"
+        ctx.client.run_experiment.assert_called_once()
+        kwargs = ctx.client.run_experiment.call_args.kwargs
+        assert kwargs["name"] == "ctx-name"
+        assert kwargs["run_name"] == "ctx-run"
+        assert kwargs["data"] is ctx_data
+        assert kwargs["metadata"] == {"sha": "abc123"}
+        assert kwargs["_dataset_version"] == ctx_version
+        assert kwargs["task"] is _noop_task
+
+    def test_call_overrides_win(self):
+        ctx = _make_ctx(
+            data=[{"input": "ctx"}],
+            dataset_version=datetime(2026, 1, 1),
+            name="ctx-name",
+            run_name="ctx-run",
+        )
+
+        override_data = [{"input": "override"}]
+        override_version = datetime(2026, 6, 6)
+        ctx.run_experiment(
+            task=_noop_task,
+            name="call-name",
+            run_name="call-run",
+            data=override_data,
+            _dataset_version=override_version,
+        )
+
+        kwargs = ctx.client.run_experiment.call_args.kwargs
+        assert kwargs["name"] == "call-name"
+        assert kwargs["run_name"] == "call-run"
+        assert kwargs["data"] is override_data
+        assert kwargs["_dataset_version"] == override_version
+
+
+class TestRunnerContextMetadataMerge:
+    def test_user_keys_win_on_collision(self):
+        ctx = _make_ctx(
+            data=[{"input": "a"}],
+            name="n",
+            metadata={"sha": "abc", "branch": "main"},
+        )
+        ctx.run_experiment(task=_noop_task, metadata={"sha": "def", "pr": "42"})
+        assert ctx.client.run_experiment.call_args.kwargs["metadata"] == {
+            "sha": "def",
+            "branch": "main",
+            "pr": "42",
+        }
+
+    def test_context_metadata_only(self):
+        ctx = _make_ctx(
+            data=[{"input": "a"}], name="n", metadata={"sha": "abc"}
+        )
+        ctx.run_experiment(task=_noop_task)
+        assert ctx.client.run_experiment.call_args.kwargs["metadata"] == {"sha": "abc"}
+
+    def test_call_metadata_only(self):
+        ctx = _make_ctx(data=[{"input": "a"}], name="n")
+        ctx.run_experiment(task=_noop_task, metadata={"pr": "1"})
+        assert ctx.client.run_experiment.call_args.kwargs["metadata"] == {"pr": "1"}
+
+    def test_both_none_stays_none(self):
+        ctx = _make_ctx(data=[{"input": "a"}], name="n")
+        ctx.run_experiment(task=_noop_task)
+        assert ctx.client.run_experiment.call_args.kwargs["metadata"] is None
+
+
+class TestRunnerContextLocalItems:
+    def test_local_items_pass_through_as_context_default(self):
+        items = [{"input": "x", "expected_output": "y"}]
+        ctx = _make_ctx(data=items, name="n")
+        ctx.run_experiment(task=_noop_task)
+        assert ctx.client.run_experiment.call_args.kwargs["data"] is items
+
+    def test_local_items_pass_through_as_call_override(self):
+        ctx = _make_ctx(name="n")
+        items = [{"input": "x"}]
+        ctx.run_experiment(task=_noop_task, data=items)
+        assert ctx.client.run_experiment.call_args.kwargs["data"] is items
+
+
+class TestRunnerContextValidation:
+    def test_missing_name_raises(self):
+        ctx = _make_ctx(data=[{"input": "a"}])
+        with pytest.raises(ValueError, match="name"):
+            ctx.run_experiment(task=_noop_task)
+
+    def test_missing_data_raises(self):
+        ctx = _make_ctx(name="n")
+        with pytest.raises(ValueError, match="data"):
+            ctx.run_experiment(task=_noop_task)
+
+
+class TestRegressionError:
+    def test_is_exception(self):
+        result = MagicMock()
+        exc = RegressionError(result=result)
+        assert isinstance(exc, Exception)
+        assert exc.result is result
+
+    def test_default_message(self):
+        exc = RegressionError(result=MagicMock())
+        assert str(exc) == "Experiment regression detected"
+        assert exc.metric is None
+        assert exc.value is None
+        assert exc.threshold is None
+
+    def test_structured_message(self):
+        exc = RegressionError(
+            result=MagicMock(), metric="avg_accuracy", value=0.78, threshold=0.9
+        )
+        assert exc.metric == "avg_accuracy"
+        assert exc.value == 0.78
+        assert exc.threshold == 0.9
+        assert "avg_accuracy" in str(exc)
+        assert "0.78" in str(exc)
+        assert "0.9" in str(exc)
+
+    def test_partial_structured_message_omits_missing_parts(self):
+        exc = RegressionError(result=MagicMock(), metric="avg_accuracy")
+        assert str(exc) == "Regression on `avg_accuracy`"
+        assert "None" not in str(exc)
+
+    def test_user_message_wins(self):
+        exc = RegressionError(
+            result=MagicMock(),
+            metric="avg_accuracy",
+            value=0.5,
+            threshold=0.9,
+            message="custom explanation",
+        )
+        assert str(exc) == "custom explanation"
+
+
+class TestSignatureDriftGuard:
+    """Fails loudly if ``Langfuse.run_experiment`` grows a parameter that is
+    not threaded through ``RunnerContext.run_experiment``.
+
+    The four action-relaxed params (``name``, ``run_name``, ``data``,
+    ``_dataset_version``) are allowed to diverge: the RunnerContext variant
+    must be the ``Optional[...]`` of the client annotation so the action can
+    inject them.
+    """
+
+    RELAXED_PARAMS = {"name", "run_name", "data", "_dataset_version"}
+
+    def test_no_divergence(self):
+        client_params = self._params(Langfuse.run_experiment, skip_self=True)
+        ctx_params = self._params(RunnerContext.run_experiment, skip_self=True)
+
+        assert set(client_params) == set(ctx_params), (
+            "RunnerContext.run_experiment params do not match "
+            "Langfuse.run_experiment. Missing: "
+            f"{set(client_params) - set(ctx_params)}. "
+            f"Extra: {set(ctx_params) - set(client_params)}."
+        )
+
+        for name, client_param in client_params.items():
+            ctx_param = ctx_params[name]
+            client_ann = client_param.annotation
+            ctx_ann = ctx_param.annotation
+
+            if name in self.RELAXED_PARAMS:
+                # RunnerContext version must be Optional[]
+                # Already-optional client annotations (run_name,
+                # _dataset_version) just need to match as-is.
+                if self._is_optional(client_ann):
+                    assert ctx_ann == client_ann, (
+                        f"param `{name}`: expected {client_ann}, got {ctx_ann}"
+                    )
+                else:
+                    assert ctx_ann == Optional[client_ann], (
+                        f"param `{name}`: expected Optional[{client_ann}], "
+                        f"got {ctx_ann}"
+                    )
+            else:
+                assert ctx_ann == client_ann, (
+                    f"param `{name}`: annotation drift — "
+                    f"client={client_ann}, context={ctx_ann}"
+                )
+
+    @staticmethod
+    def _params(func, *, skip_self: bool) -> Dict[str, inspect.Parameter]:
+        sig = inspect.signature(func)
+        return {
+            name: p
+            for name, p in sig.parameters.items()
+            if not (skip_self and name == "self")
+        }
+
+    @staticmethod
+    def _is_optional(annotation) -> bool:
+        origin = typing.get_origin(annotation)
+        args = typing.get_args(annotation)
+        return origin is typing.Union and type(None) in args