diff --git a/erda.yml b/erda.yml index 6769384ecd8c..b0971faaa4a1 100644 --- a/erda.yml +++ b/erda.yml @@ -548,6 +548,10 @@ services: GIT_INNER_USER_NAME: "18000000000" GIT_INNER_USER_PASSWORD: "123456" PIPELINE_STORAGE_URL: "file:///devops/storage" + POD_IP: + valueFrom: + fieldRef: + fieldPath: status.podIP resources: cpu: ${request_cpu:1} max_cpu: 1 diff --git a/go.mod b/go.mod index 02edd011291b..d4f42c28f38f 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Chronokeeper/anyxml v0.0.0-20160530174208-54457d8e98c6 // indirect github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53 // indirect github.com/CloudyKit/jet v2.1.2+incompatible // indirect + github.com/googlecloudplatform/flink-operator v0.0.0-00010101000000-000000000000 github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20201215015655-2e8b733f5ad0 github.com/Masterminds/semver v1.5.0 github.com/agrison/go-tablib v0.0.0-20160310143025-4930582c22ee // indirect @@ -63,7 +64,6 @@ require ( github.com/gogap/stack v0.0.0-20150131034635-fef68dddd4f8 // indirect github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 github.com/google/uuid v1.2.0 - github.com/googlecloudplatform/flink-operator v0.0.0-20200909223554-f302312417ee github.com/gorilla/mux v1.8.0 github.com/gorilla/schema v1.1.0 github.com/gorilla/websocket v1.4.2 @@ -167,9 +167,9 @@ require ( ) replace ( + github.com/googlecloudplatform/flink-operator => github.com/googlecloudplatform/flink-on-k8s-operator v0.0.0-20200909223554-f302312417ee github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.5 github.com/google/gnostic => github.com/googleapis/gnostic v0.4.0 - github.com/googlecloudplatform/flink-operator => github.com/googlecloudplatform/flink-on-k8s-operator v0.0.0-20200909223554-f302312417ee github.com/influxdata/influxql => github.com/erda-project/influxql v1.1.0-ex github.com/olivere/elastic v6.2.35+incompatible => github.com/erda-project/elastic v0.0.1-ex github.com/rancher/remotedialer => github.com/erda-project/remotedialer v0.2.6-0.20210618084817-52c879aadbcb diff --git a/go.sum b/go.sum index e0c07fc5247c..db18038cce21 100644 --- a/go.sum +++ b/go.sum @@ -228,6 +228,8 @@ github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgk github.com/cheggaaa/pb/v3 v3.0.1/go.mod h1:SqqeMF/pMOIu3xgGoxtPYhMNQP258xE4x/XRTYua+KU= github.com/cheggaaa/pb/v3 v3.0.4 h1:QZEPYOj2ix6d5oEg63fbHmpolrnNiwjUsk+h74Yt4bM= github.com/cheggaaa/pb/v3 v3.0.4/go.mod h1:7rgWxLrAUcFMkvJuv09+DYi7mMUYi8nO9iOWcvGJPfw= +github.com/chengjoey/flink-on-k8s-operator v0.0.1 h1:a4zrgATUtbQXFrfz0OAH1C9crs4fojzp2k25oNlgfl0= +github.com/chengjoey/flink-on-k8s-operator v0.0.1/go.mod h1:iltjjRPXWoa06VGEyxokBud/9kONhWOyzKm037QEwRQ= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= diff --git a/modules/pipeline/conf/conf.go b/modules/pipeline/conf/conf.go index ec5eec872b8c..fbbf0eadd524 100644 --- a/modules/pipeline/conf/conf.go +++ b/modules/pipeline/conf/conf.go @@ -53,6 +53,9 @@ type Conf struct { CollectorAddr string `env:"COLLECTOR_ADDR" required:"false"` ClusterManagerAddr string `env:"CLUSTER_MANAGER_ADDR" required:"false"` + // pod ip + PodIP string `env:"POD_IP"` + // public url GittarPublicURL string `env:"GITTAR_PUBLIC_URL" required:"true"` OpenAPIPublicURL string `env:"OPENAPI_PUBLIC_URL" required:"true"` @@ -315,3 +318,8 @@ func CronCompensateConcurrentNumber() int64 { func CronFailureCreateIntervalCompensateTimeSecond() int64 { return cfg.CronFailureCreateIntervalCompensateTimeSecond } + +// self pod ip +func PodIP() string { + return cfg.PodIP +} diff --git a/modules/pipeline/endpoints/clusterinfo.go b/modules/pipeline/endpoints/clusterinfo.go index f77a0f3f14e6..64570a9f93a0 100644 --- a/modules/pipeline/endpoints/clusterinfo.go +++ b/modules/pipeline/endpoints/clusterinfo.go @@ -28,14 +28,14 @@ import ( func (e *Endpoints) clusterHook(ctx context.Context, r *http.Request, vars map[string]string) (httpserver.Responser, error) { req := apistructs.ClusterEvent{} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - errStr := fmt.Sprintf("decode clusterhook request fail: %v", err) + errStr := fmt.Sprintf("failed to decode clusterhook request err: %v", err) logrus.Error(errStr) - return httpserver.HTTPResponse{Status: http.StatusBadRequest, Content: errStr}, nil + return httpserver.ErrResp(http.StatusBadRequest, "", errStr) } if err := e.pipelineSvc.ClusterHook(req); err != nil { errStr := fmt.Sprintf("failed to handle cluster event: %v", err) logrus.Error(errStr) - return httpserver.HTTPResponse{Status: http.StatusBadRequest, Content: errStr}, nil + return httpserver.ErrResp(http.StatusBadRequest, "", errStr) } return httpserver.HTTPResponse{Status: http.StatusOK}, nil } diff --git a/modules/pipeline/endpoints/endpoints.go b/modules/pipeline/endpoints/endpoints.go index b9e149b21f55..46f7f08f1b7c 100644 --- a/modules/pipeline/endpoints/endpoints.go +++ b/modules/pipeline/endpoints/endpoints.go @@ -37,6 +37,10 @@ import ( "github.com/erda-project/erda/pkg/http/httpserver" ) +const ( + ClusterHookApiPath = "/api/pipeline-clusters/hook" +) + // Endpoints 定义 endpoint 方法 type Endpoints struct { appSvc *appsvc.AppSvc @@ -247,6 +251,6 @@ func (e *Endpoints) Routes() []httpserver.Endpoint { {Path: "/api/pipeline-reportsets", Method: http.MethodGet, Handler: e.pagingPipelineReportSets}, // cluster info - {Path: "/clusterhook", Method: http.MethodPost, Handler: e.clusterHook}, + {Path: ClusterHookApiPath, Method: http.MethodPost, Handler: e.clusterHook}, } } diff --git a/modules/pipeline/initialize.go b/modules/pipeline/initialize.go index db8dd107f907..6cbc1fe64ae4 100644 --- a/modules/pipeline/initialize.go +++ b/modules/pipeline/initialize.go @@ -48,7 +48,6 @@ import ( "github.com/erda-project/erda/modules/pipeline/services/reportsvc" "github.com/erda-project/erda/modules/pipeline/services/snippetsvc" "github.com/erda-project/erda/modules/pkg/websocket" - "github.com/erda-project/erda/pkg/discover" "github.com/erda-project/erda/pkg/http/httpserver" "github.com/erda-project/erda/pkg/jsonstore" "github.com/erda-project/erda/pkg/jsonstore/etcd" @@ -234,7 +233,7 @@ func registerClusterHook(bdl *bundle.Bundle) { ev := apistructs.CreateHookRequest{ Name: "pipeline_watch_cluster_changed", Events: []string{bundle.ClusterEvent}, - URL: strutil.Concat("http://", discover.Pipeline(), "/clusterhook"), + URL: strutil.Concat("http://", conf.PodIP(), endpoints.ClusterHookApiPath), Active: true, HookLocation: apistructs.HookLocation{ Org: "-1", diff --git a/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/manager.go b/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/manager.go index 3b3e6a466a8e..2e83a19f3f7f 100644 --- a/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/manager.go +++ b/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/manager.go @@ -53,7 +53,7 @@ func (m *Manager) Initialize(cfgs []apistructs.ClusterInfo) error { if cfgs[i].Type != apistructs.K8S { continue } - if err := m.addExecutor(cfgs[i]); err != nil { + if err := m.updateExecutor(cfgs[i]); err != nil { continue } } @@ -82,102 +82,6 @@ func (m *Manager) Get(name types.Name) (types.TaskExecutor, error) { return e, nil } -func (m *Manager) addExecutor(cluster apistructs.ClusterInfo) error { - m.Lock() - defer m.Unlock() - - switch cluster.Type { - case apistructs.K8S: - k8sjobCreate, ok := m.factory[k8sjob.Kind] - if ok { - name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, k8sjob.Kind)) - if _, exist := m.executors[name]; exist { - return errors.Errorf("task executor name: %s already existed", name) - } - k8sjobExecutor, err := k8sjobCreate(name, cluster.Name, cluster) - if err != nil { - logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", k8sjob.Kind, name, err) - return err - } - m.executors[name] = k8sjobExecutor - logrus.Infof("=> kind [%s], name [%s], created", k8sjob.Kind, name) - } - - k8sflinkCreate, ok := m.factory[k8sflink.Kind] - if ok { - name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, k8sflink.Kind)) - if _, exist := m.executors[name]; exist { - return errors.Errorf("task executor name: %s already existed", name) - } - k8sflinkExecutor, err := k8sflinkCreate(name, cluster.Name, cluster) - if err != nil { - logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", k8sflink.Kind, name, err) - return err - } - m.executors[name] = k8sflinkExecutor - } - - k8ssparkCreate, ok := m.factory[k8sspark.Kind] - if ok { - name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, k8sspark.Kind)) - if _, exist := m.executors[name]; exist { - return errors.Errorf("task executor name: %s already existed", name) - } - k8ssparkExecutor, err := k8ssparkCreate(name, cluster.Name, cluster) - if err != nil { - logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", k8sspark.Kind, name, err) - return err - } - m.executors[name] = k8ssparkExecutor - } - //case apistructs.ClusterTypeDcos: - // flinkCreate, ok := m.factory[flink.Kind] - // if ok { - // name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, flink.Kind)) - // if _, exist := m.executors[name]; exist { - // return errors.Errorf("task executor name: %s already existed", name) - // } - // flinkExecutor, err := flinkCreate(name, cluster.Name, cluster) - // if err != nil { - // logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", flink.Kind, name, err) - // return err - // } - // m.executors[name] = flinkExecutor - // } - // - // sparkCreate, ok := m.factory[spark.Kind] - // if ok { - // name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, flink.Kind)) - // if _, exist := m.executors[name]; exist { - // return errors.Errorf("task executor name: %s already existed", name) - // } - // sparkExecutor, err := sparkCreate(name, cluster.Name, cluster) - // if err != nil { - // logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", spark.Kind, name, err) - // return err - // } - // m.executors[name] = sparkExecutor - // } - // - // metronomeCreate, ok := m.factory[flink.Kind] - // if ok { - // name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, metronome.Kind)) - // if _, exist := m.executors[name]; exist { - // return errors.Errorf("task executor name: %s already existed", name) - // } - // metronomeExecutor, err := metronomeCreate(name, cluster.Name, cluster) - // if err != nil { - // logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", metronome.Kind, name, err) - // return err - // } - // m.executors[name] = metronomeExecutor - // } - default: - - } - return nil -} - func (m *Manager) deleteExecutor(cluster apistructs.ClusterInfo) { m.Lock() defer m.Unlock() @@ -266,7 +170,7 @@ func (m *Manager) listenClusterEventSync(ctx context.Context, eventChan <-chan a case event := <-eventChan: switch event.Action { case apistructs.ClusterActionCreate: - err = m.addExecutor(event.Content) + err = m.updateExecutor(event.Content) if err != nil { logrus.Errorf("failed to add task executor, err: %v", err) }