Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ metadata:

This instructs Reloader to skip all reload logic for that resource across all workloads.

### 4. ⚙️ Workload-Specific Rollout Strategy (Argo Rollouts Only)
### 4. ⚙️ Workload-Specific Reload Strategies

#### Argo Rollouts

Note: This is only applicable when using [Argo Rollouts](https://argoproj.github.io/argo-rollouts/). It is ignored for standard Kubernetes `Deployments`, `StatefulSets`, or `DaemonSets`. To use this feature, Argo Rollouts support must be enabled in Reloader (for example via --is-argo-rollouts=true).

Expand All @@ -196,6 +198,43 @@ metadata:

This setting affects Argo Rollouts behavior, not Argo CD sync settings.

#### CronJobs

By default, when a ConfigMap or Secret referenced by a CronJob is updated, Reloader creates an **immediate Job** from the CronJob template. This ensures the new configuration is applied right away.

However, this default behavior has some drawbacks:
- Creates out-of-schedule job executions
- Produces "orphan" jobs not tracked by CronJob controller's history limits
- Bypasses CronJob's `concurrencyPolicy`

For use cases where you prefer to only update the CronJob template (so future scheduled runs get the new config) without creating immediate jobs, you can use the **patch** strategy:

```yaml
metadata:
annotations:
reloader.stakater.com/auto: "true"
reloader.stakater.com/cronjob-reload-strategy: "patch"
```

| Value | Behavior |
|------------------------|----------------------------------------------------------------------------------------------|
| (default/not set) | Creates an immediate Job from the CronJob template |
| `patch` | Patches the CronJob template and recreates any currently running Jobs owned by the CronJob |

✅ Use `patch` if:

1. You want to respect the CronJob's schedule and `concurrencyPolicy`
1. You want to avoid orphan jobs that bypass history limits
1. You only need future scheduled runs to pick up the new configuration
1. You have running Jobs that should be recreated with the new config

| Scenario | Default (immediate-job) | Patch mode |
|---------------------------------------|-------------------------------|-----------------------------------------|
| ConfigMap updated, no running Job | Creates new Job immediately | Patches CronJob template only |
| ConfigMap updated, Job running | Creates new Job (parallel) | Recreates running Job, patches template |
| Respects `concurrencyPolicy` | No | Yes (for scheduled runs) |
| Orphan jobs | Yes | No |

### 5. ❗ Annotation Behavior Rules & Compatibility

- `reloader.stakater.com/auto` and `reloader.stakater.com/search` **cannot be used together** — the `auto` annotation takes precedence.
Expand Down
96 changes: 95 additions & 1 deletion internal/pkg/callbacks/rolling_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,15 @@ func GetPatchTemplates() PatchTemplates {
}
}

// GetCronJobPatchTemplates returns patch templates for CronJob resources
func GetCronJobPatchTemplates() PatchTemplates {
return PatchTemplates{
AnnotationTemplate: `{"spec":{"jobTemplate":{"spec":{"template":{"metadata":{"annotations":{"%s":"%s"}}}}}}}`, // strategic merge patch
EnvVarTemplate: `{"spec":{"jobTemplate":{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"%s","value":"%s"}]}]}}}}}}`, // strategic merge patch
DeleteEnvVarTemplate: `[{"op":"remove","path":"/spec/jobTemplate/spec/template/spec/containers/%d/env/%d"}]`, // JSON patch
}
}

// UpdateDeployment performs rolling upgrade on deployment
func UpdateDeployment(clients kube.Clients, namespace string, resource runtime.Object) error {
deployment := resource.(*appsv1.Deployment)
Expand Down Expand Up @@ -464,8 +473,16 @@ func CreateJobFromCronjob(clients kube.Clients, namespace string, resource runti
return err
}

// PatchCronJob patches a CronJob's jobTemplate to update pod annotations/env vars
func PatchCronJob(clients kube.Clients, namespace string, resource runtime.Object, patchType patchtypes.PatchType, bytes []byte) error {
return errors.New("not supported patching: CronJob")
cronJob := resource.(*batchv1.CronJob)
_, err := clients.KubernetesClient.BatchV1().CronJobs(namespace).Patch(
context.TODO(),
cronJob.Name,
patchType,
bytes,
meta_v1.PatchOptions{FieldManager: "Reloader"})
return err
}

// ReCreateJobFromjob performs rolling upgrade on job
Expand Down Expand Up @@ -500,6 +517,83 @@ func ReCreateJobFromjob(clients kube.Clients, namespace string, resource runtime
return err
}

// RecreateRunningJobsForCronJob finds and recreates any running Jobs owned by the CronJob
func RecreateRunningJobsForCronJob(clients kube.Clients, namespace string, cronJob *batchv1.CronJob) error {
// List Jobs in the namespace
jobs, err := clients.KubernetesClient.BatchV1().Jobs(namespace).List(context.TODO(), meta_v1.ListOptions{})
if err != nil {
return err
}

for _, job := range jobs.Items {
// Check if Job is owned by this CronJob and is active
if isOwnedByCronJob(&job, cronJob) && job.Status.Active > 0 {
// Recreate the job
if err := recreateJob(clients, namespace, &job); err != nil {
logrus.Errorf("Failed to recreate Job %s owned by CronJob %s: %v", job.Name, cronJob.Name, err)
return err
}
logrus.Infof("Recreated running Job %s owned by CronJob %s", job.Name, cronJob.Name)
}
}
return nil
}

// isOwnedByCronJob checks if a Job is owned by a specific CronJob
func isOwnedByCronJob(job *batchv1.Job, cronJob *batchv1.CronJob) bool {
for _, ownerRef := range job.OwnerReferences {
if ownerRef.Kind == "CronJob" && ownerRef.Name == cronJob.Name && ownerRef.UID == cronJob.UID {
return true
}
}
return false
}

// recreateJob deletes and recreates a Job with the same spec
func recreateJob(clients kube.Clients, namespace string, oldJob *batchv1.Job) error {
job := oldJob.DeepCopy()

// Delete the old job
policy := meta_v1.DeletePropagationBackground
err := clients.KubernetesClient.BatchV1().Jobs(namespace).Delete(context.TODO(), job.Name, meta_v1.DeleteOptions{PropagationPolicy: &policy})
if err != nil {
return err
}

// Remove fields that should not be specified when creating a new Job
job.ResourceVersion = ""
job.UID = ""
job.CreationTimestamp = meta_v1.Time{}
job.Status = batchv1.JobStatus{}

// Remove problematic labels
delete(job.Spec.Template.Labels, "controller-uid")
delete(job.Spec.Template.Labels, batchv1.ControllerUidLabel)
delete(job.Spec.Template.Labels, batchv1.JobNameLabel)
delete(job.Spec.Template.Labels, "job-name")

// Remove the selector to allow it to be auto-generated
job.Spec.Selector = nil

// Create the new job with same spec
_, err = clients.KubernetesClient.BatchV1().Jobs(namespace).Create(context.TODO(), job, meta_v1.CreateOptions{FieldManager: "Reloader"})
return err
}

// UpdateCronJobWithRunningJobs patches the CronJob template and recreates any running Jobs
func UpdateCronJobWithRunningJobs(clients kube.Clients, namespace string, resource runtime.Object) error {
cronJob := resource.(*batchv1.CronJob)

// Recreate any running Jobs so they pick up the new config
if err := RecreateRunningJobsForCronJob(clients, namespace, cronJob); err != nil {
return err
}

// The CronJob template will be patched by the caller using PatchFunc
// This UpdateFunc is a no-op for the CronJob itself since we use patching
return nil
}

func PatchJob(clients kube.Clients, namespace string, resource runtime.Object, patchType patchtypes.PatchType, bytes []byte) error {
return errors.New("not supported patching: Job")
}
Expand Down
111 changes: 110 additions & 1 deletion internal/pkg/callbacks/rolling_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,10 @@ func TestPatchResources(t *testing.T) {
assert.Equal(t, "test", patchedResource.(*appsv1.StatefulSet).Annotations["test"])
}},
{"CronJob", createTestCronJobWithAnnotations, callbacks.PatchCronJob, deleteTestCronJob, func(err error) {
assert.EqualError(t, err, "not supported patching: CronJob")
assert.NoError(t, err)
patchedResource, err := callbacks.GetCronJobItem(clients, "test-cronjob", fixtures.namespace)
assert.NoError(t, err)
assert.Equal(t, "test", patchedResource.(*batchv1.CronJob).Annotations["test"])
}},
{"Job", createTestJobWithAnnotations, callbacks.PatchJob, deleteTestJob, func(err error) {
assert.EqualError(t, err, "not supported patching: Job")
Expand Down Expand Up @@ -491,6 +494,112 @@ func TestGetPatchDeleteTemplateEnvVar(t *testing.T) {
assert.Equal(t, 2, strings.Count(templates.DeleteEnvVarTemplate, "%d"))
}

func TestGetCronJobPatchTemplateAnnotation(t *testing.T) {
templates := callbacks.GetCronJobPatchTemplates()
assert.NotEmpty(t, templates.AnnotationTemplate)
assert.Equal(t, 2, strings.Count(templates.AnnotationTemplate, "%s"))
// Verify the path is correct for CronJob (jobTemplate nested structure)
assert.Contains(t, templates.AnnotationTemplate, "jobTemplate")
}

func TestGetCronJobPatchTemplateEnvVar(t *testing.T) {
templates := callbacks.GetCronJobPatchTemplates()
assert.NotEmpty(t, templates.EnvVarTemplate)
assert.Equal(t, 3, strings.Count(templates.EnvVarTemplate, "%s"))
// Verify the path is correct for CronJob (jobTemplate nested structure)
assert.Contains(t, templates.EnvVarTemplate, "jobTemplate")
}

func TestGetCronJobPatchDeleteTemplateEnvVar(t *testing.T) {
templates := callbacks.GetCronJobPatchTemplates()
assert.NotEmpty(t, templates.DeleteEnvVarTemplate)
assert.Equal(t, 2, strings.Count(templates.DeleteEnvVarTemplate, "%d"))
// Verify the path is correct for CronJob (jobTemplate nested structure)
assert.Contains(t, templates.DeleteEnvVarTemplate, "jobTemplate")
}

func TestPatchCronJob(t *testing.T) {
fixtures := newTestFixtures()

// Create a CronJob
cronJob, err := createTestCronJobWithAnnotations(clients, fixtures.namespace, "1")
assert.NoError(t, err)

// Patch the CronJob
patchBytes := []byte(`{"metadata":{"annotations":{"test":"test"}}}`)
err = callbacks.PatchCronJob(clients, fixtures.namespace, cronJob, patchtypes.StrategicMergePatchType, patchBytes)
assert.NoError(t, err)

// Verify the patch was applied
patchedCronJob, err := callbacks.GetCronJobItem(clients, "test-cronjob", fixtures.namespace)
assert.NoError(t, err)
assert.Equal(t, "test", patchedCronJob.(*batchv1.CronJob).Annotations["test"])

// Cleanup
err = deleteTestCronJob(clients, fixtures.namespace, "test-cronjob")
assert.NoError(t, err)
}

func TestRecreateRunningJobsForCronJob(t *testing.T) {
fixtures := newTestFixtures()

// Create a CronJob
cronJobObj, err := createTestCronJobWithAnnotations(clients, fixtures.namespace, "1")
assert.NoError(t, err)
cronJob := cronJobObj.(*batchv1.CronJob)

// Create a Job owned by the CronJob with active status
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job-from-cronjob",
Namespace: fixtures.namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "batch/v1",
Kind: "CronJob",
Name: cronJob.Name,
UID: cronJob.UID,
},
},
},
Status: batchv1.JobStatus{
Active: 1, // Job is running
},
}
_, err = clients.KubernetesClient.BatchV1().Jobs(fixtures.namespace).Create(context.TODO(), job, metav1.CreateOptions{})
assert.NoError(t, err)

// Call RecreateRunningJobsForCronJob
err = callbacks.RecreateRunningJobsForCronJob(clients, fixtures.namespace, cronJob)
assert.NoError(t, err)

// Verify the job was recreated (old one deleted, new one created with same name)
// Note: In the fake client, the job recreation might behave differently than in a real cluster
// The important thing is that no error occurred

// Cleanup
_ = clients.KubernetesClient.BatchV1().Jobs(fixtures.namespace).Delete(context.TODO(), "test-job-from-cronjob", metav1.DeleteOptions{})
err = deleteTestCronJob(clients, fixtures.namespace, cronJob.Name)
assert.NoError(t, err)
}

func TestUpdateCronJobWithRunningJobs(t *testing.T) {
fixtures := newTestFixtures()

// Create a CronJob
cronJobObj, err := createTestCronJobWithAnnotations(clients, fixtures.namespace, "1")
assert.NoError(t, err)
cronJob := cronJobObj.(*batchv1.CronJob)

// Call UpdateCronJobWithRunningJobs (should not error even with no running jobs)
err = callbacks.UpdateCronJobWithRunningJobs(clients, fixtures.namespace, cronJob)
assert.NoError(t, err)

// Cleanup
err = deleteTestCronJob(clients, fixtures.namespace, cronJob.Name)
assert.NoError(t, err)
}

// Helper functions

func isRestartStrategy(rollout *argorolloutv1alpha1.Rollout) bool {
Expand Down
65 changes: 63 additions & 2 deletions internal/pkg/handler/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func GetDeploymentRollingUpgradeFuncs() callbacks.RollingUpgradeFuncs {
}
}

// GetDeploymentRollingUpgradeFuncs returns all callback funcs for a cronjob
// GetCronJobCreateJobFuncs returns all callback funcs for a cronjob (default behavior - creates immediate job)
func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs {
return callbacks.RollingUpgradeFuncs{
ItemFunc: callbacks.GetCronJobItem,
Expand All @@ -70,6 +70,25 @@ func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs {
}
}

// GetCronJobPatchFuncs returns all callback funcs for a cronjob with patch strategy
// This patches the CronJob template and recreates running Jobs instead of creating an immediate new job
func GetCronJobPatchFuncs() callbacks.RollingUpgradeFuncs {
return callbacks.RollingUpgradeFuncs{
ItemFunc: callbacks.GetCronJobItem,
ItemsFunc: callbacks.GetCronJobItems,
AnnotationsFunc: callbacks.GetCronJobAnnotations,
PodAnnotationsFunc: callbacks.GetCronJobPodAnnotations,
ContainersFunc: callbacks.GetCronJobContainers,
InitContainersFunc: callbacks.GetCronJobInitContainers,
UpdateFunc: callbacks.UpdateCronJobWithRunningJobs,
PatchFunc: callbacks.PatchCronJob,
PatchTemplatesFunc: callbacks.GetCronJobPatchTemplates,
VolumesFunc: callbacks.GetCronJobVolumes,
ResourceType: "CronJob",
SupportsPatch: true,
}
}

// GetDeploymentRollingUpgradeFuncs returns all callback funcs for a cronjob
func GetJobCreateJobFuncs() callbacks.RollingUpgradeFuncs {
return callbacks.RollingUpgradeFuncs{
Expand Down Expand Up @@ -195,7 +214,7 @@ func doRollingUpgrade(config common.Config, collectors metrics.Collectors, recor

// Only process CronJobs if they are not ignored
if !ignoredWorkloadTypes.Contains("cronjobs") {
err = rollingUpgrade(clients, config, GetCronJobCreateJobFuncs(), collectors, recorder, invoke)
err = rollingUpgradeCronJobs(clients, config, collectors, recorder, invoke)
if err != nil {
return err
}
Expand Down Expand Up @@ -236,6 +255,48 @@ func rollingUpgrade(clients kube.Clients, config common.Config, upgradeFuncs cal
return err
}

// rollingUpgradeCronJobs handles CronJob upgrades with per-CronJob strategy selection
// based on the reloader.stakater.com/cronjob-reload-strategy annotation
func rollingUpgradeCronJobs(clients kube.Clients, config common.Config, collectors metrics.Collectors, recorder record.EventRecorder, strategy invokeStrategy) error {
// Get all CronJobs using the default funcs (just for listing)
defaultFuncs := GetCronJobCreateJobFuncs()
items := defaultFuncs.ItemsFunc(clients, config.Namespace)

// Record workloads scanned
collectors.RecordWorkloadsScanned("CronJob", len(items))

matchedCount := 0
for _, item := range items {
// Check the CronJob's annotation to determine which strategy to use
annotations := callbacks.GetCronJobAnnotations(item)
var upgradeFuncs callbacks.RollingUpgradeFuncs

if annotations[options.CronJobReloadStrategyAnnotation] == "patch" {
// Use patch mode: patches CronJob template and recreates running Jobs
upgradeFuncs = GetCronJobPatchFuncs()
} else {
// Default mode: creates an immediate Job from the CronJob
upgradeFuncs = GetCronJobCreateJobFuncs()
}

matched, err := retryOnConflict(retry.DefaultRetry, func(fetchResource bool) (bool, error) {
return upgradeResource(clients, config, upgradeFuncs, collectors, recorder, strategy, item, fetchResource)
})
if err != nil {
logrus.Errorf("Rolling upgrade for CronJob failed with error = %v", err)
return err
}
if matched {
matchedCount++
}
}

// Record workloads matched
collectors.RecordWorkloadsMatched("CronJob", matchedCount)

return nil
}

// PerformAction invokes the deployment if there is any change in configmap or secret data
func PerformAction(clients kube.Clients, config common.Config, upgradeFuncs callbacks.RollingUpgradeFuncs, collectors metrics.Collectors, recorder record.EventRecorder, strategy invokeStrategy) error {
items := upgradeFuncs.ItemsFunc(clients, config.Namespace)
Expand Down
Loading