From cc1b512c332f1b201d1c34ebb7103be14edfd865 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 17 Jun 2026 11:04:34 +0000 Subject: [PATCH] fix(runners): preserve terminal status when Run returns after emergency stop Commit 2a156b30 moved the IsFinished() early return into the success-only branch of the post-Run handler. When applyTerminatedJobs kills a job and sets stopped while Run() is still unwinding, the error path overwrote the correct terminal status with failed. Restore the finished-status guard for both branches via finalizeAfterRun. Co-authored-by: Denis Gukov --- services/runners/job_pool.go | 20 ++------------- services/runners/running_job.go | 24 ++++++++++++++++++ services/runners/running_job_test.go | 37 ++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 18 deletions(-) diff --git a/services/runners/job_pool.go b/services/runners/job_pool.go index d1d7bbea7..940297117 100644 --- a/services/runners/job_pool.go +++ b/services/runners/job_pool.go @@ -323,7 +323,6 @@ func (p *JobPool) Run() { err := running.job.Run(t.username, t.incomingVersion, t.alias) if err != nil { - log.WithFields(log.Fields{ "context": "job_running", "task_id": t.taskID, @@ -331,31 +330,16 @@ func (p *JobPool) Run() { }).WithError(err).Error("launch job failed") running.Log("Unable to launch the application. Please contact your system administrator for assistance.") - - if running.getStatus() == task_logger.TaskStoppingStatus { - running.SetStatus(task_logger.TaskStoppedStatus) - } else { - running.SetStatus(task_logger.TaskFailStatus) - } } else { - log.WithFields(log.Fields{ "context": "job_running", "task_id": running.taskID, "status": string(running.getStatus()), }).Debug("Job run returned") - - if running.getStatus().IsFinished() { - return - } - - if running.getStatus() == task_logger.TaskStoppingStatus { - running.SetStatus(task_logger.TaskStoppedStatus) - } else { - running.SetStatus(task_logger.TaskSuccessStatus) - } } + running.finalizeAfterRun(err) + log.WithFields(log.Fields{ "context": "job_running", "task_id": running.taskID, diff --git a/services/runners/running_job.go b/services/runners/running_job.go index cf8edc5cb..e2112f6e3 100644 --- a/services/runners/running_job.go +++ b/services/runners/running_job.go @@ -129,6 +129,30 @@ func (p *runningJob) getStatus() task_logger.TaskStatus { return p.status } +// finalizeAfterRun applies the terminal status after job.Run returns. If the job +// was already brought to a terminal state (e.g. emergency-stopped via +// terminated_jobs while Run was still unwinding), the status is left unchanged. +func (p *runningJob) finalizeAfterRun(err error) { + if p.getStatus().IsFinished() { + return + } + + if err != nil { + if p.getStatus() == task_logger.TaskStoppingStatus { + p.SetStatus(task_logger.TaskStoppedStatus) + } else { + p.SetStatus(task_logger.TaskFailStatus) + } + return + } + + if p.getStatus() == task_logger.TaskStoppingStatus { + p.SetStatus(task_logger.TaskStoppedStatus) + } else { + p.SetStatus(task_logger.TaskSuccessStatus) + } +} + // getProgress atomically snapshots the data needed to report progress to the // server. The returned slice is a copy, so the caller can read it freely while // the job keeps appending records. diff --git a/services/runners/running_job_test.go b/services/runners/running_job_test.go index deed039b8..a331aedf0 100644 --- a/services/runners/running_job_test.go +++ b/services/runners/running_job_test.go @@ -1,6 +1,7 @@ package runners import ( + "errors" "fmt" "sync" "testing" @@ -152,3 +153,39 @@ func TestRunningJob_ConcurrentAccess(t *testing.T) { close(start) wg.Wait() } + +func TestRunningJob_finalizeAfterRun_KeepsFinishedStatusOnRunError(t *testing.T) { + rj := newTestRunningJob(1) + rj.SetStatus(task_logger.TaskStoppedStatus) + + rj.finalizeAfterRun(errors.New("process killed")) + + assert.Equal(t, task_logger.TaskStoppedStatus, rj.getStatus()) +} + +func TestRunningJob_finalizeAfterRun_SetsFailOnRunError(t *testing.T) { + rj := newTestRunningJob(2) + rj.SetStatus(task_logger.TaskRunningStatus) + + rj.finalizeAfterRun(errors.New("ansible failed")) + + assert.Equal(t, task_logger.TaskFailStatus, rj.getStatus()) +} + +func TestRunningJob_finalizeAfterRun_SetsSuccessOnCleanReturn(t *testing.T) { + rj := newTestRunningJob(3) + rj.SetStatus(task_logger.TaskRunningStatus) + + rj.finalizeAfterRun(nil) + + assert.Equal(t, task_logger.TaskSuccessStatus, rj.getStatus()) +} + +func TestRunningJob_finalizeAfterRun_StoppingBecomesStopped(t *testing.T) { + rj := newTestRunningJob(4) + rj.SetStatus(task_logger.TaskStoppingStatus) + + rj.finalizeAfterRun(nil) + + assert.Equal(t, task_logger.TaskStoppedStatus, rj.getStatus()) +}