Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions ami/jobs/migrations/0021_joblog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
dependencies = [
("jobs", "0020_schedule_job_monitoring_beat_tasks"),
]

operations = [
migrations.CreateModel(
name="JobLog",
fields=[
("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("created_at", models.DateTimeField(auto_now_add=True)),
("updated_at", models.DateTimeField(auto_now=True)),
("level", models.CharField(max_length=20)),
("message", models.TextField()),
("context", models.JSONField(blank=True, default=dict)),
(
"job",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, related_name="log_entries", to="jobs.job"
),
),
],
options={
"ordering": ["-created_at", "-pk"],
"indexes": [models.Index(fields=["job", "-created_at"], name="jobs_joblog_job_id_e4aa59_idx")],
},
),
]
22 changes: 22 additions & 0 deletions ami/jobs/migrations/0022_alter_job_logs_help_text.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import ami.jobs.models
import django_pydantic_field.fields
from django.db import migrations


class Migration(migrations.Migration):
dependencies = [
("jobs", "0021_joblog"),
]

operations = [
migrations.AlterField(
model_name="job",
name="logs",
field=django_pydantic_field.fields.PydanticSchemaField(
config=None,
default=ami.jobs.models.JobLogs,
help_text="DEPRECATED: read-only fallback for pre-#1259 jobs. Use the JobLog table for new writes.",
schema=ami.jobs.models.JobLogs,
),
),
]
124 changes: 91 additions & 33 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,80 @@ class JobLogs(pydantic.BaseModel):
stderr: list[str] = pydantic.Field(default_factory=list, alias="stderr", title="Error messages")


class JobLog(BaseModel):
"""Append-only per-job log row.

Replaces the ``jobs_job.logs`` JSON-field UPDATE path that caused row-lock
contention under concurrent async_api load (issue #1256). Each log emit
becomes a cheap INSERT on this child table instead of a refresh+UPDATE of
the shared parent row. Legacy JSON-field logs are still served by the
serializer for jobs created before this table existed.
"""

project_accessor = "job__project"

job = models.ForeignKey("Job", on_delete=models.CASCADE, related_name="log_entries")
level = models.CharField(max_length=20)
message = models.TextField()
# Freeform bag for future per-line metadata (stage, worker id, counters, ...)
# without requiring a schema migration. Kept nullable/empty-default so it
# costs nothing on existing rows.
context = models.JSONField(blank=True, default=dict)

class Meta:
ordering = ["-created_at", "-pk"]
indexes = [models.Index(fields=["job", "-created_at"])]


JOB_LOG_LEVELS_STDERR = {"ERROR", "CRITICAL"}
JOB_LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
JOB_LOGS_DEFAULT_LIMIT = 1000
# Hard ceiling on a single read response. Keeps payload size bounded even when
# a caller passes ``?logs_limit=...``. Real pagination ships separately with a
# dedicated ``/jobs/logs/`` endpoint.
JOB_LOGS_MAX_LIMIT = 5000


def _legacy_logs_shape(job: "Job") -> dict[str, list[str]]:
legacy = getattr(job, "logs", None)
return {
"stdout": list(getattr(legacy, "stdout", []) or []),
"stderr": list(getattr(legacy, "stderr", []) or []),
}


def serialize_job_logs(job: "Job", *, limit: int = JOB_LOGS_DEFAULT_LIMIT) -> dict[str, list[str]]:
"""Return ``{stdout, stderr}`` in the shape the UI already parses.

Reads joined ``JobLog`` rows first (newest-first, capped at ``limit`` per
request — there is no per-job storage cap; the data integrity check
framework handles retention). Jobs created before the table existed and
jobs written while ``JOB_LOG_PERSIST_ENABLED=False`` have no rows and fall
back to the legacy ``jobs_job.logs`` JSON column so their UI log panel
stays populated.
"""
entries = list(
JobLog.objects.filter(job_id=job.pk)
.only("created_at", "level", "message")
.order_by("-created_at", "-pk")[:limit]
)
if entries:
return {
"stdout": [
f"[{entry.created_at.strftime(JOB_LOG_TIMESTAMP_FORMAT)}] {entry.level} {entry.message}"
for entry in entries
],
"stderr": [entry.message for entry in entries if entry.level in JOB_LOG_LEVELS_STDERR],
}

return _legacy_logs_shape(job)


class JobLogHandler(logging.Handler):
"""
Class for handling logs from a job and writing them to the job instance.
"""

max_log_length = 1000

def __init__(self, job: "Job", *args, **kwargs):
self.job = job
super().__init__(*args, **kwargs)
Expand All @@ -337,41 +404,24 @@ def emit(self, record: logging.LogRecord):
# Log to the current app logger (container stdout).
logger.log(record.levelno, self.format(record))

# Gated by ``JOB_LOG_PERSIST_ENABLED`` (default True). Persisting every
# log line to ``jobs_job.logs`` becomes a row-lock contention point
# under concurrent async_api load — each call triggers
# ``UPDATE jobs_job SET logs = ...`` on the shared job row, and inside
# ``ATOMIC_REQUESTS`` a single batched ``/result`` POST stacks N such
# UPDATEs in one tx, blocking every ML worker on the same row for the
# duration of the request. Deployments hitting that pattern can set the
# flag to False to short-circuit here until PR #1259 lands an
# append-only ``JobLog`` child table. See issue #1256.
# Escape hatch: when False, skip the per-job DB write entirely. Container
# stdout still captures every line above, so ops observability is
# unchanged; only the per-job UI log view loses new entries for the
# duration the flag is off. Default is True. See issue #1256.
if not getattr(settings, "JOB_LOG_PERSIST_ENABLED", True):
return

# Write to the logs field on the job instance.
# Refresh from DB first to reduce the window for concurrent overwrites — each
# worker holds its own stale in-memory copy of `logs`, so without a refresh the
# last writer always wins and earlier entries are silently dropped.
# @TODO consider saving logs to the database periodically rather than on every log
# Append-only insert on the JobLog child table. Unlike the legacy
# jobs_job.logs JSONB update path, this does not contend with
# _update_job_progress on the parent row.
try:
self.job.refresh_from_db(fields=["logs"])
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
msg = f"[{timestamp}] {record.levelname} {self.format(record)}"
if msg not in self.job.logs.stdout:
self.job.logs.stdout.insert(0, msg)

# Write a simpler copy of any errors to the errors field
if record.levelno >= logging.ERROR:
if record.message not in self.job.logs.stderr:
self.job.logs.stderr.insert(0, record.message)

if len(self.job.logs.stdout) > self.max_log_length:
self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length]

self.job.save(update_fields=["logs"], update_progress=False)
JobLog.objects.create(
job_id=self.job.pk,
level=record.levelname,
message=self.format(record),
)
except Exception as e:
logger.error(f"Failed to save logs for job #{self.job.pk}: {e}")
logger.error(f"Failed to save log for job #{self.job.pk}: {e}")


@dataclass
Expand Down Expand Up @@ -853,7 +903,15 @@ class Job(BaseModel):
# @TODO can we use an Enum or Pydantic model for status?
status = models.CharField(max_length=255, default=JobState.CREATED.name, choices=JobState.choices())
progress: JobProgress = SchemaField(JobProgress, default=default_job_progress)
logs: JobLogs = SchemaField(JobLogs, default=JobLogs)
# DEPRECATED: per-line writes moved to the JobLog child table (issue #1256, PR #1259).
# Retained as a read-only fallback so jobs created before the migration still
# surface their stored logs in the UI. Will be dropped in a follow-up after
# the legacy rows are backfilled into JobLog. Do not write to this field.
logs: JobLogs = SchemaField(
JobLogs,
default=JobLogs,
help_text="DEPRECATED: read-only fallback for pre-#1259 jobs. Use the JobLog table for new writes.",
)
params = models.JSONField(null=True, blank=True)
result = models.JSONField(null=True, blank=True)
task_id = models.CharField(max_length=255, null=True, blank=True)
Expand Down
11 changes: 11 additions & 0 deletions ami/jobs/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,14 @@ class QueuedTaskAcknowledgment(pydantic.BaseModel):
required=False,
type=bool,
)

logs_limit_param = OpenApiParameter(
name="logs_limit",
description=(
"Max number of JobLog rows to include in the ``logs`` field on the detail response. "
"Newest-first. Defaults to 1000, capped at 5000. Pagination over older entries will "
"ship with a dedicated ``/jobs/logs/`` endpoint."
),
required=False,
type=int,
)
29 changes: 27 additions & 2 deletions ami/jobs/serializers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from django_pydantic_field.rest_framework import SchemaField
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers

from ami.exports.models import DataExport
Expand All @@ -13,7 +14,7 @@
from ami.ml.schemas import PipelineProcessingTask, PipelineTaskResult, ProcessingServiceClientInfo
from ami.ml.serializers import PipelineNestedSerializer

from .models import Job, JobLogs, JobProgress, MLJob
from .models import JOB_LOGS_DEFAULT_LIMIT, Job, JobProgress, MLJob, _legacy_logs_shape, serialize_job_logs
from .schemas import QueuedTaskAcknowledgment


Expand Down Expand Up @@ -49,7 +50,7 @@ class JobListSerializer(DefaultSerializer):
source_image_single = SourceImageNestedSerializer(read_only=True)
data_export = DataExportNestedSerializer(read_only=True)
progress = SchemaField(schema=JobProgress, read_only=True)
logs = SchemaField(schema=JobLogs, read_only=True)
logs = serializers.SerializerMethodField()
job_type = JobTypeSerializer(read_only=True)
# All jobs created from the Jobs UI are ML jobs (datasync, etc. are created for the user)
# @TODO Remove this when the UI is updated pass a job type. This should be a required field.
Expand Down Expand Up @@ -147,6 +148,30 @@ class Meta:
"dispatch_mode",
]

@extend_schema_field(
{
"type": "object",
"properties": {
"stdout": {"type": "array", "items": {"type": "string"}, "title": "All messages"},
"stderr": {"type": "array", "items": {"type": "string"}, "title": "Error messages"},
},
"required": ["stdout", "stderr"],
}
)
def get_logs(self, obj: Job) -> dict[str, list[str]]:
# List responses skip the JobLog query to avoid N+1 — the UI only renders
# logs on the detail page, so returning the (typically empty for new jobs)
# legacy JSON shape is acceptable. Detail responses go to the joined table
# and fall back to the legacy shape for pre-migration jobs.
view = self.context.get("view")
if getattr(view, "action", None) == "list":
return _legacy_logs_shape(obj)
# ``JobViewSet.get_serializer_context`` validates ``?logs_limit=`` and
# puts the cleaned int (or ``None`` when unset) on context, so a bad
# value already 400'd before we got here.
limit = self.context.get("logs_limit") or JOB_LOGS_DEFAULT_LIMIT
return serialize_job_logs(obj, limit=limit)


class JobSerializer(JobListSerializer):
# progress = serializers.JSONField(initial=Job.default_progress(), allow_null=False, required=False)
Expand Down
Loading
Loading