Skip to content
This repository was archived by the owner on Jan 30, 2020. It is now read-only.

Implement a share-based scheduling heuristic #1583

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions engine/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ type Scheduler interface {

type leastLoadedScheduler struct{}

func computeAgentScore(a *agent.AgentState) float64 {
weightSum := 0

for _, u := range a.Units {
weightSum += u.Unit.Weight()
}

return float64(weightSum) / float64(a.MState.Shares())
}

func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decision, error) {
agents := lls.sortedAgents(clust)

Expand Down Expand Up @@ -62,7 +72,7 @@ func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decis
}

// sortedAgents returns a list of AgentState objects sorted ascending
// by the number of scheduled units
// by the computed score
func (lls *leastLoadedScheduler) sortedAgents(clust *clusterState) []*agent.AgentState {
agents := clust.agents()

Expand All @@ -81,7 +91,16 @@ func (sas sortableAgentStates) Len() int { return len(sas) }
func (sas sortableAgentStates) Swap(i, j int) { sas[i], sas[j] = sas[j], sas[i] }

func (sas sortableAgentStates) Less(i, j int) bool {
niUnits := len(sas[i].Units)
njUnits := len(sas[j].Units)
return niUnits < njUnits || (niUnits == njUnits && sas[i].MState.ID < sas[j].MState.ID)
niScore := computeAgentScore(sas[i])
njScore := computeAgentScore(sas[j])

if niScore != njScore {
return niScore < njScore
}

if sas[i].MState.Shares() != sas[j].MState.Shares() {
return sas[i].MState.Shares() > sas[j].MState.Shares()
}

return sas[i].MState.ID < sas[j].MState.ID
}
107 changes: 107 additions & 0 deletions engine/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package engine
import (
"reflect"
"sort"
"strconv"
"testing"

"github.com/coreos/fleet/agent"
"github.com/coreos/fleet/job"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/unit"
)

func TestSchedulerDecisions(t *testing.T) {
Expand Down Expand Up @@ -65,7 +67,22 @@ func TestSchedulerDecisions(t *testing.T) {
}
}

func buildUnitWithWeight(shares int) *job.Unit {
return &job.Unit{
Unit: unit.UnitFile{
Contents: map[string]map[string][]string{
"X-Fleet": map[string][]string{
"Weight": []string{strconv.Itoa(shares)},
},
},
},
}
}

func TestAgentStateSorting(t *testing.T) {
Unit6th := buildUnitWithWeight(10)
Unit7th := buildUnitWithWeight(20)

tests := []struct {
in []*agent.AgentState
out []*agent.AgentState
Expand Down Expand Up @@ -117,6 +134,96 @@ func TestAgentStateSorting(t *testing.T) {
},
},

// sort by weigth of jobs scheduled to agent
{
in: []*agent.AgentState{
&agent.AgentState{
MState: &machine.MachineState{ID: "A"},
Units: map[string]*job.Unit{
"1.service": &job.Unit{},
"2.service": &job.Unit{},
"3.service": &job.Unit{},
"4.service": &job.Unit{},
"5.service": &job.Unit{},
},
},
&agent.AgentState{
MState: &machine.MachineState{ID: "B"},
Units: map[string]*job.Unit{
"6.service": Unit6th,
"7.service": Unit7th,
},
},
},
out: []*agent.AgentState{
&agent.AgentState{
MState: &machine.MachineState{ID: "AAA"},
Units: map[string]*job.Unit{
"1.service": &job.Unit{},
"2.service": &job.Unit{},
"3.service": &job.Unit{},
"4.service": &job.Unit{},
"5.service": &job.Unit{},
},
},
&agent.AgentState{
MState: &machine.MachineState{ID: "BBB"},
Units: map[string]*job.Unit{
"6.service": Unit6th,
"7.service": Unit7th,
},
},
},
},

// fall back to sorting by shares when # jobs is equal
{
in: []*agent.AgentState{
&agent.AgentState{
MState: &machine.MachineState{
ID: "B",
Metadata: map[string]string{"shares": "2"},
},
Units: map[string]*job.Unit{
"1.service": &job.Unit{},
"2.service": &job.Unit{},
},
},
&agent.AgentState{
MState: &machine.MachineState{
ID: "A",
Metadata: map[string]string{"shares": "3"},
},
Units: map[string]*job.Unit{
"3.service": &job.Unit{},
"4.service": &job.Unit{},
},
},
},
out: []*agent.AgentState{
&agent.AgentState{
MState: &machine.MachineState{
ID: "A",
Metadata: map[string]string{"shares": "3"},
},
Units: map[string]*job.Unit{
"3.service": &job.Unit{},
"4.service": &job.Unit{},
},
},
&agent.AgentState{
MState: &machine.MachineState{
ID: "B",
Metadata: map[string]string{"shares": "2"},
},
Units: map[string]*job.Unit{
"1.service": &job.Unit{},
"2.service": &job.Unit{},
},
},
},
},

// fall back to sorting alphabetically by machine ID when # jobs is equal
{
in: []*agent.AgentState{
Expand Down
3 changes: 3 additions & 0 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
fleetMachineMetadata = "MachineMetadata"
// Require that the unit be scheduled on every machine in the cluster
fleetGlobal = "Global"
// Weight of the instance
fleetWeight = "Weight"

deprecatedXPrefix = "X-"
deprecatedXConditionPrefix = "X-Condition"
Expand All @@ -63,6 +65,7 @@ var validRequirements = pkg.NewUnsafeSet(
deprecatedXConditionPrefix+fleetMachineMetadata,
fleetMachineMetadata,
fleetGlobal,
fleetWeight,
)

func ParseJobState(s string) (JobState, error) {
Expand Down
16 changes: 16 additions & 0 deletions machine/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@

package machine

import "strconv"

const (
sharesMetadataName = "shares"

shortIDLen = 8

defaultShares = 1024
)

// MachineState represents a point-in-time snapshot of the
Expand All @@ -27,6 +33,16 @@ type MachineState struct {
Version string
}

func (ms MachineState) Shares() int {
if shares, ok := ms.Metadata[sharesMetadataName]; ok {
if v, err := strconv.Atoi(shares); err == nil {
return v
}
}

return defaultShares
}

func (ms MachineState) ShortID() string {
if len(ms.ID) <= shortIDLen {
return ms.ID
Expand Down
18 changes: 18 additions & 0 deletions unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ import (
"encoding/hex"
"fmt"
"io/ioutil"
"strconv"
"strings"

"github.com/coreos/go-systemd/unit"
)

const defaultWeight = 1

func NewUnitFile(raw string) (*UnitFile, error) {
reader := strings.NewReader(raw)
opts, err := unit.Deserialize(reader)
Expand Down Expand Up @@ -113,6 +116,21 @@ type UnitFile struct {
Options []*unit.UnitOption
}

// Weight returns the weight of the Job
func (u *UnitFile) Weight() int {
weightStrings := u.Contents["X-Fleet"]["Weight"]

if len(weightStrings) == 0 {
return defaultWeight
}

if w, err := strconv.Atoi(weightStrings[0]); err != nil {
return defaultWeight
} else {
return w
}
}

// Description returns the first Description option found in the [Unit] section.
// If the option is not defined, an empty string is returned.
func (u *UnitFile) Description() string {
Expand Down