diff --git a/bin/capture_manager.py b/bin/capture_manager.py index e6f0523..75946cb 100755 --- a/bin/capture_manager.py +++ b/bin/capture_manager.py @@ -25,6 +25,17 @@ def __init__(self, loglevel: int | None=None) -> None: self.captures: set[Task[None]] = set() self.lacus = Lacus() + def _clear_ongoing_on_startup(self) -> None: + # At process start, self.captures is empty — no task can be running. + # Any UUID left in lacus:ongoing is a zombie from a previous crash. + # Use clear_capture() per UUID so each gets a proper error result + # and capture_settings are cleaned up. + ongoing = self.lacus.monitoring.get_ongoing_captures() + if ongoing: + self.logger.warning(f'Startup cleanup: clearing {len(ongoing)} zombie capture(s) from lacus:ongoing') + for uuid, _ in ongoing: + self.lacus.core.clear_capture(uuid, 'Cleared on startup: previous process died.') + async def clear_dead_captures(self) -> None: ongoing = {capture.get_name(): capture for capture in self.captures} max_capture_time = get_config('generic', 'max_capture_time') @@ -47,6 +58,12 @@ async def clear_dead_captures(self) -> None: if not capture.done(): self.logger.error(f'{expected_uuid} is not done after canceling, trying {max_cancel} more times.') await asyncio.sleep(1) + # All cancel attempts exhausted but the task is still stuck. + # Free the Redis slot so new captures aren't blocked. + if not capture.done(): + self.logger.error(f'{expected_uuid} could not be canceled after 5 attempts, force-clearing from Redis.') + self.lacus.core.clear_capture(expected_uuid, 'Force-cleared: task could not be canceled.') + self.captures.discard(capture) async def _to_run_forever_async(self) -> None: @@ -82,6 +99,10 @@ def main() -> None: loop.add_signal_handler(signal.SIGTERM, lambda: loop.create_task(p.stop_async())) try: + # Flush stale captures before the event loop starts. + # Valkey persists across container restarts, so lacus:ongoing + # may contain UUIDs from a process that was killed mid-capture. + p._clear_ongoing_on_startup() loop.run_until_complete(p.run_async(sleep_in_sec=1)) finally: loop.close()