Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/external/cinder/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ type ExternalSchedulerRequest struct {
Weights map[string]float64 `json:"weights"`
// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`
// Options configure the pipeline behavior for this scheduling call.
Options lib.Options `json:"options,omitempty"`
}

func (r ExternalSchedulerRequest) GetOptions() lib.Options { return r.Options }
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
3 changes: 3 additions & 0 deletions api/external/ironcore/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
type MachinePipelineRequest struct {
// The available machine pools.
Pools []ironcorev1alpha1.MachinePool `json:"pools"`
// Options configure the pipeline behavior for this scheduling call.
Options lib.Options `json:"options,omitempty"`
}

func (r MachinePipelineRequest) GetOptions() lib.Options { return r.Options }
func (r MachinePipelineRequest) GetHosts() []string {
hosts := make([]string, len(r.Pools))
for i, host := range r.Pools {
Expand Down
3 changes: 3 additions & 0 deletions api/external/manila/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ type ExternalSchedulerRequest struct {
Weights map[string]float64 `json:"weights"`
// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`
// Options configure the pipeline behavior for this scheduling call.
Options lib.Options `json:"options,omitempty"`
}

func (r ExternalSchedulerRequest) GetOptions() lib.Options { return r.Options }
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
6 changes: 6 additions & 0 deletions api/external/nova/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ type ExternalSchedulerRequest struct {

// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`

// Options configure the pipeline behavior for this scheduling call.
// Set by the caller (CR controller, failover controller, Nova).
// Nova does not set these; Cortex fills in config-derived defaults server-side.
Options lib.Options `json:"options,omitempty"`
}

func (r ExternalSchedulerRequest) GetOptions() lib.Options { return r.Options }
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
3 changes: 3 additions & 0 deletions api/external/pods/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ type PodPipelineRequest struct {
Nodes []corev1.Node `json:"nodes"`
// The pod to be scheduled.
Pod corev1.Pod `json:"pod"`
// Options configure the pipeline behavior for this scheduling call.
Options lib.Options `json:"options,omitempty"`
}

func (r PodPipelineRequest) GetOptions() lib.Options { return r.Options }
func (r PodPipelineRequest) GetHosts() []string {
hosts := make([]string, len(r.Nodes))
for i, host := range r.Nodes {
Expand Down
20 changes: 20 additions & 0 deletions internal/scheduling/lib/filter_weigher_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

type FilterWeigherPipeline[RequestType FilterWeigherPipelineRequest] interface {
// Run the scheduling pipeline with the given request.
// Call-time options are read from request.GetOptions().
Run(request RequestType) (v1alpha1.DecisionResult, error)
}

Expand Down Expand Up @@ -263,6 +264,10 @@ func (s *filterWeigherPipeline[RequestType]) sortHostsByWeights(weights map[stri

// Evaluate the pipeline and return a list of hosts in order of preference.
func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.DecisionResult, error) {
opts := request.GetOptions()
if err := opts.Validate(); err != nil {
return v1alpha1.DecisionResult{}, err
}
slogArgs := request.GetTraceLogArgs()
slogArgsAny := make([]any, 0, len(slogArgs))
for _, arg := range slogArgs {
Expand Down Expand Up @@ -297,6 +302,21 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.
hosts := p.sortHostsByWeights(outWeights)
traceLog.Info("scheduler: sorted hosts", "hosts", hosts)

if opts.MaxCandidates > 0 && len(hosts) > opts.MaxCandidates {
traceLog.Info("scheduler: trimming candidate list", "maxCandidates", opts.MaxCandidates, "before", len(hosts))
hosts = hosts[:opts.MaxCandidates]
// Drop trimmed hosts from outWeights so AggregatedOutWeights stays consistent.
kept := make(map[string]struct{}, len(hosts))
for _, h := range hosts {
kept[h] = struct{}{}
}
for host := range outWeights {
if _, ok := kept[host]; !ok {
delete(outWeights, host)
}
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide logging here so we see what's going on.

// Collect some metrics about the pipeline execution.
go p.monitor.observePipelineResult(request, hosts)

Expand Down
2 changes: 2 additions & 0 deletions internal/scheduling/lib/filter_weigher_pipeline_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ type FilterWeigherPipelineRequest interface {
// Get logging args to be used in the step's trace log.
// Usually, this will be the request context including the request ID.
GetTraceLogArgs() []slog.Attr
// Get the call-time options for this pipeline run.
GetOptions() Options
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ type mockFilterWeigherPipelineRequest struct {
Hosts []string
Weights map[string]float64
Pipeline string
Options Options
}

func (m mockFilterWeigherPipelineRequest) GetWeightKeys() []string { return m.WeightKeys }
func (m mockFilterWeigherPipelineRequest) GetTraceLogArgs() []slog.Attr { return m.TraceLogArgs }
func (m mockFilterWeigherPipelineRequest) GetHosts() []string { return m.Hosts }
func (m mockFilterWeigherPipelineRequest) GetWeights() map[string]float64 { return m.Weights }
func (m mockFilterWeigherPipelineRequest) GetPipeline() string { return m.Pipeline }
func (m mockFilterWeigherPipelineRequest) GetOptions() Options { return m.Options }

func (m mockFilterWeigherPipelineRequest) Filter(hosts map[string]float64) FilterWeigherPipelineRequest {
filteredHosts := make([]string, 0, len(hosts))
Expand Down
2 changes: 2 additions & 0 deletions internal/scheduling/lib/filter_weigher_pipeline_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type FilterWeigherPipelineStep[RequestType FilterWeigherPipelineRequest] interfa
//
// A traceLog is provided that contains the global request id and should
// be used to log the step's execution.
//
// Per-call options are available via request.GetOptions().
Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error)
}

Expand Down
52 changes: 52 additions & 0 deletions internal/scheduling/lib/filter_weigher_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"log/slog"
"math"
"slices"
"testing"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
Expand Down Expand Up @@ -372,3 +373,54 @@ func TestFilterWeigherPipelineMonitor_SubPipeline(t *testing.T) {
t.Error("original monitor should not be modified")
}
}

func TestPipeline_MaxCandidates(t *testing.T) {
// Pipeline that passes all 4 hosts with descending weights.
pipeline := &filterWeigherPipeline[mockFilterWeigherPipelineRequest]{
filters: map[string]Filter[mockFilterWeigherPipelineRequest]{},
filtersOrder: []string{},
weighersOrder: []string{},
weighers: map[string]Weigher[mockFilterWeigherPipelineRequest]{},
}
request := mockFilterWeigherPipelineRequest{
Hosts: []string{"host1", "host2", "host3", "host4"},
Weights: map[string]float64{"host1": 4.0, "host2": 3.0, "host3": 2.0, "host4": 1.0},
}

tests := []struct {
name string
maxCandidates int
wantLen int
wantFirst string
}{
{"no limit", 0, 4, "host1"},
{"limit to 2", 2, 2, "host1"},
{"limit to 1", 1, 1, "host1"},
{"limit larger than hosts", 10, 4, "host1"},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req := request
req.Options = Options{MaxCandidates: tt.maxCandidates}
result, err := pipeline.Run(req)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if len(result.OrderedHosts) != tt.wantLen {
t.Errorf("expected %d hosts, got %d: %v", tt.wantLen, len(result.OrderedHosts), result.OrderedHosts)
}
if len(result.OrderedHosts) > 0 && result.OrderedHosts[0] != tt.wantFirst {
t.Errorf("expected first host %s, got %s", tt.wantFirst, result.OrderedHosts[0])
}
if tt.maxCandidates > 0 && len(result.OrderedHosts) <= tt.maxCandidates {
// AggregatedOutWeights must only contain returned hosts.
for host := range result.AggregatedOutWeights {
if !slices.Contains(result.OrderedHosts, host) {
t.Errorf("AggregatedOutWeights contains trimmed host %s", host)
}
}
}
})
}
}
46 changes: 46 additions & 0 deletions internal/scheduling/lib/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package lib

import (
"errors"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
)

// Options configure the behavior of a single pipeline run at call time.
// These are distinct from per-step YAML options (FilterWeigherPipelineStepOpts),
// which are static and set when the pipeline is initialized.
type Options struct {
// ReadOnly means the pipeline run does not modify shared scheduling state (reservations,
// history, inflight records). Concurrent read-only runs are safe under a shared read lock.
// Note: the controller may still write the Decision status after Run() regardless of this flag.
ReadOnly bool
// LockReservations prevents reservation unlocking, e.g. in the capacity filter.
// Set when finding hosts for new reservations (failover, CR) to see true available capacity.
LockReservations bool
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be more generic such as Kind which is a typed enum? In this case, the capacity filter would just check for req.GetOptions().Kind == KindFailoverReservation to control which logic is executed. We could also add a kind KindCapacityScan for limes etc. -- this is nicely extensible and well-defined. In this case, the ReadOnly, AssumeEmptyHosts, and CreateInFlight flags could also be removed.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I get this comment, probably better to discuss live

// AssumeEmptyHosts treats all hosts as having no running VMs.
AssumeEmptyHosts bool
// IgnoredReservationTypes lists reservation types the capacity filter skips entirely.
IgnoredReservationTypes []v1alpha1.ReservationType
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this a substruct such as type ReservationOptions struct?

// MaxCandidates limits the number of hosts returned after weighing. 0 means no limit.
MaxCandidates int

// RecordHistory records the placement decision in placement history.
// Replaces pipeline.Spec.CreateHistory once pipelines consolidate.
RecordHistory bool
// CreateInflight creates pessimistic blocking reservations for all returned candidates.
CreateInflight bool
}

// Validate checks for mutually exclusive or inconsistent option combinations.
func (o Options) Validate() error {
if o.ReadOnly && o.RecordHistory {
return errors.New("ReadOnly and RecordHistory are mutually exclusive: read-only runs must not write scheduling history")
}
if o.ReadOnly && o.CreateInflight {
return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not create inflight reservations")
}
return nil
}
34 changes: 34 additions & 0 deletions internal/scheduling/lib/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package lib

import "testing"

func TestOptions_Validate(t *testing.T) {
tests := []struct {
name string
opts Options
wantErr bool
}{
{"zero value is valid", Options{}, false},
{"write run with history", Options{RecordHistory: true}, false},
{"write run with inflight", Options{CreateInflight: true}, false},
{"read-only run, no side effects", Options{ReadOnly: true}, false},
{"ReadOnly + RecordHistory is invalid", Options{ReadOnly: true, RecordHistory: true}, true},
{"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, CreateInflight: true}, true},
{"ReadOnly + both invalid", Options{ReadOnly: true, RecordHistory: true, CreateInflight: true}, true},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.opts.Validate()
if tt.wantErr && err == nil {
t.Error("expected error, got nil")
}
if !tt.wantErr && err != nil {
t.Errorf("expected no error, got %v", err)
}
})
}
}
61 changes: 45 additions & 16 deletions internal/scheduling/nova/filter_weigher_pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -38,8 +37,9 @@ type FilterWeigherPipelineController struct {
// Toolbox shared between all pipeline controllers.
lib.BasePipelineController[lib.FilterWeigherPipeline[api.ExternalSchedulerRequest]]

// Mutex to only allow one process at a time
processMu sync.Mutex
// Mutex to only allow one process at a time.
// Read-only runs (opts.ReadOnly == true) acquire a read lock; write runs acquire the full lock.
processMu sync.RWMutex
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Monitor to pass down to all pipelines.
Monitor lib.FilterWeigherPipelineMonitor
Expand All @@ -54,13 +54,23 @@ func (c *FilterWeigherPipelineController) PipelineType() v1alpha1.PipelineType {

// Callback executed when kubernetes asks to reconcile a decision resource.
func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
c.processMu.Lock()
defer c.processMu.Unlock()

// Peek at the decision before acquiring the lock so we can choose the right lock type.
// Read-only runs can proceed concurrently; write runs need the exclusive lock.
decision := &v1alpha1.Decision{}
if err := c.Get(ctx, req.NamespacedName, decision); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if c.peekReadOnly(decision) {
c.processMu.RLock()
defer c.processMu.RUnlock()
} else {
c.processMu.Lock()
defer c.processMu.Unlock()
// Re-fetch after acquiring the exclusive lock to see consistent state.
if err := c.Get(ctx, req.NamespacedName, decision); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}
old := decision.DeepCopy()
if err := c.process(ctx, decision); err != nil {
return ctrl.Result{}, err
Expand All @@ -74,13 +84,16 @@ func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctr

// Process the decision from the API. Should create and return the updated decision.
func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.Context, decision *v1alpha1.Decision) error {
c.processMu.Lock()
defer c.processMu.Unlock()

pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name]
if !ok {
return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name)
// Read-only runs share the cached decision state; no re-fetch needed because they
// don't observe writes from concurrent exclusive-lock runs.
if c.peekReadOnly(decision) {
c.processMu.RLock()
defer c.processMu.RUnlock()
} else {
c.processMu.Lock()
defer c.processMu.Unlock()
}

err := c.process(ctx, decision)
if err != nil {
meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{
Expand All @@ -97,9 +110,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.
Message: "pipeline run succeeded",
})
}
if pipelineConf.Spec.CreateHistory {
c.upsertHistory(ctx, decision, err)
}
return err
}

Expand Down Expand Up @@ -166,7 +176,14 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision
log.Info("gathered all placement candidates", "numHosts", len(request.Hosts))
}

// Fill RecordHistory from config if the caller didn't set it.
if !request.Options.RecordHistory {
request.Options.RecordHistory = pipelineConf.Spec.CreateHistory
}
result, err := pipeline.Run(request)
if request.Options.RecordHistory {
c.upsertHistory(ctx, decision, err)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
Comment on lines 179 to +182
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Read-only requests can still write history under RLock().

The lock choice now depends on request.Options.ReadOnly, but history writes are only guarded by SkipHistory. A read-only caller that omits SkipHistory: true will still mutate History CRs while running on the shared-lock path.

Suggested guard
 	result, err := pipeline.Run(request)
-	if !request.Options.SkipHistory {
+	if !request.Options.ReadOnly && !request.Options.SkipHistory {
 		c.upsertHistory(ctx, decision, err)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
result, err := pipeline.Run(request)
if !request.Options.SkipHistory {
c.upsertHistory(ctx, decision, err)
}
result, err := pipeline.Run(request)
if !request.Options.ReadOnly && !request.Options.SkipHistory {
c.upsertHistory(ctx, decision, err)
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/scheduling/nova/filter_weigher_pipeline_controller.go` around lines
179 - 182, The code calls c.upsertHistory after pipeline.Run guarded only by
request.Options.SkipHistory but not by request.Options.ReadOnly, so a read-only
request can still mutate History; change the guard so history is only written
when both SkipHistory is false AND ReadOnly is false (i.e. if
!request.Options.SkipHistory && !request.Options.ReadOnly), and ensure the call
to c.upsertHistory happens on the write-lock path (the branch that acquired the
exclusive Lock, not the RLock) so history writes are performed only under the
write lock; update the logic around pipeline.Run / lock handling to move or
conditionalize the c.upsertHistory call accordingly.

if err != nil {
log.Error(err, "failed to run pipeline")
return err
Expand All @@ -182,7 +199,19 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision
return nil
}

// The base controller will delegate the pipeline creation down to this method.
// peekReadOnly determines whether a decision should use a read lock instead of
// the exclusive write lock. Defaults to false (exclusive) on any parse error.
func (c *FilterWeigherPipelineController) peekReadOnly(decision *v1alpha1.Decision) bool {
if decision.Spec.NovaRaw == nil {
return false
}
var request api.ExternalSchedulerRequest
if err := json.Unmarshal(decision.Spec.NovaRaw.Raw, &request); err != nil {
return false
}
return request.Options.ReadOnly
}

func (c *FilterWeigherPipelineController) InitPipeline(
ctx context.Context,
p v1alpha1.Pipeline,
Expand Down
Loading
Loading