|
1 | 1 | import datetime as dt |
2 | 2 | import logging |
3 | | -from .span import LangfuseSpan |
4 | | -from typing import TYPE_CHECKING, Any, Generator, List, Optional |
| 3 | +from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional |
5 | 4 |
|
6 | 5 | from opentelemetry.util._decorator import _agnosticcontextmanager |
7 | 6 |
|
| 7 | +from langfuse.experiment import ( |
| 8 | + EvaluatorFunction, |
| 9 | + ExperimentResult, |
| 10 | + RunEvaluatorFunction, |
| 11 | + TaskFunction, |
| 12 | +) |
8 | 13 | from langfuse.model import ( |
9 | 14 | CreateDatasetRunItemRequest, |
10 | 15 | Dataset, |
11 | 16 | DatasetItem, |
12 | 17 | DatasetStatus, |
13 | 18 | ) |
14 | 19 |
|
| 20 | +from .span import LangfuseSpan |
| 21 | + |
15 | 22 | if TYPE_CHECKING: |
16 | 23 | from langfuse._client.client import Langfuse |
17 | 24 |
|
@@ -181,3 +188,230 @@ def __init__(self, dataset: Dataset, items: List[DatasetItemClient]): |
181 | 188 | self.created_at = dataset.created_at |
182 | 189 | self.updated_at = dataset.updated_at |
183 | 190 | self.items = items |
| 191 | + self._langfuse: Optional["Langfuse"] = None |
| 192 | + |
| 193 | + def _get_langfuse_client(self) -> Optional["Langfuse"]: |
| 194 | + """Get the Langfuse client from the first item.""" |
| 195 | + if self._langfuse is None and self.items: |
| 196 | + self._langfuse = self.items[0].langfuse |
| 197 | + return self._langfuse |
| 198 | + |
| 199 | + def run_experiment( |
| 200 | + self, |
| 201 | + *, |
| 202 | + name: str, |
| 203 | + run_name: Optional[str] = None, |
| 204 | + description: Optional[str] = None, |
| 205 | + task: TaskFunction, |
| 206 | + evaluators: List[EvaluatorFunction] = [], |
| 207 | + run_evaluators: List[RunEvaluatorFunction] = [], |
| 208 | + max_concurrency: int = 50, |
| 209 | + metadata: Optional[Dict[str, Any]] = None, |
| 210 | + ) -> ExperimentResult: |
| 211 | + """Run an experiment on this Langfuse dataset with automatic tracking. |
| 212 | +
|
| 213 | + This is a convenience method that runs an experiment using all items in this |
| 214 | + dataset. It automatically creates a dataset run in Langfuse for tracking and |
| 215 | + comparison purposes, linking all experiment results to the dataset. |
| 216 | +
|
| 217 | + Key benefits of using dataset.run_experiment(): |
| 218 | + - Automatic dataset run creation and linking in Langfuse UI |
| 219 | + - Built-in experiment tracking and versioning |
| 220 | + - Easy comparison between different experiment runs |
| 221 | + - Direct access to dataset items with their metadata and expected outputs |
| 222 | + - Automatic URL generation for viewing results in Langfuse dashboard |
| 223 | +
|
| 224 | + Args: |
| 225 | + name: Human-readable name for the experiment. Used for identification in |
| 226 | + Langfuse and as the basis for the dataset run name. |
| 227 | + run_name: Optional exact name for the dataset run in Langfuse. If not |
| 228 | + provided, it defaults to the experiment name with an ISO timestamp |
| 229 | + appended. |
| 230 | + description: Optional description of the experiment's purpose, methodology, |
| 231 | + or what you're testing. Appears in the Langfuse UI for context. |
| 232 | + task: Function that processes each dataset item and returns output. |
| 233 | + The function will receive DatasetItem objects with .input, .expected_output, |
| 234 | + and .metadata attributes. Signature should be: task(*, item, **kwargs) -> Any |
| 235 | + evaluators: List of functions to evaluate each item's output individually. |
| 236 | + These will have access to the item's expected_output for comparison. |
| 237 | + run_evaluators: List of functions to evaluate the entire experiment run. |
| 238 | + Useful for computing aggregate statistics across all dataset items. |
| 239 | + max_concurrency: Maximum number of concurrent task executions (default: 50). |
| 240 | + Adjust based on API rate limits and system resources. |
| 241 | + metadata: Optional metadata to attach to the experiment run and all traces. |
| 242 | + Will be combined with individual item metadata. |
| 243 | +
|
| 244 | + Returns: |
| 245 | + ExperimentResult object containing: |
| 246 | + - name: The experiment name. |
| 247 | + - run_name: The experiment run name (equivalent to the dataset run name). |
| 248 | + - description: Optional experiment description. |
| 249 | + - item_results: Results for each dataset item with outputs and evaluations. |
| 250 | + - run_evaluations: Aggregate evaluation results for the entire run. |
| 251 | + - dataset_run_id: ID of the created dataset run in Langfuse. |
| 252 | + - dataset_run_url: Direct URL to view the experiment results in Langfuse UI. |
| 253 | +
|
| 254 | + The result object provides a format() method for human-readable output: |
| 255 | + ```python |
| 256 | + result = dataset.run_experiment(...) |
| 257 | + print(result.format()) # Summary view |
| 258 | + print(result.format(include_item_results=True)) # Detailed view |
| 259 | + ``` |
| 260 | +
|
| 261 | + Raises: |
| 262 | + ValueError: If the dataset has no items or no Langfuse client is available. |
| 263 | +
|
| 264 | + Examples: |
| 265 | + Basic dataset experiment: |
| 266 | + ```python |
| 267 | + dataset = langfuse.get_dataset("qa-evaluation-set") |
| 268 | +
|
| 269 | + def answer_questions(*, item, **kwargs): |
| 270 | + # item is a DatasetItem with .input, .expected_output, .metadata |
| 271 | + question = item.input |
| 272 | + return my_qa_system.answer(question) |
| 273 | +
|
| 274 | + def accuracy_evaluator(*, input, output, expected_output=None, **kwargs): |
| 275 | + if not expected_output: |
| 276 | + return {"name": "accuracy", "value": None, "comment": "No expected output"} |
| 277 | +
|
| 278 | + is_correct = output.strip().lower() == expected_output.strip().lower() |
| 279 | + return { |
| 280 | + "name": "accuracy", |
| 281 | + "value": 1.0 if is_correct else 0.0, |
| 282 | + "comment": "Correct" if is_correct else "Incorrect" |
| 283 | + } |
| 284 | +
|
| 285 | + result = dataset.run_experiment( |
| 286 | + name="QA System v2.0 Evaluation", |
| 287 | + description="Testing improved QA system on curated question set", |
| 288 | + task=answer_questions, |
| 289 | + evaluators=[accuracy_evaluator] |
| 290 | + ) |
| 291 | +
|
| 292 | + print(f"Evaluated {len(result.item_results)} questions") |
| 293 | + print(f"View detailed results: {result.dataset_run_url}") |
| 294 | + ``` |
| 295 | +
|
| 296 | + Advanced experiment with multiple evaluators and run-level analysis: |
| 297 | + ```python |
| 298 | + dataset = langfuse.get_dataset("content-generation-benchmark") |
| 299 | +
|
| 300 | + async def generate_content(*, item, **kwargs): |
| 301 | + prompt = item.input |
| 302 | + response = await openai_client.chat.completions.create( |
| 303 | + model="gpt-4", |
| 304 | + messages=[{"role": "user", "content": prompt}], |
| 305 | + temperature=0.7 |
| 306 | + ) |
| 307 | + return response.choices[0].message.content |
| 308 | +
|
| 309 | + def quality_evaluator(*, input, output, expected_output=None, metadata=None, **kwargs): |
| 310 | + # Use metadata for context-aware evaluation |
| 311 | + content_type = metadata.get("type", "general") if metadata else "general" |
| 312 | +
|
| 313 | + # Basic quality checks |
| 314 | + word_count = len(output.split()) |
| 315 | + min_words = {"blog": 300, "tweet": 10, "summary": 100}.get(content_type, 50) |
| 316 | +
|
| 317 | + return [ |
| 318 | + { |
| 319 | + "name": "word_count", |
| 320 | + "value": word_count, |
| 321 | + "comment": f"Generated {word_count} words" |
| 322 | + }, |
| 323 | + { |
| 324 | + "name": "meets_length_requirement", |
| 325 | + "value": word_count >= min_words, |
| 326 | + "comment": f"{'Meets' if word_count >= min_words else 'Below'} minimum {min_words} words for {content_type}" |
| 327 | + } |
| 328 | + ] |
| 329 | +
|
| 330 | + def content_diversity(*, item_results, **kwargs): |
| 331 | + # Analyze diversity across all generated content |
| 332 | + all_outputs = [result.output for result in item_results] |
| 333 | + unique_words = set() |
| 334 | + total_words = 0 |
| 335 | +
|
| 336 | + for output in all_outputs: |
| 337 | + words = output.lower().split() |
| 338 | + unique_words.update(words) |
| 339 | + total_words += len(words) |
| 340 | +
|
| 341 | + diversity_ratio = len(unique_words) / total_words if total_words > 0 else 0 |
| 342 | +
|
| 343 | + return { |
| 344 | + "name": "vocabulary_diversity", |
| 345 | + "value": diversity_ratio, |
| 346 | + "comment": f"Used {len(unique_words)} unique words out of {total_words} total ({diversity_ratio:.2%} diversity)" |
| 347 | + } |
| 348 | +
|
| 349 | + result = dataset.run_experiment( |
| 350 | + name="Content Generation Diversity Test", |
| 351 | + description="Evaluating content quality and vocabulary diversity across different content types", |
| 352 | + task=generate_content, |
| 353 | + evaluators=[quality_evaluator], |
| 354 | + run_evaluators=[content_diversity], |
| 355 | + max_concurrency=3, # Limit API calls |
| 356 | + metadata={"model": "gpt-4", "temperature": 0.7} |
| 357 | + ) |
| 358 | +
|
| 359 | + # Results are automatically linked to the dataset in Langfuse |
| 360 | + print(f"Experiment completed! View in Langfuse: {result.dataset_run_url}") |
| 361 | +
|
| 362 | + # Access individual results |
| 363 | + for i, item_result in enumerate(result.item_results): |
| 364 | + print(f"Item {i+1}: {item_result.evaluations}") |
| 365 | + ``` |
| 366 | +
|
| 367 | + Comparing different model versions: |
| 368 | + ```python |
| 369 | + # Run multiple experiments on the same dataset for comparison |
| 370 | + dataset = langfuse.get_dataset("model-benchmark") |
| 371 | +
|
| 372 | + # Experiment 1: GPT-4 |
| 373 | + result_gpt4 = dataset.run_experiment( |
| 374 | + name="GPT-4 Baseline", |
| 375 | + description="Baseline performance with GPT-4", |
| 376 | + task=lambda *, item, **kwargs: gpt4_model.generate(item.input), |
| 377 | + evaluators=[accuracy_evaluator, fluency_evaluator] |
| 378 | + ) |
| 379 | +
|
| 380 | + # Experiment 2: Custom model |
| 381 | + result_custom = dataset.run_experiment( |
| 382 | + name="Custom Model v1.2", |
| 383 | + description="Testing our fine-tuned model", |
| 384 | + task=lambda *, item, **kwargs: custom_model.generate(item.input), |
| 385 | + evaluators=[accuracy_evaluator, fluency_evaluator] |
| 386 | + ) |
| 387 | +
|
| 388 | + # Both experiments are now visible in Langfuse for easy comparison |
| 389 | + print("Compare results in Langfuse:") |
| 390 | + print(f"GPT-4: {result_gpt4.dataset_run_url}") |
| 391 | + print(f"Custom: {result_custom.dataset_run_url}") |
| 392 | + ``` |
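| | +
| | + Running with an async task (a minimal sketch; answer_async and |
| | + my_async_client are hypothetical stand-ins): |
| | + ```python |
| | + dataset = langfuse.get_dataset("qa-evaluation-set") |
| | +
| | + async def answer_async(*, item, **kwargs): |
| | + # Hypothetical async client call; run_experiment detects the async task |
| | + # and awaits it internally, so the method itself is not awaited. |
| | + return await my_async_client.answer(item.input) |
| | +
| | + result = dataset.run_experiment( |
| | + name="Async QA Run", |
| | + task=answer_async, |
| | + ) |
| | + print(result.format()) |
| | + ``` |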
| 393 | +
|
| 394 | + Note: |
| 395 | + - All experiment results are automatically tracked in Langfuse as dataset runs |
| 396 | + - Dataset items provide .input, .expected_output, and .metadata attributes |
| 397 | + - Results can be easily compared across different experiment runs in the UI |
| 398 | + - The dataset_run_url provides direct access to detailed results and analysis |
| 399 | + - Failed items are handled gracefully and logged without stopping the experiment |
| 400 | + - This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.) |
| 401 | + - Async execution is handled automatically with smart event loop detection |
| 402 | + """ |
| 403 | + langfuse_client = self._get_langfuse_client() |
| 404 | + if not langfuse_client: |
| 405 | + raise ValueError("No Langfuse client available. Dataset items are empty.") |
| 406 | + |
| 407 | + return langfuse_client.run_experiment( |
| 408 | + name=name, |
| 409 | + run_name=run_name, |
| 410 | + description=description, |
| 411 | + data=self.items, |
| 412 | + task=task, |
| 413 | + evaluators=evaluators, |
| 414 | + run_evaluators=run_evaluators, |
| 415 | + max_concurrency=max_concurrency, |
| 416 | + metadata=metadata, |
| 417 | + ) |