Skip to content
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b60eab0
merge
carlos-irreverentlabs Jan 16, 2026
644927f
Merge remote-tracking branch 'upstream/main'
carlosgjs Jan 22, 2026
218f7aa
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 3, 2026
90da389
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 10, 2026
842e9b3
PSv2: Use connection pooling and retries for NATS
carlosgjs Feb 11, 2026
227a8db
Refactor and fix nats tests
carlosgjs Feb 11, 2026
2acf620
Tighten formatting
carlosgjs Feb 11, 2026
0632ce0
format
carlosgjs Feb 11, 2026
c5f8106
CR feedback
carlosgjs Feb 11, 2026
8805dbe
Apply suggestions from code review
carlosgjs Feb 11, 2026
8618d3c
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 13, 2026
c384199
refactor: simplify NATS connection handling — keep retry decorator, d…
mihow Feb 12, 2026
98a17f1
Merge branch 'main' into carlosg/natsconn
carlosgjs Feb 13, 2026
cf42506
revert: restore NATS connection pool — avoid per-operation connection…
mihow Feb 13, 2026
dc798ea
refactor: add switchable NATS connection strategies
mihow Feb 13, 2026
4d66c07
refactor: simplify NATS connection module — pool-only, archive original
mihow Feb 13, 2026
41bbeb3
docs: clarify where connection pool provides reuse vs. single-use
mihow Feb 13, 2026
ead53d1
fix: use `from None` to suppress noisy exception chain in _get_pool
mihow Feb 13, 2026
9737301
docs: update AGENTS.md test commands to use docker-compose.ci.yml
mihow Feb 13, 2026
fa0f84b
fix: correct mock setup in NATS task tests to match plain instantiation
mihow Feb 14, 2026
c7b2014
fix: address PR review feedback for NATS connection module
mihow Feb 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> None:
try:

async def ack_task():
async with TaskQueueManager() as manager:
return await manager.acknowledge_task(reply_subject)
manager = TaskQueueManager()
return await manager.acknowledge_task(reply_subject)

ack_success = async_to_sync(ack_task)()

Expand Down
10 changes: 5 additions & 5 deletions ami/jobs/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,11 @@ def tasks(self, request, pk=None):

async def get_tasks():
tasks = []
async with TaskQueueManager() as manager:
for _ in range(batch):
task = await manager.reserve_task(job.pk, timeout=0.1)
if task:
tasks.append(task.dict())
manager = TaskQueueManager()
for _ in range(batch):
task = await manager.reserve_task(job.pk, timeout=0.1)
if task:
tasks.append(task.dict())
return tasks

# Use async_to_sync to properly handle the async call
Expand Down
36 changes: 18 additions & 18 deletions ami/ml/orchestration/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def cleanup_async_job_resources(job: "Job") -> bool:

# Cleanup NATS resources
async def cleanup():
async with TaskQueueManager() as manager:
return await manager.cleanup_job_resources(job.pk)
manager = TaskQueueManager()
return await manager.cleanup_job_resources(job.pk)

try:
nats_success = async_to_sync(cleanup)()
Expand Down Expand Up @@ -96,22 +96,22 @@ async def queue_all_images():
successful_queues = 0
failed_queues = 0

async with TaskQueueManager() as manager:
for image_pk, task in tasks:
try:
logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
success = await manager.publish_task(
job_id=job.pk,
data=task,
)
except Exception as e:
logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
success = False

if success:
successful_queues += 1
else:
failed_queues += 1
manager = TaskQueueManager()
for image_pk, task in tasks:
try:
logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
success = await manager.publish_task(
job_id=job.pk,
data=task,
)
except Exception as e:
logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
success = False

if success:
successful_queues += 1
else:
failed_queues += 1

return successful_queues, failed_queues

Expand Down
112 changes: 112 additions & 0 deletions ami/ml/orchestration/nats_connection_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
NATS connection pool for both Celery workers and Django processes.

Maintains a persistent NATS connection per process to avoid
the overhead of creating/closing connections for every operation.

The connection pool is lazily initialized on first use and shared
across all operations in the same process.
"""

import asyncio
import logging
from typing import TYPE_CHECKING

import nats
from django.conf import settings
from nats.js import JetStreamContext

if TYPE_CHECKING:
from nats.aio.client import Client as NATSClient

logger = logging.getLogger(__name__)


class ConnectionPool:
"""
Manages a single NATS connection per process (Celery worker or Django web worker).

This is safe because:
- Each process gets its own isolated connection
- NATS connections are async-safe (can be used by multiple coroutines)
- Works in both Celery prefork and Django WSGI/ASGI contexts
"""

def __init__(self):
self._nc: "NATSClient | None" = None
self._js: JetStreamContext | None = None
self._nats_url: str | None = None
self._lock = asyncio.Lock()

Comment thread
carlosgjs marked this conversation as resolved.
Outdated
async def get_connection(self) -> tuple["NATSClient", JetStreamContext]:
"""
Get or create the worker's NATS connection. Checks connection health and recreates if stale.

Returns:
Tuple of (NATS connection, JetStream context)
Raises:
RuntimeError: If connection cannot be established
"""
# Fast path: connection exists, is open, and is connected
if self._nc is not None and not self._nc.is_closed and self._nc.is_connected:
return self._nc, self._js # type: ignore

# Connection is stale or doesn't exist
if self._nc is not None:
logger.warning("NATS connection is closed or disconnected, will reconnect")
self._nc = None
self._js = None

# Slow path: need to create/recreate connection
async with self._lock:
# Double-check after acquiring lock
if self._nc is not None and not self._nc.is_closed and self._nc.is_connected:
return self._nc, self._js # type: ignore

# Get NATS URL from settings
if self._nats_url is None:
self._nats_url = getattr(settings, "NATS_URL", "nats://nats:4222")

try:
logger.info(f"Creating NATS connection to {self._nats_url}")
self._nc = await nats.connect(self._nats_url)
self._js = self._nc.jetstream()
logger.info(f"Successfully connected to NATS at {self._nats_url}")
return self._nc, self._js
except Exception as e:
logger.error(f"Failed to connect to NATS: {e}")
raise RuntimeError(f"Could not establish NATS connection: {e}") from e

async def close(self):
"""Close the NATS connection if it exists."""
if self._nc is not None and not self._nc.is_closed:
logger.info("Closing NATS connection")
await self._nc.close()
self._nc = None
self._js = None

def reset(self):
"""
Reset the connection pool (mark connection as stale).

This should be called when a connection error is detected.
The next call to get_connection() will create a fresh connection.
"""
logger.warning("Resetting NATS connection pool due to connection error")
self._nc = None
self._js = None
Comment thread
carlosgjs marked this conversation as resolved.
Outdated
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated


# Global pool instance - one per process (Celery worker or Django process)
_connection_pool: ConnectionPool | None = None


def get_pool() -> ConnectionPool:
"""
Get the process-local connection pool.
"""
global _connection_pool
if _connection_pool is None:
_connection_pool = ConnectionPool()
logger.debug("Lazily initialized NATS connection pool")
return _connection_pool
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
Loading
Loading