-
Notifications
You must be signed in to change notification settings - Fork 6
feat: adding pipeline options #799
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
36b7d94
ad66112
4a7bc9e
d298e75
54639d1
c2d0fc2
4de31f9
c2f0b56
7a014b2
cc9b6e6
4886d37
3b8cf87
b6a6139
037f74c
811a1c3
91dfe3e
f84ba71
5c6ecae
2871d18
4ee2c6a
532200d
8b2ffa0
b535ec0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,8 +18,8 @@ import ( | |
| ) | ||
|
|
||
| type FilterWeigherPipeline[RequestType FilterWeigherPipelineRequest] interface { | ||
| // Run the scheduling pipeline with the given request. | ||
| Run(request RequestType) (v1alpha1.DecisionResult, error) | ||
| // Run the scheduling pipeline with the given request and call-time options. | ||
| Run(request RequestType, opts Options) (v1alpha1.DecisionResult, error) | ||
| } | ||
|
|
||
| // Pipeline of scheduler steps. | ||
|
|
@@ -138,14 +138,15 @@ func InitNewFilterWeigherPipeline[RequestType FilterWeigherPipelineRequest]( | |
| func (p *filterWeigherPipeline[RequestType]) runFilters( | ||
| log *slog.Logger, | ||
| request RequestType, | ||
| opts Options, | ||
| ) (filteredRequest RequestType, stepResults []v1alpha1.StepResult) { | ||
|
|
||
| filteredRequest = request | ||
| for _, filterName := range p.filtersOrder { | ||
| filter := p.filters[filterName] | ||
| stepLog := log.With("filter", filterName) | ||
| stepLog.Info("scheduler: running filter") | ||
| result, err := filter.Run(stepLog, filteredRequest) | ||
| result, err := filter.Run(stepLog, filteredRequest, opts) | ||
| if errors.Is(err, ErrStepSkipped) { | ||
| stepLog.Info("scheduler: filter skipped") | ||
| continue | ||
|
|
@@ -170,6 +171,7 @@ func (p *filterWeigherPipeline[RequestType]) runFilters( | |
| func (p *filterWeigherPipeline[RequestType]) runWeighers( | ||
| log *slog.Logger, | ||
| filteredRequest RequestType, | ||
| opts Options, | ||
| ) map[string]map[string]float64 { | ||
|
|
||
| activationsByStep := map[string]map[string]float64{} | ||
|
|
@@ -181,7 +183,7 @@ func (p *filterWeigherPipeline[RequestType]) runWeighers( | |
| wg.Go(func() { | ||
| stepLog := log.With("weigher", weigherName) | ||
| stepLog.Info("scheduler: running weigher") | ||
| result, err := weigher.Run(stepLog, filteredRequest) | ||
| result, err := weigher.Run(stepLog, filteredRequest, opts) | ||
| if errors.Is(err, ErrStepSkipped) { | ||
| stepLog.Info("scheduler: weigher skipped") | ||
| return | ||
|
|
@@ -262,7 +264,10 @@ func (s *filterWeigherPipeline[RequestType]) sortHostsByWeights(weights map[stri | |
| } | ||
|
|
||
| // Evaluate the pipeline and return a list of hosts in order of preference. | ||
| func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.DecisionResult, error) { | ||
| func (p *filterWeigherPipeline[RequestType]) Run(request RequestType, opts Options) (v1alpha1.DecisionResult, error) { | ||
| if err := opts.Validate(); err != nil { | ||
| return v1alpha1.DecisionResult{}, err | ||
| } | ||
| slogArgs := request.GetTraceLogArgs() | ||
| slogArgsAny := make([]any, 0, len(slogArgs)) | ||
| for _, arg := range slogArgs { | ||
|
|
@@ -279,7 +284,7 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1. | |
|
|
||
| // Run filters first to reduce the number of hosts. | ||
| // Any weights assigned to filtered out hosts are ignored. | ||
| filteredRequest, filterStepResults := p.runFilters(traceLog, request) | ||
| filteredRequest, filterStepResults := p.runFilters(traceLog, request, opts) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert this change. Scheduler steps can get the options from the provided request. |
||
| traceLog.Info( | ||
| "scheduler: finished filters", | ||
| "remainingHosts", filteredRequest.GetHosts(), | ||
|
|
@@ -290,13 +295,23 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1. | |
| for _, host := range filteredRequest.GetHosts() { | ||
| remainingWeights[host] = inWeights[host] | ||
| } | ||
| stepWeights := p.runWeighers(traceLog, filteredRequest) | ||
| stepWeights := p.runWeighers(traceLog, filteredRequest, opts) | ||
| outWeights := p.applyWeights(traceLog, stepWeights, remainingWeights) | ||
| traceLog.Info("scheduler: output weights", "weights", outWeights) | ||
|
|
||
| hosts := p.sortHostsByWeights(outWeights) | ||
| traceLog.Info("scheduler: sorted hosts", "hosts", hosts) | ||
|
|
||
| if opts.MaxCandidates > 0 && len(hosts) > opts.MaxCandidates { | ||
| hosts = hosts[:opts.MaxCandidates] | ||
| // Drop trimmed hosts from outWeights so AggregatedOutWeights stays consistent. | ||
| for host := range outWeights { | ||
| if !slices.Contains(hosts, host) { | ||
| delete(outWeights, host) | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please provide logging here so we see what's going on. |
||
| // Collect some metrics about the pipeline execution. | ||
| go p.monitor.observePipelineResult(request, hosts) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,47 @@ | ||||||||||||||||||||||||||
| // Copyright SAP SE | ||||||||||||||||||||||||||
| // SPDX-License-Identifier: Apache-2.0 | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| package lib | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||
| "errors" | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| "github.com/cobaltcore-dev/cortex/api/v1alpha1" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Options configure the behavior of a single pipeline run at call time. | ||||||||||||||||||||||||||
| // These are distinct from per-step YAML options (FilterWeigherPipelineStepOpts), | ||||||||||||||||||||||||||
| // which are static and set when the pipeline is initialized. | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // Consumed by steps: ReadOnly, LockReservations, AssumeEmptyHosts, IgnoredReservationTypes. | ||||||||||||||||||||||||||
| // Consumed by the controller after pipeline.Run(): RecordHistory, CreateInflight. | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These two code comment lines are likely to be obsolete once the controller or step logic changes. We should consider removing them. |
||||||||||||||||||||||||||
| type Options struct { | ||||||||||||||||||||||||||
| // ReadOnly means the pipeline could run without using the mutex, i.e. concurrent runs are ok. | ||||||||||||||||||||||||||
| ReadOnly bool | ||||||||||||||||||||||||||
| // LockReservations prevents reservation unlocking, e.g. in the capacity filter. | ||||||||||||||||||||||||||
| // Set when finding hosts for new reservations (failover, CR) to see true available capacity. | ||||||||||||||||||||||||||
| LockReservations bool | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be more generic such as
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I get this comment, probably better to discuss live |
||||||||||||||||||||||||||
| // AssumeEmptyHosts treats all hosts as having no running VMs. | ||||||||||||||||||||||||||
| AssumeEmptyHosts bool | ||||||||||||||||||||||||||
| // IgnoredReservationTypes lists reservation types the capacity filter skips entirely. | ||||||||||||||||||||||||||
| IgnoredReservationTypes []v1alpha1.ReservationType | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we make this a substruct such as |
||||||||||||||||||||||||||
| // MaxCandidates limits the number of hosts returned after weighing. 0 means no limit. | ||||||||||||||||||||||||||
| MaxCandidates int | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // RecordHistory records the placement decision in placement history. | ||||||||||||||||||||||||||
| // Replaces pipeline.Spec.CreateHistory once pipelines consolidate. | ||||||||||||||||||||||||||
| RecordHistory bool | ||||||||||||||||||||||||||
| // CreateInflight creates pessimistic blocking reservations for all returned candidates. | ||||||||||||||||||||||||||
| CreateInflight bool | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Validate checks for mutually exclusive or inconsistent option combinations. | ||||||||||||||||||||||||||
| func (o Options) Validate() error { | ||||||||||||||||||||||||||
| if o.ReadOnly && o.RecordHistory { | ||||||||||||||||||||||||||
| return errors.New("ReadOnly and RecordHistory are mutually exclusive: read-only runs must not mutate state") | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if o.ReadOnly && o.CreateInflight { | ||||||||||||||||||||||||||
| return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not mutate state") | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use lowercase error messages to satisfy linting At Line [41] and Line [44], error strings start with uppercase words. This can fail lint checks in this repo. 🔧 Suggested patch- return errors.New("ReadOnly and RecordHistory are mutually exclusive: read-only runs must not mutate state")
+ return errors.New("readonly and record history are mutually exclusive: read-only runs must not mutate state")
}
if o.ReadOnly && o.CreateInflight {
- return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not mutate state")
+ return errors.New("readonly and create inflight are mutually exclusive: read-only runs must not mutate state")
}As per coding guidelines: "Error messages should always be lowercase to conform to linting rules". 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -144,7 +144,7 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision | |
|
|
||
| // Execute the scheduling pipeline. | ||
| request := ironcore.MachinePipelineRequest{Pools: pools.Items} | ||
| result, err := pipeline.Run(request) | ||
| result, err := pipeline.Run(request, lib.Options{}) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Unlike the Nova controller — which calls
💡 Suggested approach+func (c *FilterWeigherPipelineController) buildOptions(pipelineConf v1alpha1.Pipeline) lib.Options {
+ return lib.Options{
+ RecordHistory: pipelineConf.Spec.CreateHistory,
+ }
+}
func (c *FilterWeigherPipelineController) process(ctx context.Context, decision *v1alpha1.Decision) error {
...
pipeline, ok := c.Pipelines[decision.Spec.PipelineRef.Name]
if !ok { ... }
+ pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name]
+ if !ok {
+ return errors.New("pipeline config not found")
+ }
...
- result, err := pipeline.Run(request, lib.Options{})
+ result, err := pipeline.Run(request, c.buildOptions(pipelineConf))🤖 Prompt for AI Agents |
||
| if err != nil { | ||
| log.V(1).Error(err, "failed to run scheduler pipeline") | ||
| return errors.New("failed to run scheduler pipeline") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
request RequestTypeis constrained bytype FilterWeigherPipelineRequest interfacewhich now carries the newGetOptions()method. So we should remove all additionalopts Optionsparameters.