Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions src/gaia/ui/routers/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,16 @@ async def send_message(
sid = request.session_id
session_lock = session_locks.setdefault(sid, asyncio.Lock())

# Acquire session lock — if a previous request is stuck (hung LLM
# connection, crashed stream), force-release and proceed rather than
# leaving the user permanently stuck with "request already in progress".
try:
await asyncio.wait_for(session_lock.acquire(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning(
"Force-releasing stuck session lock for %s "
"(previous request likely hung)",
sid,
# Reject overlapping turns for the same session. Force-releasing an
# asyncio.Lock held by another coroutine is unsafe because the lock
# has no ownership tracking.
if session_lock.locked():
raise HTTPException(
status_code=409,
detail="A chat request is already in progress for this session. "
"Please wait for it to finish.",
)
try:
session_lock.release()
except RuntimeError:
pass # Lock wasn't held — race condition, safe to ignore
await session_lock.acquire()
await session_lock.acquire()

# ── Global concurrency gate ──────────────────────────────────────
# Queue rather than immediately reject: wait up to 60 s for a slot.
Expand Down
17 changes: 10 additions & 7 deletions tests/unit/chat/ui/test_chat_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ def session_id(client):
return resp.json()["id"]


class TestSessionLockForceRelease:
"""Tests for per-session lock force-release on stuck requests."""
class TestSessionLockConflict:
"""Tests for same-session conflict handling."""

def test_concurrent_request_force_releases_stuck_lock(self, app, session_id):
"""When a session lock is stuck, a second request force-releases it and proceeds."""
def test_concurrent_request_returns_409_if_session_lock_is_held(
self, app, session_id
):
"""When a session lock is held, a second request should get 409."""
# Pre-acquire the session lock to simulate a stuck request
lock = asyncio.Lock()

Expand All @@ -63,14 +65,15 @@ async def _hold_lock():
"/api/chat/send",
json={
"session_id": session_id,
"message": "should succeed after force-release",
"message": "should conflict while another turn is active",
"stream": False,
},
)
# Should succeed (200) instead of deadlocking with 409
assert resp.status_code == 200
assert resp.status_code == 409
assert "already in progress" in resp.json()["detail"]

# Cleanup
lock.release()
loop.close()

def test_different_sessions_not_blocked(self, client, db):
Expand Down
Loading