Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions app/modules/health/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,28 @@ async def internal_drain_status(request: Request) -> HealthCheckResponse:

import app.core.shutdown as shutdown_state

return HealthCheckResponse(
status="ok",
checks={
"draining": str(shutdown_state.is_draining()).lower(),
"bridge_drain_active": str(shutdown_state.is_bridge_drain_active()).lower(),
"in_flight": str(shutdown_state.get_in_flight()),
},
)
checks = {
"draining": str(shutdown_state.is_draining()).lower(),
"bridge_drain_active": str(shutdown_state.is_bridge_drain_active()).lower(),
"in_flight": str(shutdown_state.get_in_flight()),
}

app = getattr(request, "app", None)
app_state = getattr(app, "state", None)
proxy_service = getattr(app_state, "proxy_service", None)
if proxy_service is not None and hasattr(proxy_service, "http_bridge_activity_snapshot_nowait"):
try:
bridge_activity = proxy_service.http_bridge_activity_snapshot_nowait()
checks.update(
{
key: str(value).lower() if isinstance(value, bool) else str(value)
for key, value in bridge_activity.items()
}
)
except Exception as exc:
checks["http_bridge_activity_error"] = type(exc).__name__

return HealthCheckResponse(status="ok", checks=checks)


def _bridge_readiness_failure_detail(bridge_ring: BridgeRingInfo) -> str | None:
Expand Down
104 changes: 104 additions & 0 deletions app/modules/proxy/_service/http_bridge/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@
)
_HTTP_BRIDGE_BACKGROUND_CLOSE_TIMEOUT_SECONDS = 5.0
_HTTP_BRIDGE_BACKGROUND_CLEANUP_WARN_THRESHOLD = 100
_HTTP_BRIDGE_INFLIGHT_STARTED_AT_ATTR = "_codex_lb_started_at"
_HTTP_BRIDGE_STALE_INFLIGHT_MIN_SECONDS = 120.0
_HTTP_BRIDGE_STALE_INFLIGHT_TIMEOUT_MULTIPLIER = 6.0


class _HTTPBridgeMixin(
Expand Down Expand Up @@ -263,6 +266,102 @@ def _http_bridge_pending_count_nowait(
finally:
session.pending_lock.release()

def http_bridge_activity_snapshot_nowait(self) -> dict[str, int | bool]:
inflight_cleanup = self._cleanup_http_bridge_inflight_sessions_nowait()
Comment thread
zvladru marked this conversation as resolved.
live_sessions = 0
pending_or_queued_requests = 0
pending_unknown_sessions = 0

for session in list(self._http_bridge_sessions.values()):
if session.closed:
continue
live_sessions += 1
pending_count = self._http_bridge_pending_count_nowait(session, context="drain_status")
if pending_count is None:
pending_unknown_sessions += 1
else:
pending_or_queued_requests += max(0, pending_count)

inflight_session_creates = len(self._http_bridge_inflight_sessions)
active_cleanup_tasks = sum(1 for task in self._background_cleanup_tasks if not task.done())
restart_blocking = (
pending_or_queued_requests > 0 or pending_unknown_sessions > 0 or inflight_session_creates > 0
)
return {
"http_bridge_live_sessions": live_sessions,
"http_bridge_pending_or_queued_requests": pending_or_queued_requests,
"http_bridge_pending_unknown_sessions": pending_unknown_sessions,
"http_bridge_inflight_session_creates": inflight_session_creates,
"http_bridge_inflight_session_create_oldest_age_seconds": inflight_cleanup["oldest_age_seconds"],
"http_bridge_stale_inflight_session_creates": inflight_cleanup["stale"],
"http_bridge_cleaned_inflight_session_creates": inflight_cleanup["cleaned"],
"http_bridge_background_cleanup_tasks": active_cleanup_tasks,
"http_bridge_restart_blocking": restart_blocking,
}

def _cleanup_http_bridge_inflight_sessions_nowait(self) -> dict[str, int]:
    """Prune finished or stale in-flight session creations without awaiting.

    Each registry entry is a future, optionally stamped with a monotonic
    start time under ``_HTTP_BRIDGE_INFLIGHT_STARTED_AT_ATTR``. Entries whose
    future is already done are dropped; entries older than
    ``_http_bridge_stale_inflight_seconds()`` are dropped and their future is
    failed with a startup-wait timeout error.

    Review follow-up (P2, "Keep inflight cleanup under the registry lock"):
    this method can run while ``_get_or_create_http_bridge_session`` is
    suspended inside ``_http_bridge_lock``. Popping entries or failing
    futures in that window could let a request that is about to read the
    in-flight future under the lock see it disappear, skipping the terminal
    in-flight error or starting a duplicate bridge create. While the lock is
    held the registry is therefore only observed, never mutated; a later call
    performs the cleanup.

    Returns:
        ``cleaned``: entries removed by this call.
        ``stale``: entries found past the stale threshold (counted even when
        removal was skipped because the registry lock was held).
        ``oldest_age_seconds``: largest whole-second age seen among entries
        that carry a start time, including entries removed by this call.
    """
    now = _service_time().monotonic()
    stale_after_seconds = self._http_bridge_stale_inflight_seconds()

    # NOTE(review): assumes ``self._http_bridge_lock`` is the registry lock named
    # in the review comment and that it exposes ``locked()`` (as asyncio.Lock
    # does) -- confirm against the class definition. If the attribute is
    # missing, the original always-mutate behavior is kept.
    registry_lock = getattr(self, "_http_bridge_lock", None)
    lock_is_held = getattr(registry_lock, "locked", None)
    # This method never awaits, so (presuming an event-loop lock) a lock seen
    # unlocked here cannot be acquired by another coroutine before we return.
    may_mutate = not (callable(lock_is_held) and lock_is_held())

    cleaned = 0
    stale = 0
    oldest_age_seconds = 0
    for key, future in list(self._http_bridge_inflight_sessions.items()):
        # Ignore entries that were replaced or removed since the copy was taken.
        if self._http_bridge_inflight_sessions.get(key) is not future:
            continue
        started_at = getattr(future, _HTTP_BRIDGE_INFLIGHT_STARTED_AT_ATTR, None)
        has_start_time = isinstance(started_at, (int, float))
        # Futures without a start stamp are treated as age zero and never go stale.
        age_seconds = max(0.0, now - started_at) if has_start_time else 0.0
        oldest_age_seconds = max(oldest_age_seconds, int(age_seconds))

        cleanup_reason: str | None = None
        if future.done():
            cleanup_reason = "done"
        elif has_start_time and age_seconds >= stale_after_seconds:
            stale += 1
            cleanup_reason = "stale"
        if cleanup_reason is None or not may_mutate:
            continue

        cleanup_exc: BaseException | None = None
        if cleanup_reason == "stale":
            cleanup_exc = _http_bridge_startup_wait_timeout_error(
                "http_bridge_inflight_session_stale",
                code="capacity_exhausted_active_sessions",
            )
        self._http_bridge_inflight_sessions.pop(key, None)
        cleaned += 1
        if cleanup_exc is not None and not future.done():
            future.set_exception(cleanup_exc)
            # Retrieve the exception right away so a future nobody awaits does
            # not trigger an "exception was never retrieved" warning.
            future.exception()
        elif future.done() and not future.cancelled():
            try:
                # Same reason as above: mark any stored exception as retrieved.
                future.exception()
            except Exception:
                # Deliberate best-effort: bookkeeping must not break the status path.
                pass
        logger.warning(
            "http_bridge_inflight_session_create_cleanup reason=%s bridge_kind=%s bridge_key=%s"
            " age_seconds=%d stale_after_seconds=%d done=%s cancelled=%s",
            cleanup_reason,
            key.affinity_kind,
            _hash_identifier(key.affinity_key),
            int(age_seconds),
            int(stale_after_seconds),
            future.done(),
            future.cancelled(),
        )
    return {
        "cleaned": cleaned,
        "stale": stale,
        "oldest_age_seconds": oldest_age_seconds,
    }

def _http_bridge_stale_inflight_seconds(self) -> float:
    """Return the age, in seconds, past which an in-flight create counts as stale.

    The threshold is the proxy admission wait timeout scaled by
    ``_HTTP_BRIDGE_STALE_INFLIGHT_TIMEOUT_MULTIPLIER``, never lower than
    ``_HTTP_BRIDGE_STALE_INFLIGHT_MIN_SECONDS``. If the timeout lookup raises,
    a timeout of 10.0 seconds is used instead.
    """
    floor_seconds = _HTTP_BRIDGE_STALE_INFLIGHT_MIN_SECONDS
    try:
        wait_timeout = _proxy_admission_wait_timeout_seconds()
    except Exception:
        # Best-effort: a failing lookup must not break the status path.
        wait_timeout = 10.0
    scaled_seconds = wait_timeout * _HTTP_BRIDGE_STALE_INFLIGHT_TIMEOUT_MULTIPLIER
    return max(floor_seconds, scaled_seconds)

async def _close_http_bridge_session_bounded(
self,
session: "_HTTPBridgeSession",
Expand Down Expand Up @@ -1258,6 +1357,11 @@ async def _get_or_create_http_bridge_session(
)
else:
inflight_future = asyncio.get_running_loop().create_future()
setattr(
inflight_future,
_HTTP_BRIDGE_INFLIGHT_STARTED_AT_ATTR,
_service_time().monotonic(),
)
self._http_bridge_inflight_sessions[key] = inflight_future
owns_creation = True

Expand Down
34 changes: 34 additions & 0 deletions tests/unit/test_health_probes.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,40 @@ async def test_internal_drain_status_reports_shutdown_state():
}


@pytest.mark.asyncio
async def test_internal_drain_status_reports_bridge_activity():
    """Bridge activity values from the proxy service appear as strings in ``checks``."""
    from app.modules.health.api import internal_drain_status

    bridge_activity = {
        "http_bridge_live_sessions": 2,
        "http_bridge_pending_or_queued_requests": 1,
        "http_bridge_pending_unknown_sessions": 0,
        "http_bridge_inflight_session_creates": 1,
        "http_bridge_restart_blocking": True,
    }
    proxy_service = SimpleNamespace(
        http_bridge_activity_snapshot_nowait=MagicMock(return_value=bridge_activity),
    )
    app_state = SimpleNamespace(proxy_service=proxy_service)
    # Loopback client host, mirroring the sibling drain-status tests.
    request = SimpleNamespace(
        client=SimpleNamespace(host="127.0.0.1"),
        app=SimpleNamespace(state=app_state),
    )

    with (
        patch("app.core.shutdown.is_draining", return_value=False),
        patch("app.core.shutdown.is_bridge_drain_active", return_value=False),
        patch("app.core.shutdown.get_in_flight", return_value=0),
    ):
        response = await internal_drain_status(cast(Any, request))

    assert response.checks is not None
    # Integers are rendered with str(); booleans as lowercase text.
    expected_checks = {
        "http_bridge_live_sessions": "2",
        "http_bridge_pending_or_queued_requests": "1",
        "http_bridge_inflight_session_creates": "1",
        "http_bridge_restart_blocking": "true",
    }
    for check_name, rendered in expected_checks.items():
        assert response.checks[check_name] == rendered


@pytest.mark.asyncio
async def test_internal_drain_status_rejects_non_loopback_clients():
from app.modules.health.api import internal_drain_status
Expand Down
66 changes: 66 additions & 0 deletions tests/unit/test_proxy_http_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,72 @@ def _make_bridge_session(
)


@pytest.mark.asyncio
async def test_http_bridge_activity_snapshot_counts_pending_and_inflight_sessions():
    """One live session with pending work plus one in-flight create blocks restart."""
    service = proxy_service.ProxyService(cast(Any, SimpleNamespace()))

    pending_request = proxy_service._WebSocketRequestState(
        request_id="req-drain-status",
        model="gpt-5.5",
        service_tier=None,
        reasoning_effort=None,
        api_key_reservation=None,
        started_at=1.0,
        response_id=None,
        awaiting_response_created=True,
        event_queue=None,
        transport="http",
        skip_request_log=True,
    )
    live_session = _make_bridge_session(
        pending_requests=deque([pending_request]),
        queued_request_count=2,
    )
    service._http_bridge_sessions[live_session.key] = live_session

    # An unstamped, unfinished future: counted as in-flight, never as stale.
    inflight_key = proxy_service._HTTPBridgeSessionKey("session_header", "inflight-drain-status", None)
    service._http_bridge_inflight_sessions[inflight_key] = asyncio.Future()

    expected_snapshot = {
        "http_bridge_live_sessions": 1,
        "http_bridge_pending_or_queued_requests": 2,
        "http_bridge_pending_unknown_sessions": 0,
        "http_bridge_inflight_session_creates": 1,
        "http_bridge_inflight_session_create_oldest_age_seconds": 0,
        "http_bridge_stale_inflight_session_creates": 0,
        "http_bridge_cleaned_inflight_session_creates": 0,
        "http_bridge_background_cleanup_tasks": 0,
        "http_bridge_restart_blocking": True,
    }

    snapshot = service.http_bridge_activity_snapshot_nowait()

    assert snapshot == expected_snapshot


@pytest.mark.asyncio
async def test_http_bridge_activity_snapshot_cleans_stale_inflight_session(
    monkeypatch: pytest.MonkeyPatch,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """A stale in-flight create is removed, failed, logged, and stops blocking restart."""
    service = proxy_service.ProxyService(cast(Any, SimpleNamespace()))
    stale_key = proxy_service._HTTPBridgeSessionKey("session_header", "stale-inflight-drain-status", None)

    running_loop = asyncio.get_running_loop()
    inflight_future: asyncio.Future[proxy_service._HTTPBridgeSession] = running_loop.create_future()
    # Stamp a start time far in the past so the entry exceeds the stale threshold.
    setattr(inflight_future, "_codex_lb_started_at", -1000.0)
    service._http_bridge_inflight_sessions[stale_key] = inflight_future

    monkeypatch.setattr(proxy_service, "_proxy_admission_wait_timeout_seconds", lambda settings=None: 0.001)

    with caplog.at_level(logging.WARNING, logger="app.modules.proxy.service"):
        snapshot = service.http_bridge_activity_snapshot_nowait()

    # The registry entry is gone and the counters reflect the cleanup.
    assert stale_key not in service._http_bridge_inflight_sessions
    expected_counts = {
        "http_bridge_inflight_session_creates": 0,
        "http_bridge_stale_inflight_session_creates": 1,
        "http_bridge_cleaned_inflight_session_creates": 1,
    }
    for counter_name, expected_value in expected_counts.items():
        assert snapshot[counter_name] == expected_value
    assert snapshot["http_bridge_restart_blocking"] is False
    assert "http_bridge_inflight_session_create_cleanup" in caplog.text

    # Anything awaiting the stale future receives the terminal error.
    with pytest.raises(ProxyResponseError):
        await inflight_future


async def _wait_for_close_await(close_session: AsyncMock, session: proxy_service._HTTPBridgeSession) -> None:
for _ in range(10):
if any(call.args == (session,) for call in close_session.await_args_list):
Expand Down