Skip to content
4 changes: 4 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def load_routing_configuration(file: Path = _config_file) -> TomlTable:
return typing.cast("TomlTable", _load_configuration(file)["routing"])


def load_run_configuration(file: Path = _config_file) -> TomlTable:
    """Return the ``[run]`` table from the configuration file, or an empty table."""
    run_table = _load_configuration(file).get("run", {})
    return typing.cast("TomlTable", run_table)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When resolving merge conflicts, please make this a DevelopmentConfiguration option for now.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.



@functools.cache
def load_database_configuration(file: Path = _config_file) -> TomlTable:
configuration = _load_configuration(file)
Expand Down
3 changes: 3 additions & 0 deletions src/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ database="openml"
[routing]
minio_url="http://minio:9000/"
server_url="http://php-api:80/"

[run]
evaluation_engine_ids = [1]
2 changes: 1 addition & 1 deletion src/database/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def get(id_: int, expdb: AsyncConnection) -> Row | None:
row = await expdb.execute(
text(
"""
SELECT *, uploadDate as upload_date
SELECT *, uploadDate as upload_date, fullName AS full_name
FROM implementation
WHERE id = :flow_id
""",
Expand Down
179 changes: 179 additions & 0 deletions src/database/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,185 @@ async def exist(id_: int, expdb: AsyncConnection) -> bool:
return bool(row.one_or_none())


async def get(run_id: int, expdb: AsyncConnection) -> Row | None:
    """Look up the core record for a run in the `run` table.

    Returns None when `run_id` matches no run; otherwise the row. The
    `error_message` column is NULL when the run completed without errors.
    """
    query = text(
        """
        SELECT `rid`, `uploader`, `setup`, `task_id`, `error_message`
        FROM `run`
        WHERE `rid` = :run_id
        """,
    )
    result = await expdb.execute(query, parameters={"run_id": run_id})
    return result.one_or_none()


async def get_uploader_name(uploader_id: int, userdb: AsyncConnection) -> str | None:
    """Return the display name of a user from the openml user database.

    Concatenates first_name + ' ' + last_name from the `users` table.
    Returns None when no user with `uploader_id` exists.
    """
    query = text(
        """
        SELECT CONCAT(`first_name`, ' ', `last_name`) AS `name`
        FROM `users`
        WHERE `id` = :uploader_id
        """,
    )
    rows = await userdb.execute(query, parameters={"uploader_id": uploader_id})
    user = rows.one_or_none()
    return None if user is None else user.name
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could rather extend the database.users.User class to keep around more information than just the identifier, instead of creating separate method to fetch different attributes.

  • extend the User class with first_name and last_name attribute, and a full_name property.
  • refactor get_user_id_for to return a user:
    • rename to get_user
    • allow fetching by either apikey or id
    • return an instantiated User.

then you can use that method instead.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a really good idea; I will extend the User class and use it accordingly.



async def get_tags(run_id: int, expdb: AsyncConnection) -> list[str]:
    """Return every tag attached to a run.

    In the `run_tag` table the `id` column holds the run ID.
    """
    query = text(
        """
        SELECT `tag`
        FROM `run_tag`
        WHERE `id` = :run_id
        """,
    )
    result = await expdb.execute(query, parameters={"run_id": run_id})
    return [tag_row.tag for tag_row in result.all()]


async def get_input_data(run_id: int, expdb: AsyncConnection) -> list[Row]:
    """Return the dataset(s) used as input for a run, including name and URL.

    Joins `input_data` with `dataset` so each row carries the dataset
    name and its ARFF file URL alongside the dataset id.
    """
    query = text(
        """
        SELECT `id`.`data` AS `did`, `d`.`name`, `d`.`url`
        FROM `input_data` `id`
        JOIN `dataset` `d` ON `id`.`data` = `d`.`did`
        WHERE `id`.`run` = :run_id
        """,
    )
    result = await expdb.execute(query, parameters={"run_id": run_id})
    return cast("list[Row]", result.all())


async def get_output_files(run_id: int, expdb: AsyncConnection) -> list[Row]:
    """Fetch output files attached to a run from the `runfile` table.

    Typical entries include the description XML and predictions ARFF.
    The `field` column holds the file label (e.g. "description", "predictions").

    NOTE(review): the PHP response includes a deprecated `did` field hardcoded
    to "-1" for each file; this implementation omits it. That detail belongs to
    the API layer, not the database layer — consider documenting it there.
    """
    rows = await expdb.execute(
        text(
            """
            SELECT `file_id`, `field`
            FROM `runfile`
            WHERE `source` = :run_id
            """,
        ),
        parameters={"run_id": run_id},
    )
    return cast("list[Row]", rows.all())


async def get_evaluations(
    run_id: int,
    expdb: AsyncConnection,
    *,
    evaluation_engine_ids: list[int],
) -> list[Row]:
    """Fetch evaluation metric results for a run.

    Joins `evaluation` with `math_function` to resolve the metric name
    (the `evaluation` table stores only a `function_id`, not the name directly).

    Filters on `evaluation_engine_id IN (...)`; the ID list is configurable
    via `config.toml [run] evaluation_engine_ids`. Named bind parameters are
    generated per engine ID for aiomysql compatibility.
    """
    if not evaluation_engine_ids:
        return []

    # One named bind parameter per engine ID: :eid_0, :eid_1, ...
    engine_params: dict[str, int] = {}
    placeholder_names: list[str] = []
    for index, engine_id in enumerate(evaluation_engine_ids):
        engine_params[f"eid_{index}"] = engine_id
        placeholder_names.append(f":eid_{index}")
    in_clause = ", ".join(placeholder_names)

    statement = text(
        f"""
        SELECT `m`.`name`, `e`.`value`, `e`.`array_data`
        FROM `evaluation` `e`
        JOIN `math_function` `m` ON `e`.`function_id` = `m`.`id`
        WHERE `e`.`source` = :run_id
        AND `e`.`evaluation_engine_id` IN ({in_clause})
        """,  # noqa: S608 # placeholders are trusted integer params, not user input
    )
    result = await expdb.execute(
        statement,
        parameters={"run_id": run_id, **engine_params},
    )
    return cast("list[Row]", result.all())


async def get_task_type(task_id: int, expdb: AsyncConnection) -> str | None:
    """Return the human-readable task type name for a task.

    Joins `task` and `task_type` on `ttid` to resolve the name
    (e.g. "Supervised Classification"). Returns None when the task
    does not exist.
    """
    query = text(
        """
        SELECT `tt`.`name`
        FROM `task` `t`
        JOIN `task_type` `tt` ON `t`.`ttid` = `tt`.`ttid`
        WHERE `t`.`task_id` = :task_id
        """,
    )
    rows = await expdb.execute(query, parameters={"task_id": task_id})
    match = rows.one_or_none()
    return None if match is None else match.name


async def get_task_evaluation_measure(task_id: int, expdb: AsyncConnection) -> str | None:
    """Return the evaluation measure configured for a task, if any.

    Reads the `task_inputs` row whose `input` is 'evaluation_measures'.
    Returns None (not an empty string) when no such row exists, so callers
    can treat a falsy result uniformly.
    """
    query = text(
        """
        SELECT `value`
        FROM `task_inputs`
        WHERE `task_id` = :task_id
        AND `input` = 'evaluation_measures'
        """,
    )
    rows = await expdb.execute(query, parameters={"task_id": task_id})
    match = rows.one_or_none()
    return None if match is None else match.value
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move these to the tasks module. I will have a look at consolidating these if appropriate.



async def get_trace(run_id: int, expdb: AsyncConnection) -> Sequence[Row]:
"""Get trace rows for a run from the trace table."""
rows = await expdb.execute(
Expand Down
140 changes: 137 additions & 3 deletions src/routers/openml/runs.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
"""Endpoints for run-related data."""

from typing import Annotated
import asyncio
from typing import TYPE_CHECKING, Annotated, cast

from fastapi import APIRouter, Depends

if TYPE_CHECKING:
from sqlalchemy import Row
from sqlalchemy.ext.asyncio import AsyncConnection

import config
import database.flows
import database.runs
import database.setups
from core.errors import RunNotFoundError, RunTraceNotFoundError
from routers.dependencies import expdb_connection
from schemas.runs import RunTrace, TraceIteration
from routers.dependencies import expdb_connection, userdb_connection
from schemas.runs import (
EvaluationScore,
InputDataset,
OutputFile,
ParameterSetting,
Run,
RunTrace,
TraceIteration,
)

router = APIRouter(prefix="/run", tags=["run"])

Expand Down Expand Up @@ -42,3 +57,122 @@ async def get_run_trace(
for row in trace_rows
],
)


@router.post(
    path="/{run_id}",
    description="Provided for convenience, same as `GET` endpoint.",
    response_model_exclude_none=True,
)
@router.get("/{run_id}", response_model_exclude_none=True)
async def get_run(
    run_id: int,
    expdb: Annotated[AsyncConnection, Depends(expdb_connection)],
    userdb: Annotated[AsyncConnection, Depends(userdb_connection)],
) -> Run:
    """Get full metadata for a run by ID.

    No authentication or visibility check is performed — all runs are
    publicly accessible.

    Raises:
        RunNotFoundError: when no run with `run_id` exists (code=236 for
            backward compatibility with the PHP GET /run/{id} error code).
    """
    # Core run record — all other data depends on uploader, setup, and task_id.
    run = await database.runs.get(run_id, expdb)
    if run is None:
        msg = f"Run {run_id} not found."
        raise RunNotFoundError(msg, code=236)

    # Evaluation engine IDs come from config.toml [run] so they can be
    # extended when a new evaluation engine is deployed, without code changes.
    engine_ids: list[int] = config.load_run_configuration().get("evaluation_engine_ids", [1])

    # Fetch all independent data concurrently.
    # NOTE(review): most of these tasks share `expdb`; SQLAlchemy documents
    # AsyncConnection as unsafe for concurrent use. This mirrors the existing
    # pattern elsewhere in the codebase (e.g. datasets.py) and a codebase-wide
    # fix is tracked separately — do not change in isolation here.
    (
        uploader_name,
        tags,
        input_data_rows,
        output_file_rows,
        evaluation_rows,
        task_type,
        task_evaluation_measure,
        setup,
        parameter_rows,
    ) = cast(
        "tuple[str | None, list[str], list[Row], list[Row], list[Row], str | None, "
        "str | None, Row | None, list[Row]]",
        await asyncio.gather(
            database.runs.get_uploader_name(run.uploader, userdb),
            database.runs.get_tags(run_id, expdb),
            database.runs.get_input_data(run_id, expdb),
            database.runs.get_output_files(run_id, expdb),
            database.runs.get_evaluations(run_id, expdb, evaluation_engine_ids=engine_ids),
            database.runs.get_task_type(run.task_id, expdb),
            database.runs.get_task_evaluation_measure(run.task_id, expdb),
            database.setups.get(run.setup, expdb),
            database.setups.get_parameters(run.setup, expdb),
        ),
    )

    # Flow is fetched after the gather because it requires setup.implementation_id.
    # flows.get() selects fullName AS full_name for reliable case-insensitive access.
    flow = await database.flows.get(setup.implementation_id, expdb) if setup else None

    # Build parameter_setting list from the denormalised parameter rows
    # returned by database.setups.get_parameters (which already JOINs input + implementation).
    parameter_settings = [
        ParameterSetting(
            name=p["name"],
            value=p["value"],
            component=p["flow_id"],  # implementation_id of the sub-flow owning this param
        )
        for p in parameter_rows
    ]

    input_datasets = [
        InputDataset(did=row.did, name=row.name, url=row.url) for row in input_data_rows
    ]

    # runfile.field is the file label (e.g. "description", "predictions")
    output_files = [OutputFile(file_id=row.file_id, name=row.field) for row in output_file_rows]

    evaluations = [
        EvaluationScore(
            name=row.name,
            # Whole-number floats (e.g. counts) are converted to int to match PHP's
            # integer representation. e.g. 253.0 → 253, 0.0 → 0.
            value=int(row.value)
            if isinstance(row.value, float) and row.value.is_integer()
            else row.value,
            array_data=row.array_data,
        )
        for row in evaluation_rows
    ]

    # Normalise task_evaluation_measure: empty string → None so the field is
    # excluded entirely by response_model_exclude_none=True (matches PHP behaviour
    # of returning "" but we opt to omit rather than return an empty string).
    normalised_measure = task_evaluation_measure or None

    # error_message is NULL in the DB when the run has no error.
    # The PHP response returns an empty array [] in that case.
    error_messages = [run.error_message] if run.error_message else []

    return Run(
        run_id=run_id,
        uploader=run.uploader,
        uploader_name=uploader_name,
        task_id=run.task_id,
        task_type=task_type,
        task_evaluation_measure=normalised_measure,
        flow_id=setup.implementation_id if setup else 0,
        flow_name=flow.full_name if flow else None,
        setup_id=run.setup,
        setup_string=setup.setup_string if setup else None,
        parameter_setting=parameter_settings,
        error_message=error_messages,
        tag=tags,
        # Preserve PHP envelope structure for backward compat
        input_data={"dataset": input_datasets},
        output_data={"file": output_files, "evaluation": evaluations},
    )
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Loading
Loading