Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions services/runners/job_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,39 +323,23 @@ func (p *JobPool) Run() {
err := running.job.Run(t.username, t.incomingVersion, t.alias)

if err != nil {

log.WithFields(log.Fields{
"context": "job_running",
"task_id": t.taskID,
"task_status": t.status,
}).WithError(err).Error("launch job failed")

running.Log("Unable to launch the application. Please contact your system administrator for assistance.")

if running.getStatus() == task_logger.TaskStoppingStatus {
running.SetStatus(task_logger.TaskStoppedStatus)
} else {
running.SetStatus(task_logger.TaskFailStatus)
}
} else {

log.WithFields(log.Fields{
"context": "job_running",
"task_id": running.taskID,
"status": string(running.getStatus()),
}).Debug("Job run returned")

if running.getStatus().IsFinished() {
return
}

if running.getStatus() == task_logger.TaskStoppingStatus {
running.SetStatus(task_logger.TaskStoppedStatus)
} else {
running.SetStatus(task_logger.TaskSuccessStatus)
}
}

running.finalizeAfterRun(err)

log.WithFields(log.Fields{
"context": "job_running",
"task_id": running.taskID,
Expand Down
24 changes: 24 additions & 0 deletions services/runners/running_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,30 @@ func (p *runningJob) getStatus() task_logger.TaskStatus {
return p.status
}

// finalizeAfterRun applies the terminal status after job.Run returns. If the job
// was already brought to a terminal state (e.g. emergency-stopped via
// terminated_jobs while Run was still unwinding), the status is left unchanged.
func (p *runningJob) finalizeAfterRun(err error) {
if p.getStatus().IsFinished() {
return
}

if err != nil {
if p.getStatus() == task_logger.TaskStoppingStatus {
p.SetStatus(task_logger.TaskStoppedStatus)
} else {
p.SetStatus(task_logger.TaskFailStatus)
}
return
}

if p.getStatus() == task_logger.TaskStoppingStatus {
p.SetStatus(task_logger.TaskStoppedStatus)
} else {
p.SetStatus(task_logger.TaskSuccessStatus)
}
}

// getProgress atomically snapshots the data needed to report progress to the
// server. The returned slice is a copy, so the caller can read it freely while
// the job keeps appending records.
Expand Down
37 changes: 37 additions & 0 deletions services/runners/running_job_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package runners

import (
"errors"
"fmt"
"sync"
"testing"
Expand Down Expand Up @@ -152,3 +153,39 @@ func TestRunningJob_ConcurrentAccess(t *testing.T) {
close(start)
wg.Wait()
}

func TestRunningJob_finalizeAfterRun_KeepsFinishedStatusOnRunError(t *testing.T) {
rj := newTestRunningJob(1)
rj.SetStatus(task_logger.TaskStoppedStatus)

rj.finalizeAfterRun(errors.New("process killed"))

assert.Equal(t, task_logger.TaskStoppedStatus, rj.getStatus())
}

func TestRunningJob_finalizeAfterRun_SetsFailOnRunError(t *testing.T) {
rj := newTestRunningJob(2)
rj.SetStatus(task_logger.TaskRunningStatus)

rj.finalizeAfterRun(errors.New("ansible failed"))

assert.Equal(t, task_logger.TaskFailStatus, rj.getStatus())
}

func TestRunningJob_finalizeAfterRun_SetsSuccessOnCleanReturn(t *testing.T) {
rj := newTestRunningJob(3)
rj.SetStatus(task_logger.TaskRunningStatus)

rj.finalizeAfterRun(nil)

assert.Equal(t, task_logger.TaskSuccessStatus, rj.getStatus())
}

func TestRunningJob_finalizeAfterRun_StoppingBecomesStopped(t *testing.T) {
rj := newTestRunningJob(4)
rj.SetStatus(task_logger.TaskStoppingStatus)

rj.finalizeAfterRun(nil)

assert.Equal(t, task_logger.TaskStoppedStatus, rj.getStatus())
}
Loading