Skip to content
101 changes: 79 additions & 22 deletions pkg/client/compose/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ import (
"github.com/psviderski/uncloud/pkg/client/deploy"
"github.com/psviderski/uncloud/pkg/client/deploy/operation"
"github.com/psviderski/uncloud/pkg/client/deploy/scheduler"
"golang.org/x/sync/errgroup"
)

// Client is the API surface a compose Deployment requires: the DNS client operations, the
// operations needed by the deploy package, and the ability to list services.
type Client interface {
api.DNSClient
deploy.Client
// ListServices returns the services visible to the client. In this file the result is indexed by
// service name when the planning state is loaded.
ListServices(ctx context.Context) ([]api.Service, error)
}

// Deployment holds the inputs and the pre-loaded state used to plan the deployment of the services
// of a compose project.
type Deployment struct {
// Client is the API client used for the deployment.
Client Client
// Project is the compose project being deployed.
Project *types.Project
// SpecResolver is passed to the per-service planning together with the service spec.
SpecResolver *deploy.ServiceSpecResolver
// Strategy is passed to the per-service planning.
Strategy deploy.Strategy
// NOTE(review): both state and planning are present in this view, and planning.clusterState has
// the same type as state. Confirm whether state is still required.
state *scheduler.ClusterState
planning *planningState
// plan, when non-nil, is returned by Plan as is instead of being computed again.
plan *Plan
}

Expand All @@ -36,29 +37,82 @@ func NewDeployment(ctx context.Context, cli Client, project *types.Project) (*De
}

func NewDeploymentWithStrategy(ctx context.Context, cli Client, project *types.Project, strategy deploy.Strategy) (*Deployment, error) {
state, err := scheduler.InspectClusterState(ctx, cli)
planning, err := loadPlanningState(ctx, cli)
if err != nil {
return nil, fmt.Errorf("inspect cluster state: %w", err)
}

domain, err := cli.GetDomain(ctx)
if err != nil && !errors.Is(err, api.ErrNotFound) {
return nil, fmt.Errorf("get cluster domain: %w", err)
}
resolver := &deploy.ServiceSpecResolver{
// If the domain is not found (not reserved), an empty domain is used for the resolver.
ClusterDomain: domain,
return nil, fmt.Errorf("load planning state: %w", err)
}

return &Deployment{
Client: cli,
Project: project,
SpecResolver: resolver,
SpecResolver: planning.resolver,
Strategy: strategy,
state: state,
planning: planning,
}, nil
}

// planningState bundles the data fetched once by loadPlanningState and reused while planning
// every service of the project.
type planningState struct {
// clusterState is the result of scheduler.InspectClusterState.
clusterState *scheduler.ClusterState
// resolver is built with the cluster domain returned by the client's GetDomain.
resolver *deploy.ServiceSpecResolver
// services indexes the listed services by name. A name may map to more than one service.
services map[string][]api.Service
}

// loadPlanningState collects the data required to plan a compose deployment by running three
// requests concurrently: the cluster state inspection, the service listing (indexed by service
// name), and the cluster domain lookup used to build the spec resolver.
//
// It returns the error reported by the errgroup if any request fails, otherwise the assembled
// planning state.
func loadPlanningState(ctx context.Context, cli Client) (*planningState, error) {
	var (
		cluster  *scheduler.ClusterState
		byName   map[string][]api.Service
		resolver *deploy.ServiceSpecResolver
	)
	group, groupCtx := errgroup.WithContext(ctx)

	// Every task assigns only its own variable; the values are read after Wait returns.
	group.Go(func() error {
		inspected, err := scheduler.InspectClusterState(groupCtx, cli)
		if err != nil {
			return fmt.Errorf("inspect cluster state: %w", err)
		}
		cluster = inspected
		return nil
	})

	group.Go(func() error {
		listed, err := cli.ListServices(groupCtx)
		if err != nil {
			return fmt.Errorf("list services: %w", err)
		}

		// Group the services by name so duplicates can be detected at lookup time.
		index := make(map[string][]api.Service, len(listed))
		for _, svc := range listed {
			index[svc.Name] = append(index[svc.Name], svc)
		}
		byName = index
		return nil
	})

	group.Go(func() error {
		domain, err := cli.GetDomain(groupCtx)
		if err != nil && !errors.Is(err, api.ErrNotFound) {
			return fmt.Errorf("get domain: %w", err)
		}
		// A not-found error (no domain reserved) is tolerated: the resolver is built with whatever
		// domain was returned, which is expected to be empty in that case.
		resolver = &deploy.ServiceSpecResolver{ClusterDomain: domain}
		return nil
	})

	if err := group.Wait(); err != nil {
		return nil, err
	}

	return &planningState{
		clusterState: cluster,
		resolver:     resolver,
		services:     byName,
	}, nil
}

// currentService looks up the already listed service with the given name.
//
// It returns (nil, nil) when no service has that name, a pointer to the single match when exactly
// one exists, and an error when the name is shared by several services.
func (s *planningState) currentService(name string) (*api.Service, error) {
	candidates := s.services[name]
	if len(candidates) == 0 {
		return nil, nil
	}
	if len(candidates) > 1 {
		return nil, fmt.Errorf("multiple services found with name '%s', use the service ID instead", name)
	}
	// The pointer refers to the element stored in the planning state, not to a copy.
	return &candidates[0], nil
}

func (d *Deployment) Plan(ctx context.Context) (Plan, error) {
if d.plan != nil {
return *d.plan, nil
Expand All @@ -85,7 +139,7 @@ func (d *Deployment) Plan(ctx context.Context) (Plan, error) {
}

// Check external volumes and plan the creation of missing volumes before deploying services.
// Updates the cluster state (d.state) with the scheduled volumes.
// Updates the planning cluster state with the scheduled volumes.
volumeOps, err := d.planVolumes(serviceSpecs)
if err != nil {
return plan, err
Expand All @@ -94,9 +148,12 @@ func (d *Deployment) Plan(ctx context.Context) (Plan, error) {

for _, spec := range serviceSpecs {
// TODO: properly handle depends_on conditions in the service deployment plan as the first operation.
// Pass the updated cluster state with the scheduled volumes to the deployment.
deployment := deploy.NewDeploymentWithClusterState(d.Client, spec, d.Strategy, d.state)
servicePlan, err := deployment.Plan(ctx)
currentService, err := d.planning.currentService(spec.Name)
if err != nil {
return plan, fmt.Errorf("find current service '%s': %w", spec.Name, err)
}

servicePlan, err := deploy.PlanService(d.planning.clusterState, currentService, spec, d.SpecResolver, d.Strategy)
if err != nil {
return plan, fmt.Errorf("create deployment plan for service '%s': %w", spec.Name, err)
}
Expand Down Expand Up @@ -134,7 +191,7 @@ func (d *Deployment) planVolumes(serviceSpecs []api.ServiceSpec) ([]*operation.C

// TODO: The scheduler should ideally work with the resolved service specs to correctly identify eligible machines.
// Figure out where the best place to resolve the specs is.
volumeScheduler, err := scheduler.NewVolumeScheduler(d.state, serviceSpecs)
volumeScheduler, err := scheduler.NewVolumeScheduler(d.planning.clusterState, serviceSpecs)
if err != nil {
return nil, fmt.Errorf("init volume scheduler: %w", err)
}
Expand All @@ -148,7 +205,7 @@ func (d *Deployment) planVolumes(serviceSpecs []api.ServiceSpec) ([]*operation.C
for machineID, volumes := range scheduledVolumes {
for _, v := range volumes {
machineName := machineID
if m, ok := d.state.Machine(machineID); ok {
if m, ok := d.planning.clusterState.Machine(machineID); ok {
machineName = m.Info.Name
}

Expand All @@ -174,7 +231,7 @@ func (d *Deployment) checkExternalVolumesExist() error {

var notFound []string
for _, name := range externalNames {
if !slices.ContainsFunc(d.state.Machines, func(m *scheduler.Machine) bool {
if !slices.ContainsFunc(d.planning.clusterState.Machines, func(m *scheduler.Machine) bool {
return slices.ContainsFunc(m.Volumes, func(vol volume.Volume) bool {
return vol.Name == name
})
Expand Down
165 changes: 165 additions & 0 deletions pkg/client/compose/deploy_planning_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package compose

import (
"context"
"fmt"
"testing"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/volume"
"github.com/psviderski/uncloud/internal/machine/api/pb"
"github.com/psviderski/uncloud/pkg/api"
"github.com/psviderski/uncloud/pkg/client/deploy"
"github.com/psviderski/uncloud/pkg/client/deploy/scheduler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestDeploymentPlanUsesPlanningStateForCurrentServices verifies that planning a 20-service project
// lists services and fetches the domain exactly once, never inspects services individually, and
// hands each strategy call the matching existing service (or nil when none exists).
func TestDeploymentPlanUsesPlanningStateForCurrentServices(t *testing.T) {
	const serviceCount = 20
	ctx := context.Background()

	project, err := LoadProjectFromContent(ctx, composeYAML(serviceCount))
	require.NoError(t, err)

	recorder := &recordingStrategy{}
	client := &composePlanningClient{
		domain: "example.uncld.dev",
		services: []api.Service{
			{ID: "svc-01", Name: "svc01", Mode: api.ServiceModeReplicated},
			{ID: "svc-10", Name: "svc10", Mode: api.ServiceModeReplicated},
		},
	}

	deployment, err := NewDeploymentWithStrategy(ctx, client, project, recorder)
	require.NoError(t, err)
	_, err = deployment.Plan(ctx)
	require.NoError(t, err)

	assert.Equal(t, 1, client.listServicesCalls)
	assert.Equal(t, 1, client.getDomainCalls)
	assert.Equal(t, 0, client.inspectServiceCalls)
	require.Len(t, recorder.calls, serviceCount)

	// Index the service each strategy call received by the name of the spec it was planned for.
	current := make(map[string]*api.Service)
	for _, call := range recorder.calls {
		current[call.spec.Name] = call.svc
	}

	existing := []struct {
		name   string
		wantID string
	}{
		{name: "svc01", wantID: "svc-01"},
		{name: "svc10", wantID: "svc-10"},
	}
	for _, want := range existing {
		require.NotNil(t, current[want.name])
		assert.Equal(t, want.wantID, current[want.name].ID)
	}
	// A service that does not exist yet must be planned without a current service.
	assert.Nil(t, current["svc02"])
}

// TestDeploymentPlanErrorsOnDuplicateCurrentServiceNames verifies that planning fails, without
// inspecting individual services, when two listed services share the name of a compose service.
func TestDeploymentPlanErrorsOnDuplicateCurrentServiceNames(t *testing.T) {
	ctx := context.Background()

	project, err := LoadProjectFromContent(ctx, composeYAML(1))
	require.NoError(t, err)

	// Two distinct services carry the name of the only service in the project.
	duplicates := []api.Service{
		{ID: "svc-a", Name: "svc01"},
		{ID: "svc-b", Name: "svc01"},
	}
	client := &composePlanningClient{services: duplicates}

	deployment, err := NewDeploymentWithStrategy(ctx, client, project, &recordingStrategy{})
	require.NoError(t, err)

	_, planErr := deployment.Plan(ctx)
	require.Error(t, planErr)
	assert.Contains(t, planErr.Error(), "multiple services found with name 'svc01'")
	assert.Equal(t, 0, client.inspectServiceCalls)
}

// composeYAML renders a compose file that declares count services named svc01, svc02, ... where
// service number i uses the image nginx:i. With count <= 0 only the "services:" header is returned.
//
// Each service key is nested under "services" and its "image" key is nested one level deeper, so
// the image is parsed as a property of the service rather than as a sibling key.
func composeYAML(count int) string {
	out := []byte("services:\n")
	for i := 1; i <= count; i++ {
		// Append in place instead of re-allocating the whole string on every iteration.
		out = fmt.Appendf(out, "  svc%02d:\n    image: nginx:%d\n", i, i)
	}
	return string(out)
}

// recordingStrategy is a test strategy that remembers every Plan invocation.
type recordingStrategy struct {
// calls holds one entry per Plan invocation, in call order.
calls []recordedPlanCall
}

// recordedPlanCall captures the arguments of a single recordingStrategy.Plan invocation.
type recordedPlanCall struct {
// svc is the current service passed to Plan; nil when Plan was called without one.
svc *api.Service
// spec is the service spec passed to Plan.
spec api.ServiceSpec
}

// Type returns the fixed identifier of the recording strategy.
func (s *recordingStrategy) Type() string {
	const strategyType = "recording"
	return strategyType
}

// Plan records the current service and spec it was called with and returns a minimal service plan.
// The plan's ServiceID is the ID of svc when a current service is given, otherwise the spec name.
// The cluster state argument is ignored and the returned error is always nil.
func (s *recordingStrategy) Plan(
	_ *scheduler.ClusterState, svc *api.Service, spec api.ServiceSpec,
) (deploy.ServicePlan, error) {
	s.calls = append(s.calls, recordedPlanCall{svc: svc, spec: spec})

	plan := deploy.ServicePlan{
		ServiceID:   spec.Name,
		ServiceName: spec.Name,
		Spec:        spec,
	}
	if svc != nil {
		plan.ServiceID = svc.ID
	}
	return plan, nil
}

// composePlanningClient is a fake client for the planning tests. It serves canned services and a
// canned domain, and counts how often selected methods are called.
type composePlanningClient struct {
// api.Client is embedded so the fake has the full client method set. The tests in this file never
// set it, so calling a method that the fake does not override would dereference a nil interface.
api.Client

// services is returned as is by ListServices.
services []api.Service
// domain is returned as is by GetDomain.
domain string

// Call counters incremented by ListServices, GetDomain and InspectService respectively.
listServicesCalls int
getDomainCalls int
inspectServiceCalls int
}

// ListServices counts the call and returns the canned services.
func (c *composePlanningClient) ListServices(_ context.Context) ([]api.Service, error) {
	c.listServicesCalls += 1
	return c.services, nil
}

// ListMachines reports a single machine, "machine-1", in the UP state regardless of the filter.
func (c *composePlanningClient) ListMachines(_ context.Context, _ *api.MachineFilter) (api.MachineMembersList, error) {
	const machine = "machine-1"

	info := &pb.MachineInfo{Id: machine, Name: machine}
	members := api.MachineMembersList{
		{Machine: info, State: pb.MachineMember_UP},
	}
	return members, nil
}

// ListVolumes reports that no volumes exist, regardless of the filter.
func (c *composePlanningClient) ListVolumes(_ context.Context, _ *api.VolumeFilter) ([]api.MachineVolume, error) {
	var none []api.MachineVolume
	return none, nil
}

// GetDomain counts the call and returns the canned domain.
func (c *composePlanningClient) GetDomain(_ context.Context) (string, error) {
	c.getDomainCalls += 1
	return c.domain, nil
}

// InspectService counts the call and always reports that the service was not found. The tests use
// the counter to assert that planning never inspects services one by one.
func (c *composePlanningClient) InspectService(_ context.Context, _ string) (api.Service, error) {
	c.inspectServiceCalls += 1
	return api.Service{}, api.ErrNotFound
}

// CreateVolume is a no-op stub that returns an empty machine volume and no error.
func (c *composePlanningClient) CreateVolume(
	_ context.Context, _ string, _ volume.CreateOptions,
) (api.MachineVolume, error) {
	return api.MachineVolume{}, nil
}

// RemoveVolume is a no-op stub that always succeeds.
func (c *composePlanningClient) RemoveVolume(_ context.Context, _ string, _ string, _ bool) error {
	return nil
}

// StopService is a no-op stub that always succeeds.
func (c *composePlanningClient) StopService(_ context.Context, _ string, _ container.StopOptions) error {
	return nil
}
Loading
Loading