Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions python/src/uagents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,41 @@
from uagents.utils import get_logger, set_global_log_level


def _default_event_loop(
loop: asyncio.AbstractEventLoop | None,
) -> asyncio.AbstractEventLoop:
Comment thread
tejus3131 marked this conversation as resolved.
"""Resolve the event loop for Agent/Bureau construction.

Agent and Bureau need a loop during synchronous __init__ (ASGIServer, create_task).
Historically, asyncio.get_event_loop() implicitly created a main-thread loop when none
existed (Python 3.10-3.13). Python 3.14+ raises RuntimeError instead.

Order: honor explicit loop=, reuse the thread's loop if set and open, else create one
and register it via set_event_loop() so later get_event_loop() calls on this thread work.
If the thread default exists but is closed (e.g. after Agent.run()), create a new loop.
"""
if loop is not None:
return loop
try:
existing = asyncio.get_event_loop()
except RuntimeError:
existing = None
if existing is None or existing.is_closed():
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
return new_loop
Comment thread
tejus3131 marked this conversation as resolved.
Comment thread
tejus3131 marked this conversation as resolved.
return existing
Comment thread
tejus3131 marked this conversation as resolved.


def _clear_thread_loop_default(closed_loop: asyncio.AbstractEventLoop) -> None:
"""Clear the thread default if it still references a loop we just closed."""
try:
if asyncio.get_event_loop() is closed_loop:
asyncio.set_event_loop(None)
except RuntimeError:
pass


async def _run_interval(
func: IntervalCallback,
logger: logging.Logger,
Expand Down Expand Up @@ -355,7 +390,8 @@ def __init__(
self._name = name
self._port = port or 8000

self._loop = loop or asyncio.get_event_loop_policy().get_event_loop()
# See _default_event_loop: required at init time, not only in run() (Python 3.14+).
self._loop = _default_event_loop(loop)

# initialize wallet and identity
self._initialize_wallet_and_identity(seed, name, wallet_key_derivation_index)
Expand Down Expand Up @@ -1338,6 +1374,7 @@ def run(self):
if not self._loop.is_closed():
self._loop.stop()
self._loop.close()
_clear_thread_loop_default(self._loop)

def get_message_protocol(
self, message_schema_digest
Expand Down Expand Up @@ -1572,7 +1609,8 @@ def __init__(
log_level (int | str): The logging level for the bureau.
shutdown_timeout (int): The timeout for shutting down the bureau.
"""
self._loop = loop or asyncio.get_event_loop_policy().get_event_loop()
# See _default_event_loop: same Python 3.14+ loop resolution as Agent.
self._loop = _default_event_loop(loop)
self._agents: list[Agent] = []
self._port = port or 8000
self._queries: dict[str, asyncio.Future] = {}
Expand Down Expand Up @@ -1836,3 +1874,4 @@ def run(self):
if not self._loop.is_closed():
self._loop.stop()
self._loop.close()
_clear_thread_loop_default(self._loop)
40 changes: 40 additions & 0 deletions python/tests/test_agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# pylint: disable=protected-access
import asyncio
import unittest
from collections.abc import Callable

from uagents import Agent, Context, Model
from uagents.agent import _default_event_loop
Comment thread
tejus3131 marked this conversation as resolved.
from uagents.resolver import GlobalResolver
from uagents.types import RestHandlerDetails

Expand All @@ -28,6 +30,44 @@ class Response(Model):
)


class TestDefaultEventLoop(unittest.TestCase):
def setUp(self) -> None:
try:
self._previous_loop = asyncio.get_event_loop()
except RuntimeError:
self._previous_loop = None
self._test_loop: asyncio.AbstractEventLoop | None = None

def tearDown(self) -> None:
if self._test_loop is not None and not self._test_loop.is_closed():
self._test_loop.close()
if self._previous_loop is not None and not self._previous_loop.is_closed():
asyncio.set_event_loop(self._previous_loop)
else:
asyncio.set_event_loop(None)

def test_returns_explicit_loop(self) -> None:
explicit = asyncio.new_event_loop()
self.assertIs(_default_event_loop(explicit), explicit)
explicit.close()

def test_creates_loop_when_thread_has_none(self) -> None:
asyncio.set_event_loop(None)
result = _default_event_loop(None)
self._test_loop = result
self.assertFalse(result.is_closed())
self.assertIs(asyncio.get_event_loop(), result)

def test_replaces_closed_thread_loop(self) -> None:
closed = asyncio.new_event_loop()
closed.close()
asyncio.set_event_loop(closed)
result = _default_event_loop(None)
self._test_loop = result
self.assertFalse(result.is_closed())
self.assertIsNot(result, closed)


class TestAgent(unittest.TestCase):
def setUp(self) -> None:
self.agent = alice
Expand Down
10 changes: 7 additions & 3 deletions python/tests/test_bureau.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
# pylint: disable=protected-access
import unittest

from cosmpy.aerial.wallet import LocalWallet

from uagents import Agent, Bureau
from uagents.agent import _default_event_loop
Comment thread
tejus3131 marked this conversation as resolved.
from uagents.registration import (
AgentEndpoint,
BatchLedgerRegistrationPolicy,
Expand All @@ -22,8 +23,11 @@

class TestBureau(unittest.IsolatedAsyncioTestCase):
def setUp(self) -> None:
self.loop = asyncio.get_event_loop()
return super().setUp()
# Obtain a loop before super().setUp() so agents can use loop=self.loop.
# Use production helper (3.14-safe, replaces closed thread loops after run()).
# Do not use get_running_loop(): IsolatedAsyncioTestCase has not started yet.
self.loop = _default_event_loop(None)
super().setUp()
Comment thread
tejus3131 marked this conversation as resolved.

def test_bureau_updates_agents_no_ledger_batch(self):
alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url, loop=self.loop)
Expand Down
Loading