diff --git a/engine/scheduler.go b/engine/scheduler.go index f35468285..92a3da60e 100644 --- a/engine/scheduler.go +++ b/engine/scheduler.go @@ -32,6 +32,16 @@ type Scheduler interface { type leastLoadedScheduler struct{} +func computeAgentScore(a *agent.AgentState) float64 { + weightSum := 0 + + for _, u := range a.Units { + weightSum += u.Unit.Weight() + } + + return float64(weightSum) / float64(a.MState.Shares()) +} + func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decision, error) { agents := lls.sortedAgents(clust) @@ -62,7 +72,7 @@ func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decis } // sortedAgents returns a list of AgentState objects sorted ascending -// by the number of scheduled units +// by the computed score func (lls *leastLoadedScheduler) sortedAgents(clust *clusterState) []*agent.AgentState { agents := clust.agents() @@ -81,7 +91,16 @@ func (sas sortableAgentStates) Len() int { return len(sas) } func (sas sortableAgentStates) Swap(i, j int) { sas[i], sas[j] = sas[j], sas[i] } func (sas sortableAgentStates) Less(i, j int) bool { - niUnits := len(sas[i].Units) - njUnits := len(sas[j].Units) - return niUnits < njUnits || (niUnits == njUnits && sas[i].MState.ID < sas[j].MState.ID) + niScore := computeAgentScore(sas[i]) + njScore := computeAgentScore(sas[j]) + + if niScore != njScore { + return niScore < njScore + } + + if sas[i].MState.Shares() != sas[j].MState.Shares() { + return sas[i].MState.Shares() > sas[j].MState.Shares() + } + + return sas[i].MState.ID < sas[j].MState.ID } diff --git a/engine/scheduler_test.go b/engine/scheduler_test.go index 92f7e9f83..97457d42b 100644 --- a/engine/scheduler_test.go +++ b/engine/scheduler_test.go @@ -17,11 +17,13 @@ package engine import ( "reflect" "sort" + "strconv" "testing" "github.com/coreos/fleet/agent" "github.com/coreos/fleet/job" "github.com/coreos/fleet/machine" + "github.com/coreos/fleet/unit" ) func TestSchedulerDecisions(t *testing.T) { @@ -65,7 +67,22 @@ func TestSchedulerDecisions(t *testing.T) { } } +func buildUnitWithWeight(shares int) *job.Unit { + return &job.Unit{ + Unit: unit.UnitFile{ + Contents: map[string]map[string][]string{ + "X-Fleet": map[string][]string{ + "Weight": []string{strconv.Itoa(shares)}, + }, + }, + }, + } +} + func TestAgentStateSorting(t *testing.T) { + Unit6th := buildUnitWithWeight(10) + Unit7th := buildUnitWithWeight(20) + tests := []struct { in []*agent.AgentState out []*agent.AgentState @@ -117,6 +134,96 @@ func TestAgentStateSorting(t *testing.T) { }, }, + // sort by weigth of jobs scheduled to agent + { + in: []*agent.AgentState{ + &agent.AgentState{ + MState: &machine.MachineState{ID: "A"}, + Units: map[string]*job.Unit{ + "1.service": &job.Unit{}, + "2.service": &job.Unit{}, + "3.service": &job.Unit{}, + "4.service": &job.Unit{}, + "5.service": &job.Unit{}, + }, + }, + &agent.AgentState{ + MState: &machine.MachineState{ID: "B"}, + Units: map[string]*job.Unit{ + "6.service": Unit6th, + "7.service": Unit7th, + }, + }, + }, + out: []*agent.AgentState{ + &agent.AgentState{ + MState: &machine.MachineState{ID: "AAA"}, + Units: map[string]*job.Unit{ + "1.service": &job.Unit{}, + "2.service": &job.Unit{}, + "3.service": &job.Unit{}, + "4.service": &job.Unit{}, + "5.service": &job.Unit{}, + }, + }, + &agent.AgentState{ + MState: &machine.MachineState{ID: "BBB"}, + Units: map[string]*job.Unit{ + "6.service": Unit6th, + "7.service": Unit7th, + }, + }, + }, + }, + + // fall back to sorting by shares when # jobs is equal + { + in: []*agent.AgentState{ + &agent.AgentState{ + MState: &machine.MachineState{ + ID: "B", + Metadata: map[string]string{"shares": "2"}, + }, + Units: map[string]*job.Unit{ + "1.service": &job.Unit{}, + "2.service": &job.Unit{}, + }, + }, + &agent.AgentState{ + MState: &machine.MachineState{ + ID: "A", + Metadata: map[string]string{"shares": "3"}, + }, + Units: map[string]*job.Unit{ + "3.service": &job.Unit{}, + "4.service": &job.Unit{}, + }, + }, + }, + out: []*agent.AgentState{ + &agent.AgentState{ + MState: &machine.MachineState{ + ID: "A", + Metadata: map[string]string{"shares": "3"}, + }, + Units: map[string]*job.Unit{ + "3.service": &job.Unit{}, + "4.service": &job.Unit{}, + }, + }, + &agent.AgentState{ + MState: &machine.MachineState{ + ID: "B", + Metadata: map[string]string{"shares": "2"}, + }, + Units: map[string]*job.Unit{ + "1.service": &job.Unit{}, + "2.service": &job.Unit{}, + }, + }, + }, + }, + // fall back to sorting alphabetically by machine ID when # jobs is equal { in: []*agent.AgentState{ diff --git a/job/job.go b/job/job.go index 82809a256..d7e6d99f6 100644 --- a/job/job.go +++ b/job/job.go @@ -46,6 +46,8 @@ const ( fleetMachineMetadata = "MachineMetadata" // Require that the unit be scheduled on every machine in the cluster fleetGlobal = "Global" + // Weight of the instance + fleetWeight = "Weight" deprecatedXPrefix = "X-" deprecatedXConditionPrefix = "X-Condition" @@ -63,6 +65,7 @@ var validRequirements = pkg.NewUnsafeSet( deprecatedXConditionPrefix+fleetMachineMetadata, fleetMachineMetadata, fleetGlobal, + fleetWeight, ) func ParseJobState(s string) (JobState, error) { diff --git a/machine/state.go b/machine/state.go index 821f07524..8ae7bda3d 100644 --- a/machine/state.go +++ b/machine/state.go @@ -14,8 +14,14 @@ package machine +import "strconv" + const ( + sharesMetadataName = "shares" + shortIDLen = 8 + + defaultShares = 1024 ) // MachineState represents a point-in-time snapshot of the @@ -27,6 +33,16 @@ type MachineState struct { Version string } +func (ms MachineState) Shares() int { + if shares, ok := ms.Metadata[sharesMetadataName]; ok { + if v, err := strconv.Atoi(shares); err == nil { + return v + } + } + + return defaultShares +} + func (ms MachineState) ShortID() string { if len(ms.ID) <= shortIDLen { return ms.ID diff --git a/unit/unit.go b/unit/unit.go index 5a9c82d3a..706b39203 100644 --- a/unit/unit.go +++ b/unit/unit.go @@ -20,11 +20,14 @@ import ( "encoding/hex" "fmt" "io/ioutil" + "strconv" "strings" "github.com/coreos/go-systemd/unit" ) +const defaultWeight = 1 + func NewUnitFile(raw string) (*UnitFile, error) { reader := strings.NewReader(raw) opts, err := unit.Deserialize(reader) @@ -113,6 +116,21 @@ type UnitFile struct { Options []*unit.UnitOption } +// Weight returns the weight of the Job +func (u *UnitFile) Weight() int { + weightStrings := u.Contents["X-Fleet"]["Weight"] + + if len(weightStrings) == 0 { + return defaultWeight + } + + if w, err := strconv.Atoi(weightStrings[0]); err != nil { + return defaultWeight + } else { + return w + } +} + // Description returns the first Description option found in the [Unit] section. // If the option is not defined, an empty string is returned. func (u *UnitFile) Description() string {