Skip to content

Commit

Permalink
add support for deployment restarts in Configure task (#93)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Shmulevich <[email protected]>
  • Loading branch information
dmitsh authored Aug 12, 2024
1 parent 7b315cd commit 1f5a1fc
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 10 deletions.
113 changes: 105 additions & 8 deletions pkg/engine/configure_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"os/exec"
"strings"
"sync"
"time"

Expand All @@ -30,6 +31,7 @@ import (
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
log "k8s.io/klog/v2"

Expand All @@ -44,11 +46,13 @@ type ConfigureTask struct {
}

type configureTaskParams struct {
Nodes []virtualNode `yaml:"nodes"`
Namespaces []namespace `yaml:"namespaces"`
ConfigMaps []configmap `yaml:"configmaps"`
PriorityClasses []priorityClass `yaml:"priorityClasses"`
Timeout time.Duration `yaml:"timeout"`
Nodes []virtualNode `yaml:"nodes"`
Namespaces []namespace `yaml:"namespaces"`
ConfigMaps []configmap `yaml:"configmaps"`
PriorityClasses []priorityClass `yaml:"priorityClasses"`
DeploymentRestarts []deploymentRestart `yaml:"deploymentRestarts"`

Timeout time.Duration `yaml:"timeout"`
}

type virtualNode struct {
Expand Down Expand Up @@ -77,6 +81,12 @@ type priorityClass struct {
Op string `yaml:"op"`
}

type deploymentRestart struct {
Name string `yaml:"name,omitempty"`
Namespace string `yaml:"namespace"`
Labels map[string]string `yaml:"labels,omitempty" json:"labels,omitempty"`
}

func newConfigureTask(client *kubernetes.Clientset, cfg *config.Task) (*ConfigureTask, error) {
if client == nil {
return nil, fmt.Errorf("%s/%s: Kubernetes client is not set", cfg.Type, cfg.ID)
Expand Down Expand Up @@ -137,6 +147,23 @@ func (task *ConfigureTask) validate(params map[string]interface{}) error {
return fmt.Errorf("%s: invalid PriorityClass operation %s; supported: %s, %s", task.ID(), pc.Op, OpCreate, OpDelete)
}
}

for _, dr := range task.DeploymentRestarts {
if len(dr.Namespace) == 0 {
return fmt.Errorf("%s: must provide namespace when restarting deployment", task.ID())
}
if (len(dr.Name) != 0 && len(dr.Labels) != 0) || (len(dr.Name) == 0 && len(dr.Labels) == 0) {
return fmt.Errorf("%s: must provide either name or labels when restarting deployment", task.ID())
}
if len(dr.Name) == 0 {
for key := range dr.Labels {
if len(key) == 0 {
return fmt.Errorf("%s: must provide non-empty label name when restarting deployment", task.ID())
}
}
}
}

if task.Timeout == 0 {
return fmt.Errorf("%s: missing parameter 'timeout'", task.ID())
}
Expand All @@ -145,7 +172,7 @@ func (task *ConfigureTask) validate(params map[string]interface{}) error {
}

// Exec implements Runnable interface
func (task *ConfigureTask) Exec(ctx context.Context) (err error) {
func (task *ConfigureTask) Exec(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, task.Timeout)
defer cancel()

Expand Down Expand Up @@ -178,14 +205,19 @@ func (task *ConfigureTask) Exec(ctx context.Context) (err error) {
errs <- task.updateVirtualNodes(ctx)
}()

var err error
for e := range errs {
if e != nil {
log.Errorf("configuration error: %v", err)
log.Errorf("configuration error: %v", e)
err = e
}
}

return
if err != nil {
return err
}

return task.restartDeployments(ctx)
}

func (task *ConfigureTask) updateNamespaces(ctx context.Context) error {
Expand Down Expand Up @@ -305,6 +337,71 @@ func (task *ConfigureTask) updateConfigmaps(ctx context.Context) error {
return nil
}

func (task *ConfigureTask) restartDeployments(ctx context.Context) error {
for _, dr := range task.DeploymentRestarts {
dClient := task.client.AppsV1().Deployments(dr.Namespace)

dName := dr.Name
if len(dName) == 0 {
labels := make([]string, 0, len(dr.Labels))
for key, val := range dr.Labels {
labels = append(labels, key+"="+val)
}
lbl := strings.Join(labels, ",")

list, err := dClient.List(ctx, metav1.ListOptions{LabelSelector: lbl})
if err != nil {
log.InfoS("Warning: skipping restart of deployment", "labels", lbl, "error", err.Error())
return nil
}

if len(list.Items) == 0 {
log.InfoS("Warning: no deployment to restart", "labels", lbl)
return nil
}

if len(list.Items) != 1 {
return fmt.Errorf("expected 1 deployment with labels %s, not %d", lbl, len(list.Items))
}

dName = list.Items[0].Name
}
log.Infof("Restarting deployment %s", dName)

update := fmt.Sprintf(`{"spec": {"template": {"metadata": {"annotations": {"kubectl.kubernetes.io/restartedAt": "%s"}}}}}`,
time.Now().Format("2006-01-02T15:04:05-07:00"))

_, err := dClient.Patch(ctx, dName, k8stypes.StrategicMergePatchType, []byte(update), metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("failed to update deployment %s: %s", dName, err.Error())
}

delay := 5 * time.Second
timer := time.NewTimer(delay)
defer timer.Stop()
for {
select {
case <-timer.C:
d, err := dClient.Get(ctx, dName, metav1.GetOptions{})
if err != nil {
log.Errorf("failed to get status for deployment %s : %v", dName, err)
} else if d.Status.UnavailableReplicas != 0 {
log.V(4).Infof("Restarting deployment %s: %d unavailable replicas", dName, d.Status.UnavailableReplicas)
} else {
log.Infof("Restarted deployment %s", dName)
return nil
}
timer.Reset(delay)

case <-ctx.Done():
return ctx.Err()
}
}
}

return nil
}

func (task *ConfigureTask) updateVirtualNodes(ctx context.Context) error {
if len(task.Nodes) == 0 {
return nil
Expand Down
65 changes: 63 additions & 2 deletions pkg/engine/configure_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,54 @@ func TestNewConfigureTask(t *testing.T) {
err: "Configure/configure: must provide value when creating PriorityClass",
},
{
name: "Case 7: Valid parameters with default",
name: "Case 7a: Missing deployment namespace",
simClients: true,
params: map[string]interface{}{
"timeout": "1m",
"namespaces": []interface{}{map[string]interface{}{"name": "ns", "op": "create"}},
"deploymentRestarts": []interface{}{map[string]interface{}{"name": "deploy"}},
},
err: "Configure/configure: must provide namespace when restarting deployment",
},
{
name: "Case 7b: Missing deployment name and labels",
simClients: true,
params: map[string]interface{}{
"timeout": "1m",
"namespaces": []interface{}{map[string]interface{}{"name": "ns", "op": "create"}},
"deploymentRestarts": []interface{}{map[string]interface{}{"namespace": "name"}},
},
err: "Configure/configure: must provide either name or labels when restarting deployment",
},
{
name: "Case 7c: Both deployment name and labels are present",
simClients: true,
params: map[string]interface{}{
"timeout": "1m",
"namespaces": []interface{}{map[string]interface{}{"name": "ns", "op": "create"}},
"deploymentRestarts": []interface{}{map[string]interface{}{
"namespace": "name",
"name": "deploy",
"labels": map[string]interface{}{"key": "value"},
}},
},
err: "Configure/configure: must provide either name or labels when restarting deployment",
},
{
name: "Case 7d: Missing deployment label",
simClients: true,
params: map[string]interface{}{
"timeout": "1m",
"namespaces": []interface{}{map[string]interface{}{"name": "ns", "op": "create"}},
"deploymentRestarts": []interface{}{map[string]interface{}{
"namespace": "name",
"labels": map[string]interface{}{"": "value"},
}},
},
err: "Configure/configure: must provide non-empty label name when restarting deployment",
},
{
name: "Case 8: Valid parameters with default",
simClients: true,
params: map[string]interface{}{"timeout": "1m"},
task: &ConfigureTask{
Expand All @@ -118,7 +165,7 @@ func TestNewConfigureTask(t *testing.T) {
},
},
{
name: "Case 8: Valid parameters without default",
name: "Case 9: Valid parameters without default",
simClients: true,
params: map[string]interface{}{
"timeout": "1m",
Expand All @@ -142,6 +189,10 @@ func TestNewConfigureTask(t *testing.T) {
map[string]interface{}{"name": "high-priority", "op": "create", "value": 90},
map[string]interface{}{"name": "low-priority", "op": "delete"},
},
"deploymentRestarts": []interface{}{
map[string]interface{}{"namespace": "ns1", "name": "deploy1"},
map[string]interface{}{"namespace": "ns2", "labels": map[string]interface{}{"key": "value"}},
},
},
task: &ConfigureTask{
BaseTask: BaseTask{
Expand Down Expand Up @@ -189,6 +240,16 @@ func TestNewConfigureTask(t *testing.T) {
Op: OpDelete,
},
},
DeploymentRestarts: []deploymentRestart{
{
Namespace: "ns1",
Name: "deploy1",
},
{
Namespace: "ns2",
Labels: map[string]string{"key": "value"},
},
},
},
client: testK8sClient,
},
Expand Down

0 comments on commit 1f5a1fc

Please sign in to comment.