Skip to content

Commit d92f02b

Browse files
authored
Merge pull request #84 from cezarsa/uncordon
Add support for uncordoning nodes
2 parents 46799e2 + 0f4b2e1 commit d92f02b

File tree

7 files changed

+616
-218
lines changed

7 files changed

+616
-218
lines changed

cmd/draino/draino.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ func main() {
9090
Aggregation: view.Count(),
9191
TagKeys: []tag.Key{kubernetes.TagResult},
9292
}
93+
nodesUncordoned = &view.View{
94+
Name: "uncordoned_nodes_total",
95+
Measure: kubernetes.MeasureNodesUncordoned,
96+
Description: "Number of nodes uncordoned.",
97+
Aggregation: view.Count(),
98+
TagKeys: []tag.Key{kubernetes.TagResult},
99+
}
93100
nodesDrained = &view.View{
94101
Name: "drained_nodes_total",
95102
Measure: kubernetes.MeasureNodesDrained,
@@ -106,7 +113,7 @@ func main() {
106113
}
107114
)
108115

109-
kingpin.FatalIfError(view.Register(nodesCordoned, nodesDrained, nodesDrainScheduled), "cannot create metrics")
116+
kingpin.FatalIfError(view.Register(nodesCordoned, nodesUncordoned, nodesDrained, nodesDrainScheduled), "cannot create metrics")
110117
p, err := prometheus.NewExporter(prometheus.Options{Namespace: kubernetes.Component})
111118
kingpin.FatalIfError(err, "cannot export metrics")
112119
view.RegisterExporter(p)
@@ -161,7 +168,8 @@ func main() {
161168
),
162169
kubernetes.NewEventRecorder(cs),
163170
kubernetes.WithLogger(log),
164-
kubernetes.WithDrainBuffer(*drainBuffer))
171+
kubernetes.WithDrainBuffer(*drainBuffer),
172+
kubernetes.WithConditionsFilter(*conditions))
165173

166174
if *dryRun {
167175
h = cache.FilteringResourceEventHandler{
@@ -170,12 +178,11 @@ func main() {
170178
&kubernetes.NoopCordonDrainer{},
171179
kubernetes.NewEventRecorder(cs),
172180
kubernetes.WithLogger(log),
173-
kubernetes.WithDrainBuffer(*drainBuffer)),
181+
kubernetes.WithDrainBuffer(*drainBuffer),
182+
kubernetes.WithConditionsFilter(*conditions)),
174183
}
175184
}
176185

177-
cf := cache.FilteringResourceEventHandler{FilterFunc: kubernetes.NewNodeConditionFilter(*conditions), Handler: h}
178-
179186
if len(*nodeLabels) > 0 {
180187
log.Debug("node labels", zap.Any("labels", nodeLabels))
181188
if *nodeLabelsExpr != "" {
@@ -193,7 +200,7 @@ func main() {
193200
log.Sugar().Fatalf("Failed to parse node label expression: %v", err)
194201
}
195202

196-
nodeLabelFilter = cache.FilteringResourceEventHandler{FilterFunc: nodeLabelFilterFunc, Handler: cf}
203+
nodeLabelFilter = cache.FilteringResourceEventHandler{FilterFunc: nodeLabelFilterFunc, Handler: h}
197204

198205
nodes := kubernetes.NewNodeWatch(cs, nodeLabelFilter)
199206

internal/kubernetes/drainer.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ const (
4343
DefaultSkipDrain = false
4444
)
4545

46+
// nodeMutatorFn modifies the supplied node in place. Cordon and Uncordon call
// each mutator they are given on the freshly fetched node, after toggling
// Spec.Unschedulable and before sending the update, so extra changes can be
// included in the same update call.
type nodeMutatorFn func(*core.Node)
47+
4648
type errTimeout struct{}
4749

4850
func (e errTimeout) Error() string {
@@ -63,7 +65,10 @@ func IsTimeout(err error) bool {
6365
// A Cordoner cordons nodes.
6466
type Cordoner interface {
6567
// Cordon the supplied node. Marks it unschedulable for new pods.
66-
Cordon(n *core.Node) error
68+
Cordon(n *core.Node, mutators ...nodeMutatorFn) error
69+
70+
// Uncordon the supplied node. Marks it schedulable for new pods.
71+
Uncordon(n *core.Node, mutators ...nodeMutatorFn) error
6772
}
6873

6974
// A Drainer drains nodes.
@@ -83,7 +88,10 @@ type CordonDrainer interface {
8388
type NoopCordonDrainer struct{}
8489

8590
// Cordon does nothing.
86-
func (d *NoopCordonDrainer) Cordon(n *core.Node) error { return nil }
91+
func (d *NoopCordonDrainer) Cordon(n *core.Node, mutators ...nodeMutatorFn) error { return nil }
92+
93+
// Uncordon does nothing.
94+
func (d *NoopCordonDrainer) Uncordon(n *core.Node, mutators ...nodeMutatorFn) error { return nil }
8795

8896
// Drain does nothing.
8997
func (d *NoopCordonDrainer) Drain(n *core.Node) error { return nil }
@@ -177,7 +185,7 @@ func (d *APICordonDrainer) deleteTimeout() time.Duration {
177185
}
178186

179187
// Cordon the supplied node. Marks it unschedulable for new pods.
180-
func (d *APICordonDrainer) Cordon(n *core.Node) error {
188+
func (d *APICordonDrainer) Cordon(n *core.Node, mutators ...nodeMutatorFn) error {
181189
fresh, err := d.c.CoreV1().Nodes().Get(n.GetName(), meta.GetOptions{})
182190
if err != nil {
183191
return errors.Wrapf(err, "cannot get node %s", n.GetName())
@@ -186,12 +194,34 @@ func (d *APICordonDrainer) Cordon(n *core.Node) error {
186194
return nil
187195
}
188196
fresh.Spec.Unschedulable = true
197+
for _, m := range mutators {
198+
m(fresh)
199+
}
189200
if _, err := d.c.CoreV1().Nodes().Update(fresh); err != nil {
190201
return errors.Wrapf(err, "cannot cordon node %s", fresh.GetName())
191202
}
192203
return nil
193204
}
194205

206+
// Uncordon the supplied node. Marks it schedulable for new pods.
//
// Only the name of n is used: the node is fetched again through the client and
// the fetched copy is the one that gets modified and updated. If the fetched
// node is already schedulable, Uncordon returns nil without sending an update
// and without invoking any of the mutators. Otherwise Spec.Unschedulable is
// cleared, each mutator is applied in order, and the node is updated.
//
// Errors from the get and update calls are returned wrapped with the node name.
207+
func (d *APICordonDrainer) Uncordon(n *core.Node, mutators ...nodeMutatorFn) error {
208+
// Work on a freshly fetched copy rather than the caller's object.
fresh, err := d.c.CoreV1().Nodes().Get(n.GetName(), meta.GetOptions{})
209+
if err != nil {
210+
return errors.Wrapf(err, "cannot get node %s", n.GetName())
211+
}
212+
// Already schedulable: nothing to do, and the mutators are deliberately skipped.
if !fresh.Spec.Unschedulable {
213+
return nil
214+
}
215+
fresh.Spec.Unschedulable = false
216+
// Apply caller-supplied changes so they are sent in the same update.
for _, m := range mutators {
217+
m(fresh)
218+
}
219+
if _, err := d.c.CoreV1().Nodes().Update(fresh); err != nil {
220+
return errors.Wrapf(err, "cannot uncordon node %s", fresh.GetName())
221+
}
222+
return nil
223+
}
224+
195225
// MarkDrain sets a condition on the node to mark that a drain is scheduled.
196226
func (d *APICordonDrainer) MarkDrain(n *core.Node, when, finish time.Time, failed bool) error {
197227
nodeName := n.Name

internal/kubernetes/drainer_test.go

Lines changed: 139 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ and limitations under the License.
1717
package kubernetes
1818

1919
import (
20+
"reflect"
2021
"testing"
2122
"time"
2223

@@ -79,84 +80,181 @@ func TestCordon(t *testing.T) {
7980
cases := []struct {
8081
name string
8182
node *core.Node
83+
mutators []nodeMutatorFn
84+
expected *core.Node
8285
reactions []reactor
8386
}{
8487
{
8588
name: "CordonSchedulableNode",
8689
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
87-
reactions: []reactor{
88-
reactor{
89-
verb: "get",
90-
resource: "nodes",
91-
ret: &core.Node{
92-
ObjectMeta: meta.ObjectMeta{Name: nodeName},
93-
Spec: core.NodeSpec{Unschedulable: false},
94-
},
95-
},
96-
reactor{
97-
verb: "update",
98-
resource: "nodes",
99-
ret: &core.Node{
100-
ObjectMeta: meta.ObjectMeta{Name: nodeName},
101-
Spec: core.NodeSpec{Unschedulable: true},
102-
},
103-
},
90+
expected: &core.Node{
91+
ObjectMeta: meta.ObjectMeta{Name: nodeName},
92+
Spec: core.NodeSpec{Unschedulable: true},
10493
},
10594
},
10695
{
10796
name: "CordonUnschedulableNode",
108-
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
109-
reactions: []reactor{
110-
reactor{
111-
verb: "get",
112-
resource: "nodes",
113-
ret: &core.Node{
114-
ObjectMeta: meta.ObjectMeta{Name: nodeName},
115-
Spec: core.NodeSpec{Unschedulable: true},
116-
},
117-
},
97+
node: &core.Node{
98+
ObjectMeta: meta.ObjectMeta{Name: nodeName},
99+
Spec: core.NodeSpec{Unschedulable: true},
100+
},
101+
expected: &core.Node{
102+
ObjectMeta: meta.ObjectMeta{Name: nodeName},
103+
Spec: core.NodeSpec{Unschedulable: true},
118104
},
119105
},
120106
{
121107
name: "CordonNonExistentNode",
122108
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
123109
reactions: []reactor{
124-
reactor{verb: "get", resource: "nodes", err: errors.New("nope")},
110+
{verb: "get", resource: "nodes", err: errors.New("nope")},
125111
},
126112
},
127113
{
128114
name: "ErrorCordoningSchedulableNode",
129115
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
130116
reactions: []reactor{
131-
reactor{
132-
verb: "get",
133-
resource: "nodes",
134-
ret: &core.Node{
135-
ObjectMeta: meta.ObjectMeta{Name: nodeName},
136-
Spec: core.NodeSpec{Unschedulable: false},
137-
},
138-
},
139-
reactor{verb: "update", resource: "nodes", err: errors.New("nope")},
117+
{verb: "update", resource: "nodes", err: errors.New("nope")},
118+
},
119+
},
120+
{
121+
name: "CordonSchedulableNodeWithMutator",
122+
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
123+
mutators: []nodeMutatorFn{func(n *core.Node) {
124+
n.Annotations = map[string]string{"foo": "1"}
125+
}},
126+
expected: &core.Node{
127+
ObjectMeta: meta.ObjectMeta{Name: nodeName, Annotations: map[string]string{"foo": "1"}},
128+
Spec: core.NodeSpec{Unschedulable: true},
129+
},
130+
},
131+
{
132+
name: "CordonUnschedulableNodeWithMutator",
133+
node: &core.Node{
134+
ObjectMeta: meta.ObjectMeta{Name: nodeName},
135+
Spec: core.NodeSpec{Unschedulable: true},
136+
},
137+
mutators: []nodeMutatorFn{func(n *core.Node) {
138+
n.Annotations = map[string]string{"foo": "1"}
139+
}},
140+
expected: &core.Node{
141+
ObjectMeta: meta.ObjectMeta{Name: nodeName},
142+
Spec: core.NodeSpec{Unschedulable: true},
140143
},
141144
},
142145
}
143146

144147
for _, tc := range cases {
145148
t.Run(tc.name, func(t *testing.T) {
146-
c := &fake.Clientset{}
149+
c := fake.NewSimpleClientset(tc.node)
147150
for _, r := range tc.reactions {
148-
c.AddReactor(r.verb, r.resource, r.Fn())
151+
c.PrependReactor(r.verb, r.resource, r.Fn())
149152
}
150-
151153
d := NewAPICordonDrainer(c)
152-
if err := d.Cordon(tc.node); err != nil {
154+
if err := d.Cordon(tc.node, tc.mutators...); err != nil {
153155
for _, r := range tc.reactions {
154156
if errors.Cause(err) == r.err {
155157
return
156158
}
157159
}
158160
t.Errorf("d.Cordon(%v): %v", tc.node.Name, err)
159161
}
162+
{
163+
n, err := c.CoreV1().Nodes().Get(tc.node.GetName(), meta.GetOptions{})
164+
if err != nil {
165+
t.Errorf("node.Get(%v): %v", tc.node.Name, err)
166+
}
167+
if !reflect.DeepEqual(tc.expected, n) {
168+
t.Errorf("node.Get(%v): want %#v, got %#v", tc.node.Name, tc.expected, n)
169+
}
170+
}
171+
})
172+
}
173+
}
174+
175+
// TestUncordon runs Uncordon on the drainer returned by NewAPICordonDrainer
// against a fake clientset seeded with each case's node. Cases that inject
// failing reactors pass when Uncordon returns an error whose cause is one of
// the injected errors. All other cases read the node back from the fake
// clientset and compare it with the expected node.
func TestUncordon(t *testing.T) {
176+
cases := []struct {
177+
name string
178+
node *core.Node
179+
mutators []nodeMutatorFn
180+
expected *core.Node
181+
reactions []reactor
182+
}{
183+
{
184+
name: "UncordonSchedulableNode",
185+
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
186+
expected: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
187+
},
188+
{
189+
name: "UncordonUnschedulableNode",
190+
node: &core.Node{
191+
ObjectMeta: meta.ObjectMeta{Name: nodeName},
192+
Spec: core.NodeSpec{Unschedulable: true},
193+
},
194+
expected: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
195+
},
196+
{
197+
name: "UncordonNonExistentNode",
198+
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
199+
reactions: []reactor{
200+
{verb: "get", resource: "nodes", err: errors.New("nope")},
201+
},
202+
},
203+
{
204+
name: "ErrorUncordoningUnschedulableNode",
205+
node: &core.Node{
206+
ObjectMeta: meta.ObjectMeta{Name: nodeName},
207+
Spec: core.NodeSpec{Unschedulable: true},
208+
},
209+
reactions: []reactor{
210+
{verb: "update", resource: "nodes", err: errors.New("nope")},
211+
},
212+
},
213+
{
214+
// The node is already schedulable, so the expected node carries no
215+
// annotation: the mutator must not have been applied.
name: "UncordonSchedulableNodeWithMutator",
215+
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
216+
mutators: []nodeMutatorFn{func(n *core.Node) {
217+
n.Annotations = map[string]string{"foo": "1"}
218+
}},
219+
expected: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
220+
},
221+
{
222+
name: "UncordonUnschedulableNodeWithMutator",
223+
node: &core.Node{
224+
ObjectMeta: meta.ObjectMeta{Name: nodeName},
225+
Spec: core.NodeSpec{Unschedulable: true},
226+
},
227+
mutators: []nodeMutatorFn{func(n *core.Node) {
228+
n.Annotations = map[string]string{"foo": "1"}
229+
}},
230+
expected: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName, Annotations: map[string]string{"foo": "1"}}},
231+
},
232+
}
233+
234+
for _, tc := range cases {
235+
t.Run(tc.name, func(t *testing.T) {
236+
// Seed the fake clientset with the case's node.
c := fake.NewSimpleClientset(tc.node)
237+
// Injected reactors are prepended, presumably so they are consulted
// before the fake clientset's own handling of the seeded node.
for _, r := range tc.reactions {
238+
c.PrependReactor(r.verb, r.resource, r.Fn())
239+
}
240+
d := NewAPICordonDrainer(c)
241+
if err := d.Uncordon(tc.node, tc.mutators...); err != nil {
242+
// An error whose cause is one of the injected reactor errors is the
243+
// expected outcome for that case, so the subtest ends successfully.
for _, r := range tc.reactions {
243+
if errors.Cause(err) == r.err {
244+
return
245+
}
246+
}
247+
t.Errorf("d.Uncordon(%v): %v", tc.node.Name, err)
248+
}
249+
// Read the node back from the fake clientset and compare it with the
250+
// expected node.
{
250+
n, err := c.CoreV1().Nodes().Get(tc.node.GetName(), meta.GetOptions{})
251+
if err != nil {
252+
t.Errorf("node.Get(%v): %v", tc.node.Name, err)
253+
}
254+
if !reflect.DeepEqual(tc.expected, n) {
255+
t.Errorf("node.Get(%v): want %#v, got %#v", tc.node.Name, tc.expected, n)
256+
}
257+
}
160258
})
161259
}
162260
}

0 commit comments

Comments
 (0)