From ef661bc74758939ffb36f2c9b8751cba8d966c34 Mon Sep 17 00:00:00 2001 From: Alexander Akhmetov Date: Mon, 22 Dec 2025 18:35:29 +0100 Subject: [PATCH] Improve worker resilience --- docs/source/changelog.rst | 2 ++ telegram/client.py | 9 +++++- telegram/worker.py | 8 ++++-- tests/test_telegram_methods.py | 51 ++++++++++++++++++++++++++++++++-- 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 32dcd9c9..64abfc4e 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -8,6 +8,8 @@ Changelog - Python versions 3.7 and 3.8 are no longer supported. - tdlib 1.8.31. +- Fix: Handle errors during updates processing +- Fix: Handle queue full errors [0.18.0] - 2023-03-13 diff --git a/telegram/client.py b/telegram/client.py index c2161f67..d98d8b7d 100644 --- a/telegram/client.py +++ b/telegram/client.py @@ -559,7 +559,14 @@ def _run_handlers(self, update: Dict[Any, Any]) -> None: update_type: str = update.get("@type", "unknown") for handler in self._update_handlers[update_type]: - self._workers_queue.put((handler, update), timeout=self._queue_put_timeout) + try: + self._workers_queue.put((handler, update), timeout=self._queue_put_timeout) + except queue.Full: + logger.error( + "Queue is full, update %s dropped for handler %s", + update_type, + handler.__name__ if hasattr(handler, "__name__") else handler, + ) def remove_update_handler(self, handler_type: str, func: Callable) -> None: """ diff --git a/telegram/worker.py b/telegram/worker.py index 2d102040..5ac6e06d 100644 --- a/telegram/worker.py +++ b/telegram/worker.py @@ -41,8 +41,12 @@ def _run_thread(self) -> None: except Empty: continue - handler(update) - self._queue.task_done() + try: + handler(update) + except Exception: + logger.exception("Handler raised an exception") + finally: + self._queue.task_done() def stop(self) -> None: self._is_enabled = False diff --git a/tests/test_telegram_methods.py b/tests/test_telegram_methods.py index ddbd347b..10ad10a3 100644 --- a/tests/test_telegram_methods.py +++ b/tests/test_telegram_methods.py @@ -1,4 +1,6 @@ import pytest +import queue +import time from unittest.mock import patch @@ -6,6 +8,7 @@ from telegram.utils import AsyncResult from telegram.client import Telegram, MESSAGE_HANDLER_TYPE, AuthorizationState from telegram.text import Spoiler +from telegram.worker import SimpleWorker API_ID = 1 API_HASH = "hash" @@ -89,8 +92,11 @@ def test_parse_text_entities(self, telegram): telegram._tdjson.send.assert_called_once_with(exp_data) def test_send_phone_number_or_bot_token(self, telegram): - # check that the dunction calls _send_phone_number or _send_bot_token - with patch.object(telegram, "_send_phone_number"), patch.object(telegram, "_send_bot_token"): + # check that the function calls _send_phone_number or _send_bot_token + with ( + patch.object(telegram, "_send_phone_number"), + patch.object(telegram, "_send_bot_token"), + ): telegram.phone = "123" telegram.bot_token = None @@ -478,3 +484,44 @@ def _get_async_result(data, request_id=None): assert state == telegram.authorization_state == AuthorizationState.READY assert telegram._tdjson.send.call_count == 0 + + +class TestWorker: + def test_worker_continues_after_handler_exception(self): + """Handler exceptions should not kill the worker thread and task_done must be called""" + q = queue.Queue() + worker = SimpleWorker(q) + worker.run() + + results = [] + + def failing_handler(update): + raise ValueError("Handler failed") + + def working_handler(update): + results.append(update) + + # Put two items: one with a failing handler, one with a working handler + q.put((failing_handler, {"id": 1})) + q.put((working_handler, {"id": 2})) + + # Give the worker time to process both items. + # Can't use join when the test fails. + time.sleep(1) + + worker.stop() + + assert results == [{"id": 2}] + + def test_run_handlers_continues_on_queue_full(self, telegram): + """queue.Full should not crash the listener""" + + def my_handler(): + pass + + telegram.add_message_handler(my_handler) + + # Mock the queue to always raise queue.Full + with patch.object(telegram._workers_queue, "put", side_effect=queue.Full): + # This should not raise an exception + telegram._run_handlers({"@type": MESSAGE_HANDLER_TYPE})