Skip to content

Commit

Permalink
register cluster hook
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey committed Jul 4, 2021
1 parent 7a0e07a commit 49f83a6
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 21 deletions.
6 changes: 0 additions & 6 deletions apistructs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ const (
ClusterActionDelete = "delete"
)

const (
ClusterTypeDcos = "dcos"
ClusterTypeK8S = "k8s"
ClusterTypeEdas = "edas"
)

// token / client-cert&client-key / proxy(dialer)
const (
ManageToken = "token"
Expand Down
22 changes: 22 additions & 0 deletions modules/pipeline/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ import (
"github.com/erda-project/erda/modules/pipeline/services/reportsvc"
"github.com/erda-project/erda/modules/pipeline/services/snippetsvc"
"github.com/erda-project/erda/modules/pkg/websocket"
"github.com/erda-project/erda/pkg/discover"
"github.com/erda-project/erda/pkg/http/httpserver"
"github.com/erda-project/erda/pkg/jsonstore"
"github.com/erda-project/erda/pkg/jsonstore/etcd"
"github.com/erda-project/erda/pkg/loop"
"github.com/erda-project/erda/pkg/pipeline_network_hook_client"
"github.com/erda-project/erda/pkg/pipeline_snippet_client"
"github.com/erda-project/erda/pkg/strutil"
// "terminus.io/dice/telemetry/promxp"
)

Expand Down Expand Up @@ -200,6 +202,9 @@ func do() (*httpserver.Server, error) {
return nil, err
}

// register cluster hook
registerClusterHook(bdl)

return server, nil
}

Expand All @@ -225,6 +230,23 @@ func registerSnippetClient(dbclient *dbclient.Client) error {
return nil
}

func registerClusterHook(bdl *bundle.Bundle) {
ev := apistructs.CreateHookRequest{
Name: "pipeline_watch_cluster_changed",
Events: []string{bundle.ClusterEvent},
URL: strutil.Concat("http://", discover.Pipeline(), "/clusterhook"),
Active: true,
HookLocation: apistructs.HookLocation{
Org: "-1",
Project: "-1",
Application: "-1",
},
}
if err := bdl.CreateWebhook(ev); err != nil {
logrus.Warnf("failed to register watch cluster changed event, %v", err)
}
}

func doCrondAbout(crondSvc *crondsvc.CrondSvc, pipelineSvc *pipelinesvc.PipelineSvc) error {
// 加载定时配置
logs, err := crondSvc.ReloadCrond(pipelineSvc.RunCronPipelineFunc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ import (
"github.com/erda-project/erda/modules/pipeline/pkg/clusterinfo"
)

const (
CLUSTERTYPEK8S = "k8s"
)

// Manager for scheduler task executor
type Manager struct {
sync.RWMutex
Expand All @@ -54,7 +50,7 @@ func (m *Manager) Initialize(cfgs []apistructs.ClusterInfo) error {
logrus.Infof("pipeline scheduler task executor Inititalize ...")

for i := range cfgs {
if cfgs[i].Type != apistructs.ClusterTypeK8S {
if cfgs[i].Type != apistructs.K8S {
continue
}
if err := m.addExecutor(cfgs[i]); err != nil {
Expand Down Expand Up @@ -91,7 +87,7 @@ func (m *Manager) addExecutor(cluster apistructs.ClusterInfo) error {
defer m.Unlock()

switch cluster.Type {
case CLUSTERTYPEK8S:
case apistructs.K8S:
k8sjobCreate, ok := m.factory[k8sjob.Kind]
if ok {
name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, k8sjob.Kind))
Expand Down Expand Up @@ -187,7 +183,7 @@ func (m *Manager) deleteExecutor(cluster apistructs.ClusterInfo) {
defer m.Unlock()

switch cluster.Type {
case CLUSTERTYPEK8S:
case apistructs.K8S:
name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, k8sjob.Kind))
if _, exist := m.executors[name]; exist {
delete(m.executors, name)
Expand All @@ -212,7 +208,7 @@ func (m *Manager) updateExecutor(cluster apistructs.ClusterInfo) error {
defer m.Unlock()

switch cluster.Type {
case CLUSTERTYPEK8S:
case apistructs.K8S:
k8sjobCreate, ok := m.factory[k8sjob.Kind]
if ok {
name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, k8sjob.Kind))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,12 @@ func (k *K8sJob) Remove(ctx context.Context, task *spec.PipelineTask) (data inte
func (k *K8sJob) BatchDelete(ctx context.Context, tasks []*spec.PipelineTask) (data interface{}, err error) {
for _, task := range tasks {
if len(task.Extra.UUID) <= 0 {
continue
}
continue
}
_, err = k.Remove(ctx, task)
if err != nil {
return nil, err
}
return nil, err
}
}
return nil, nil
}
Expand Down
6 changes: 3 additions & 3 deletions modules/pipeline/services/pipelinesvc/clusterinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ func (s *PipelineSvc) retryQueryClusterInfo(clusterName string, pipelineID uint6

// ClusterHook listen and dispatch cluster event from eventbox
func (s *PipelineSvc) ClusterHook(clusterEvent apistructs.ClusterEvent) error {
if !strutil.Equal(clusterEvent.Content.Type, apistructs.ClusterTypeK8S, true) &&
!strutil.Equal(clusterEvent.Content.Type, apistructs.ClusterTypeEdas, true) &&
!strutil.Equal(clusterEvent.Content.Type, apistructs.ClusterTypeDcos, true) {
if !strutil.Equal(clusterEvent.Content.Type, apistructs.K8S, true) &&
!strutil.Equal(clusterEvent.Content.Type, apistructs.EDAS, true) &&
!strutil.Equal(clusterEvent.Content.Type, apistructs.DCOS, true) {
return errors.Errorf("invalid cluster event type: %s", clusterEvent.Content.Type)
}

Expand Down

0 comments on commit 49f83a6

Please sign in to comment.