diff --git a/app/modules/health/api.py b/app/modules/health/api.py index c76e1aa97..bd26d0f23 100644 --- a/app/modules/health/api.py +++ b/app/modules/health/api.py @@ -130,14 +130,28 @@ async def internal_drain_status(request: Request) -> HealthCheckResponse: import app.core.shutdown as shutdown_state - return HealthCheckResponse( - status="ok", - checks={ - "draining": str(shutdown_state.is_draining()).lower(), - "bridge_drain_active": str(shutdown_state.is_bridge_drain_active()).lower(), - "in_flight": str(shutdown_state.get_in_flight()), - }, - ) + checks = { + "draining": str(shutdown_state.is_draining()).lower(), + "bridge_drain_active": str(shutdown_state.is_bridge_drain_active()).lower(), + "in_flight": str(shutdown_state.get_in_flight()), + } + + app = getattr(request, "app", None) + app_state = getattr(app, "state", None) + proxy_service = getattr(app_state, "proxy_service", None) + if proxy_service is not None and hasattr(proxy_service, "http_bridge_activity_snapshot_nowait"): + try: + bridge_activity = proxy_service.http_bridge_activity_snapshot_nowait() + checks.update( + { + key: str(value).lower() if isinstance(value, bool) else str(value) + for key, value in bridge_activity.items() + } + ) + except Exception as exc: + checks["http_bridge_activity_error"] = type(exc).__name__ + + return HealthCheckResponse(status="ok", checks=checks) def _bridge_readiness_failure_detail(bridge_ring: BridgeRingInfo) -> str | None: diff --git a/app/modules/proxy/_service/http_bridge/mixin.py b/app/modules/proxy/_service/http_bridge/mixin.py index c9ed0151d..640a5c38d 100644 --- a/app/modules/proxy/_service/http_bridge/mixin.py +++ b/app/modules/proxy/_service/http_bridge/mixin.py @@ -217,6 +217,9 @@ ) _HTTP_BRIDGE_BACKGROUND_CLOSE_TIMEOUT_SECONDS = 5.0 _HTTP_BRIDGE_BACKGROUND_CLEANUP_WARN_THRESHOLD = 100 +_HTTP_BRIDGE_INFLIGHT_STARTED_AT_ATTR = "_codex_lb_started_at" +_HTTP_BRIDGE_STALE_INFLIGHT_MIN_SECONDS = 120.0 +_HTTP_BRIDGE_STALE_INFLIGHT_TIMEOUT_MULTIPLIER = 6.0 class _HTTPBridgeMixin( @@ -263,6 +266,102 @@ def _http_bridge_pending_count_nowait( finally: session.pending_lock.release() + def http_bridge_activity_snapshot_nowait(self) -> dict[str, int | bool]: + inflight_cleanup = self._cleanup_http_bridge_inflight_sessions_nowait() + live_sessions = 0 + pending_or_queued_requests = 0 + pending_unknown_sessions = 0 + + for session in list(self._http_bridge_sessions.values()): + if session.closed: + continue + live_sessions += 1 + pending_count = self._http_bridge_pending_count_nowait(session, context="drain_status") + if pending_count is None: + pending_unknown_sessions += 1 + else: + pending_or_queued_requests += max(0, pending_count) + + inflight_session_creates = len(self._http_bridge_inflight_sessions) + active_cleanup_tasks = sum(1 for task in self._background_cleanup_tasks if not task.done()) + restart_blocking = ( + pending_or_queued_requests > 0 or pending_unknown_sessions > 0 or inflight_session_creates > 0 + ) + return { + "http_bridge_live_sessions": live_sessions, + "http_bridge_pending_or_queued_requests": pending_or_queued_requests, + "http_bridge_pending_unknown_sessions": pending_unknown_sessions, + "http_bridge_inflight_session_creates": inflight_session_creates, + "http_bridge_inflight_session_create_oldest_age_seconds": inflight_cleanup["oldest_age_seconds"], + "http_bridge_stale_inflight_session_creates": inflight_cleanup["stale"], + "http_bridge_cleaned_inflight_session_creates": inflight_cleanup["cleaned"], + "http_bridge_background_cleanup_tasks": active_cleanup_tasks, + "http_bridge_restart_blocking": restart_blocking, + } + + def _cleanup_http_bridge_inflight_sessions_nowait(self) -> dict[str, int]: + now = _service_time().monotonic() + stale_after_seconds = self._http_bridge_stale_inflight_seconds() + cleaned = 0 + stale = 0 + oldest_age_seconds = 0 + for key, future in list(self._http_bridge_inflight_sessions.items()): + current_future = self._http_bridge_inflight_sessions.get(key) + if current_future is not future: + continue + started_at = getattr(future, _HTTP_BRIDGE_INFLIGHT_STARTED_AT_ATTR, None) + age_seconds = max(0.0, now - started_at) if isinstance(started_at, (int, float)) else 0.0 + oldest_age_seconds = max(oldest_age_seconds, int(age_seconds)) + cleanup_reason: str | None = None + cleanup_exc: BaseException | None = None + if future.done(): + cleanup_reason = "done" + elif isinstance(started_at, (int, float)) and age_seconds >= stale_after_seconds: + stale += 1 + cleanup_reason = "stale" + cleanup_exc = _http_bridge_startup_wait_timeout_error( + "http_bridge_inflight_session_stale", + code="capacity_exhausted_active_sessions", + ) + if cleanup_reason is None: + continue + self._http_bridge_inflight_sessions.pop(key, None) + cleaned += 1 + if cleanup_exc is not None and not future.done(): + future.set_exception(cleanup_exc) + future.exception() + elif future.done() and not future.cancelled(): + try: + future.exception() + except Exception: + pass + logger.warning( + "http_bridge_inflight_session_create_cleanup reason=%s bridge_kind=%s bridge_key=%s" + " age_seconds=%d stale_after_seconds=%d done=%s cancelled=%s", + cleanup_reason, + key.affinity_kind, + _hash_identifier(key.affinity_key), + int(age_seconds), + int(stale_after_seconds), + future.done(), + future.cancelled(), + ) + return { + "cleaned": cleaned, + "stale": stale, + "oldest_age_seconds": oldest_age_seconds, + } + + def _http_bridge_stale_inflight_seconds(self) -> float: + try: + admission_timeout = _proxy_admission_wait_timeout_seconds() + except Exception: + admission_timeout = 10.0 + return max( + _HTTP_BRIDGE_STALE_INFLIGHT_MIN_SECONDS, + admission_timeout * _HTTP_BRIDGE_STALE_INFLIGHT_TIMEOUT_MULTIPLIER, + ) + async def _close_http_bridge_session_bounded( self, session: "_HTTPBridgeSession", @@ -1258,6 +1357,11 @@ async def _get_or_create_http_bridge_session( ) else: inflight_future = asyncio.get_running_loop().create_future() + setattr( + inflight_future, + _HTTP_BRIDGE_INFLIGHT_STARTED_AT_ATTR, + _service_time().monotonic(), + ) self._http_bridge_inflight_sessions[key] = inflight_future owns_creation = True diff --git a/tests/unit/test_health_probes.py b/tests/unit/test_health_probes.py index 18fb4ed28..737994521 100644 --- a/tests/unit/test_health_probes.py +++ b/tests/unit/test_health_probes.py @@ -445,6 +445,40 @@ async def test_internal_drain_status_reports_shutdown_state(): } +@pytest.mark.asyncio +async def test_internal_drain_status_reports_bridge_activity(): + from app.modules.health.api import internal_drain_status + + proxy_service = SimpleNamespace( + http_bridge_activity_snapshot_nowait=MagicMock( + return_value={ + "http_bridge_live_sessions": 2, + "http_bridge_pending_or_queued_requests": 1, + "http_bridge_pending_unknown_sessions": 0, + "http_bridge_inflight_session_creates": 1, + "http_bridge_restart_blocking": True, + } + ) + ) + request = SimpleNamespace( + client=SimpleNamespace(host="127.0.0.1"), + app=SimpleNamespace(state=SimpleNamespace(proxy_service=proxy_service)), + ) + + with ( + patch("app.core.shutdown.is_draining", return_value=False), + patch("app.core.shutdown.is_bridge_drain_active", return_value=False), + patch("app.core.shutdown.get_in_flight", return_value=0), + ): + response = await internal_drain_status(cast(Any, request)) + + assert response.checks is not None + assert response.checks["http_bridge_live_sessions"] == "2" + assert response.checks["http_bridge_pending_or_queued_requests"] == "1" + assert response.checks["http_bridge_inflight_session_creates"] == "1" + assert response.checks["http_bridge_restart_blocking"] == "true" + + @pytest.mark.asyncio async def test_internal_drain_status_rejects_non_loopback_clients(): from app.modules.health.api import internal_drain_status diff --git a/tests/unit/test_proxy_http_bridge.py b/tests/unit/test_proxy_http_bridge.py index 120447c13..a5f34f867 100644 --- a/tests/unit/test_proxy_http_bridge.py +++ b/tests/unit/test_proxy_http_bridge.py @@ -63,6 +63,72 @@ def _make_bridge_session( ) +@pytest.mark.asyncio +async def test_http_bridge_activity_snapshot_counts_pending_and_inflight_sessions(): + service = proxy_service.ProxyService(cast(Any, SimpleNamespace())) + request_state = proxy_service._WebSocketRequestState( + request_id="req-drain-status", + model="gpt-5.5", + service_tier=None, + reasoning_effort=None, + api_key_reservation=None, + started_at=1.0, + response_id=None, + awaiting_response_created=True, + event_queue=None, + transport="http", + skip_request_log=True, + ) + session = _make_bridge_session( + pending_requests=deque([request_state]), + queued_request_count=2, + ) + service._http_bridge_sessions[session.key] = session + service._http_bridge_inflight_sessions[ + proxy_service._HTTPBridgeSessionKey("session_header", "inflight-drain-status", None) + ] = asyncio.Future() + + snapshot = service.http_bridge_activity_snapshot_nowait() + + assert snapshot == { + "http_bridge_live_sessions": 1, + "http_bridge_pending_or_queued_requests": 2, + "http_bridge_pending_unknown_sessions": 0, + "http_bridge_inflight_session_creates": 1, + "http_bridge_inflight_session_create_oldest_age_seconds": 0, + "http_bridge_stale_inflight_session_creates": 0, + "http_bridge_cleaned_inflight_session_creates": 0, + "http_bridge_background_cleanup_tasks": 0, + "http_bridge_restart_blocking": True, + } + + +@pytest.mark.asyncio +async def test_http_bridge_activity_snapshot_cleans_stale_inflight_session( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + service = proxy_service.ProxyService(cast(Any, SimpleNamespace())) + key = proxy_service._HTTPBridgeSessionKey("session_header", "stale-inflight-drain-status", None) + inflight_future: asyncio.Future[proxy_service._HTTPBridgeSession] = asyncio.get_running_loop().create_future() + setattr(inflight_future, "_codex_lb_started_at", -1000.0) + service._http_bridge_inflight_sessions[key] = inflight_future + + monkeypatch.setattr(proxy_service, "_proxy_admission_wait_timeout_seconds", lambda settings=None: 0.001) + + with caplog.at_level(logging.WARNING, logger="app.modules.proxy.service"): + snapshot = service.http_bridge_activity_snapshot_nowait() + + assert key not in service._http_bridge_inflight_sessions + assert snapshot["http_bridge_inflight_session_creates"] == 0 + assert snapshot["http_bridge_stale_inflight_session_creates"] == 1 + assert snapshot["http_bridge_cleaned_inflight_session_creates"] == 1 + assert snapshot["http_bridge_restart_blocking"] is False + assert "http_bridge_inflight_session_create_cleanup" in caplog.text + with pytest.raises(ProxyResponseError): + await inflight_future + + async def _wait_for_close_await(close_session: AsyncMock, session: proxy_service._HTTPBridgeSession) -> None: for _ in range(10): if any(call.args == (session,) for call in close_session.await_args_list):