fix(experiments): move to sync run item creation to avoid latency issues#1444
Closed
fix(experiments): move to sync run item creation to avoid latency issues#1444
Conversation
Contributor
There was a problem hiding this comment.
1 file reviewed, 1 comment
Edit Code Review Agent Settings | Greptile
React with 👍 or 👎 to share your feedback on this new summary format
Comment on lines
+2818
to
2827
| dataset_run_item = self.api.dataset_run_items.create( | ||
| CreateDatasetRunItemRequest( | ||
| runName=experiment_run_name, | ||
| runDescription=experiment_description, | ||
| metadata=experiment_metadata, | ||
| datasetItemId=item.id, # type: ignore | ||
| traceId=trace_id, | ||
| observationId=span.id, | ||
| ), | ||
| ) | ||
| ) |
Contributor
There was a problem hiding this comment.
logic: calling sync API self.api.dataset_run_items.create() directly inside an async function blocks the event loop. When running 100+ concurrent items via asyncio.gather() (line 2690), this blocking call defeats the purpose of async concurrency. Each blocked call prevents other coroutines from executing, effectively serializing execution and degrading performance worse than the original asyncio.to_thread() approach.
Use asyncio.to_thread() to offload the blocking call:
Suggested change
| dataset_run_item = self.api.dataset_run_items.create( | |
| CreateDatasetRunItemRequest( | |
| runName=experiment_run_name, | |
| runDescription=experiment_description, | |
| metadata=experiment_metadata, | |
| datasetItemId=item.id, # type: ignore | |
| traceId=trace_id, | |
| observationId=span.id, | |
| ), | |
| ) | |
| ) | |
| dataset_run_item = await asyncio.to_thread( | |
| self.api.dataset_run_items.create, | |
| CreateDatasetRunItemRequest( | |
| runName=experiment_run_name, | |
| runDescription=experiment_description, | |
| metadata=experiment_metadata, | |
| datasetItemId=item.id, # type: ignore | |
| traceId=trace_id, | |
| observationId=span.id, | |
| ) | |
| ) |
Prompt To Fix With AI
This is a comment left during a code review.
Path: langfuse/_client/client.py
Line: 2818:2827
Comment:
**logic:** calling sync API `self.api.dataset_run_items.create()` directly inside an async function blocks the event loop. When running 100+ concurrent items via `asyncio.gather()` (line 2690), this blocking call defeats the purpose of async concurrency. Each blocked call prevents other coroutines from executing, effectively serializing execution and degrading performance worse than the original `asyncio.to_thread()` approach.
Use `asyncio.to_thread()` to offload the blocking call:
```suggestion
dataset_run_item = await asyncio.to_thread(
self.api.dataset_run_items.create,
CreateDatasetRunItemRequest(
runName=experiment_run_name,
runDescription=experiment_description,
metadata=experiment_metadata,
datasetItemId=item.id, # type: ignore
traceId=trace_id,
observationId=span.id,
)
)
```
How can I resolve this? If you propose a fix, please make it concise.
auto-merge was automatically disabled
November 25, 2025 10:20
Pull request was closed
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Important
Switches from async to sync API for dataset run item creation in
client.pyto address performance issues at high concurrency.dataset_run_items.createin_process_experiment_item()inclient.py.httpx.AsyncClientat high concurrency (100+ requests).This description was created by
for 3d570eb. You can customize this summary. It will automatically update as commits are pushed.
Disclaimer: Experimental PR review
Greptile Summary
asyncio.to_thread) with direct blocking sync call in_process_experiment_itemConfidence Score: 1/5
asyncio.to_thread()with a direct blocking sync call inside an async function that runs 100+ items concurrently viaasyncio.gather(), which blocks the event loop and defeats async concurrencyImportant Files Changed
Sequence Diagram
sequenceDiagram participant User participant Langfuse participant ProcessItem as "_process_experiment_item" participant API as "dataset_run_items.create" participant Server as "Langfuse Server" User->>Langfuse: "run_experiment(data, task)" Langfuse->>Langfuse: "asyncio.gather() 100+ items" loop For each item concurrently Langfuse->>ProcessItem: "await _process_experiment_item(item)" ProcessItem->>ProcessItem: "Check if item has dataset_id" ProcessItem->>API: "self.api.dataset_run_items.create(request)" Note over API: "Blocking sync call (blocks event loop)" API->>Server: "HTTP POST /dataset_run_items" Server-->>API: "Response (~300ms)" API-->>ProcessItem: "dataset_run_item" ProcessItem->>ProcessItem: "Execute task(item)" ProcessItem-->>Langfuse: "ExperimentItemResult" end Langfuse-->>User: "ExperimentResult"