Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/external/cinder/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)

Expand All @@ -30,8 +31,11 @@ type ExternalSchedulerRequest struct {
Weights map[string]float64 `json:"weights"`
// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`
// Options configure the pipeline behavior for this scheduling call.
Options scheduling.Options `json:"options,omitempty"`
}

// GetOptions returns the call-time pipeline options carried by this request.
func (r ExternalSchedulerRequest) GetOptions() scheduling.Options {
	return r.Options
}
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
4 changes: 4 additions & 0 deletions api/external/ironcore/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@ import (
"log/slog"

ironcorev1alpha1 "github.com/cobaltcore-dev/cortex/api/external/ironcore/v1alpha1"
"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)

// MachinePipelineRequest bundles the available machine pools with the
// call-time options for one scheduling call.
type MachinePipelineRequest struct {
	// The available machine pools.
	Pools []ironcorev1alpha1.MachinePool `json:"pools"`
	// Options configure the pipeline behavior for this scheduling call.
	// Note: encoding/json never considers a struct value empty, so omitempty
	// does not drop this field; a zero Options is still marshaled.
	Options scheduling.Options `json:"options,omitempty"`
}

// GetOptions returns the call-time pipeline options carried by this request.
func (r MachinePipelineRequest) GetOptions() scheduling.Options {
	return r.Options
}
func (r MachinePipelineRequest) GetHosts() []string {
hosts := make([]string, len(r.Pools))
for i, host := range r.Pools {
Expand Down
4 changes: 4 additions & 0 deletions api/external/manila/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)

Expand All @@ -30,8 +31,11 @@ type ExternalSchedulerRequest struct {
Weights map[string]float64 `json:"weights"`
// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`
// Options configure the pipeline behavior for this scheduling call.
Options scheduling.Options `json:"options,omitempty"`
}

// GetOptions returns the call-time pipeline options carried by this request.
func (r ExternalSchedulerRequest) GetOptions() scheduling.Options {
	return r.Options
}
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
7 changes: 7 additions & 0 deletions api/external/nova/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log/slog"
"strings"

"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)
Expand Down Expand Up @@ -37,8 +38,14 @@ type ExternalSchedulerRequest struct {

// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`

// Options configure the pipeline behavior for this scheduling call.
// Set by the caller (CR controller, failover controller). Nova is also a caller but
// does not set these; Cortex fills in config-derived defaults server-side.
Options scheduling.Options `json:"options,omitempty"`
}

// GetOptions returns the call-time pipeline options carried by this request.
func (r ExternalSchedulerRequest) GetOptions() scheduling.Options {
	return r.Options
}
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
4 changes: 4 additions & 0 deletions api/external/pods/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package pods
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
corev1 "k8s.io/api/core/v1"
)
Expand All @@ -15,8 +16,11 @@ type PodPipelineRequest struct {
Nodes []corev1.Node `json:"nodes"`
// The pod to be scheduled.
Pod corev1.Pod `json:"pod"`
// Options configure the pipeline behavior for this scheduling call.
Options scheduling.Options `json:"options,omitempty"`
}

// GetOptions returns the call-time pipeline options carried by this request.
func (r PodPipelineRequest) GetOptions() scheduling.Options {
	return r.Options
}
func (r PodPipelineRequest) GetHosts() []string {
hosts := make([]string, len(r.Nodes))
for i, host := range r.Nodes {
Expand Down
46 changes: 46 additions & 0 deletions api/scheduling/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package scheduling

import (
"errors"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
)

// Options configure the behavior of a single pipeline run at call time.
// These are distinct from per-step YAML options (FilterWeigherPipelineStepOpts),
// which are static and set when the pipeline is initialized.
type Options struct {
// ReadOnly means the pipeline run does not modify shared scheduling state (reservations,
// history, inflight records). Concurrent read-only runs are safe under a shared read lock.
// Note: the controller may still write the Decision status after Run() regardless of this flag.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems inconsistent. Shouldn't we draw the line here? Read-only requests create or modify NO resources and are purely to calculate host candidates for constraints.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are use cases, we can discuss live

ReadOnly bool `json:"read_only,omitempty"`
// LockReservations prevents reservation unlocking, e.g. in the capacity filter.
// Set when finding hosts for new reservations (failover, CR) to see true available capacity.
LockReservations bool `json:"lock_reservations,omitempty"`
// AssumeEmptyHosts treats all hosts as having no running VMs.
AssumeEmptyHosts bool `json:"assume_empty_hosts,omitempty"`
// IgnoredReservationTypes lists reservation types the capacity filter skips entirely.
IgnoredReservationTypes []v1alpha1.ReservationType `json:"ignored_reservation_types,omitempty"`
// MaxCandidates limits the number of hosts returned after weighing. 0 means no limit.
MaxCandidates int `json:"max_candidates,omitempty"`

// SkipHistory skips recording the placement decision in placement history.
// Defaults to false so Nova requests record history without needing to set this explicitly.
SkipHistory bool `json:"skip_history,omitempty"`
// CreateInflight creates pessimistic blocking reservations for all returned candidates.
CreateInflight bool `json:"create_inflight,omitempty"`
}

// Validate checks for mutually exclusive or inconsistent option combinations.
func (o Options) Validate() error {
if o.ReadOnly && !o.SkipHistory {
return errors.New("read-only runs must not write scheduling history: set SkipHistory=true")
}
if o.ReadOnly && o.CreateInflight {
return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not create inflight reservations")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
return nil
}
34 changes: 34 additions & 0 deletions api/scheduling/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package scheduling

import "testing"

func TestOptions_Validate(t *testing.T) {
tests := []struct {
name string
opts Options
wantErr bool
}{
{"zero value is valid", Options{}, false},
{"write run, history recorded by default", Options{}, false},
{"write run with inflight", Options{CreateInflight: true}, false},
{"read-only run, skipping history", Options{ReadOnly: true, SkipHistory: true}, false},
{"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true},
{"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, CreateInflight: true}, true},
{"ReadOnly + both invalid", Options{ReadOnly: true, CreateInflight: true}, true},
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.opts.Validate()
if tt.wantErr && err == nil {
t.Error("expected error, got nil")
}
if !tt.wantErr && err != nil {
t.Errorf("expected no error, got %v", err)
}
})
}
}
5 changes: 0 additions & 5 deletions api/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ type PipelineSpec struct {
// +kubebuilder:validation:Optional
Description string `json:"description,omitempty"`

// If this pipeline should create history objects.
// When this is false, the pipeline will still process requests.
// +kubebuilder:default=false
CreateHistory bool `json:"createHistory,omitempty"`

// If this pipeline should ignore host preselection and gather all
// available placement candidates before applying filters, instead of
// relying on a pre-filtered set and weights.
Expand Down
6 changes: 0 additions & 6 deletions helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ spec:
spec:
description: spec defines the desired state of Pipeline
properties:
createHistory:
default: false
description: |-
If this pipeline should create history objects.
When this is false, the pipeline will still process requests.
type: boolean
description:
description: An optional description of the pipeline, helping understand
its purpose.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -74,10 +73,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.
c.processMu.Lock()
defer c.processMu.Unlock()

pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name]
if !ok {
return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name)
}
err := c.process(ctx, decision)
if err != nil {
meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{
Expand All @@ -94,11 +89,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.
Message: "pipeline run succeeded",
})
}
if pipelineConf.Spec.CreateHistory {
if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil {
ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
}
}
return err
}

Expand All @@ -122,6 +112,11 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision
}

result, err := pipeline.Run(request)
if !request.Options.SkipHistory {
if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil {
log.Error(upsertErr, "failed to create/update history")
}
}
if err != nil {
log.Error(err, "failed to run pipeline")
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

api "github.com/cobaltcore-dev/cortex/api/external/cinder"
"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"

"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
Expand Down Expand Up @@ -46,6 +47,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) {
},
Weights: map[string]float64{"cinder-volume-1": 1.0, "cinder-volume-2": 0.5},
Pipeline: "test-pipeline",
Options: scheduling.Options{SkipHistory: true},
}

cinderRaw, err := json.Marshal(cinderRequest)
Expand Down Expand Up @@ -281,7 +283,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
Spec: v1alpha1.PipelineSpec{
Type: v1alpha1.PipelineTypeFilterWeigher,
SchedulingDomain: v1alpha1.SchedulingDomainCinder,
CreateHistory: true,
Filters: []v1alpha1.FilterSpec{},
Weighers: []v1alpha1.WeigherSpec{},
},
Expand Down Expand Up @@ -315,7 +316,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
Spec: v1alpha1.PipelineSpec{
Type: v1alpha1.PipelineTypeFilterWeigher,
SchedulingDomain: v1alpha1.SchedulingDomainCinder,
CreateHistory: false,
Filters: []v1alpha1.FilterSpec{},
Weighers: []v1alpha1.WeigherSpec{},
},
Expand Down Expand Up @@ -369,14 +369,13 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
Spec: v1alpha1.PipelineSpec{
Type: v1alpha1.PipelineTypeFilterWeigher,
SchedulingDomain: v1alpha1.SchedulingDomainCinder,
CreateHistory: true,
Filters: []v1alpha1.FilterSpec{},
Weighers: []v1alpha1.WeigherSpec{},
},
},
createHistory: true,
expectError: true,
expectHistoryCreated: true,
expectHistoryCreated: false,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Assert the no-history expectation on error paths.

expectHistoryCreated: false for the nil-raw case is not currently enforced, because the test only checks for zero History objects in the !tt.expectError branch. If ProcessNewDecisionFromAPI() accidentally creates history before returning the error, this case will still pass.

Also applies to: 434-450

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go` at
line 378, The test currently only asserts zero History when !tt.expectError, so
add an explicit assertion of history count == 0 whenever tt.expectHistoryCreated
is false (regardless of tt.expectError) after calling
ProcessNewDecisionFromAPI(); i.e. move or duplicate the expectHistoryCreated
check out of the !tt.expectError branch so the test fails if
ProcessNewDecisionFromAPI() creates History even when it returns an error. Apply
the same change for the other case group around the lines that correspond to the
second nil-raw check (the block covering the additional assertions referenced in
the comment).

expectResult: false,
},
}
Expand Down Expand Up @@ -413,6 +412,16 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
controller.Pipelines[tt.pipelineConfig.Name] = initResult.Pipeline
}

if tt.decision.Spec.CinderRaw != nil {
req := cinderRequest
req.Options = scheduling.Options{SkipHistory: !tt.createHistory}
raw, marshalErr := json.Marshal(req)
if marshalErr != nil {
t.Fatalf("Failed to marshal request with options: %v", marshalErr)
}
tt.decision.Spec.CinderRaw = &runtime.RawExtension{Raw: raw}
}

err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision)

if tt.expectError && err == nil {
Expand Down
20 changes: 20 additions & 0 deletions internal/scheduling/lib/filter_weigher_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

// FilterWeigherPipeline is implemented by scheduling pipelines that can be
// run for a request of type RequestType.
type FilterWeigherPipeline[RequestType FilterWeigherPipelineRequest] interface {
	// Run the scheduling pipeline with the given request.
	// Call-time options are read from request.GetOptions().
	// Returns the decision result, or an error if the run fails.
	Run(request RequestType) (v1alpha1.DecisionResult, error)
}

Expand Down Expand Up @@ -263,6 +264,10 @@ func (s *filterWeigherPipeline[RequestType]) sortHostsByWeights(weights map[stri

// Evaluate the pipeline and return a list of hosts in order of preference.
func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.DecisionResult, error) {
opts := request.GetOptions()
if err := opts.Validate(); err != nil {
return v1alpha1.DecisionResult{}, err
}
slogArgs := request.GetTraceLogArgs()
slogArgsAny := make([]any, 0, len(slogArgs))
for _, arg := range slogArgs {
Expand Down Expand Up @@ -297,6 +302,21 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.
hosts := p.sortHostsByWeights(outWeights)
traceLog.Info("scheduler: sorted hosts", "hosts", hosts)

if opts.MaxCandidates > 0 && len(hosts) > opts.MaxCandidates {
traceLog.Info("scheduler: trimming candidate list", "maxCandidates", opts.MaxCandidates, "before", len(hosts))
hosts = hosts[:opts.MaxCandidates]
// Drop trimmed hosts from outWeights so AggregatedOutWeights stays consistent.
kept := make(map[string]struct{}, len(hosts))
for _, h := range hosts {
kept[h] = struct{}{}
}
for host := range outWeights {
if _, ok := kept[host]; !ok {
delete(outWeights, host)
}
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide logging here so we see what's going on.

// Collect some metrics about the pipeline execution.
go p.monitor.observePipelineResult(request, hosts)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

package lib

import "log/slog"
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/api/scheduling"
)

type FilterWeigherPipelineRequest interface {
// Get the hosts that went in the pipeline.
Expand All @@ -21,4 +25,6 @@ type FilterWeigherPipelineRequest interface {
// Get logging args to be used in the step's trace log.
// Usually, this will be the request context including the request ID.
GetTraceLogArgs() []slog.Attr
// Get the call-time options for this pipeline run.
GetOptions() scheduling.Options
}
Loading
Loading