Skip to content

Commit

Permalink
replace mod pkg github.com/googlecloudplatform to github.com/GoogleCl…
Browse files Browse the repository at this point in the history
…oudPlatform
  • Loading branch information
chengjoey committed Jul 4, 2021
1 parent 49f83a6 commit 5fba941
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 106 deletions.
4 changes: 4 additions & 0 deletions erda.yml
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,10 @@ services:
GIT_INNER_USER_NAME: "18000000000"
GIT_INNER_USER_PASSWORD: "123456"
PIPELINE_STORAGE_URL: "file:///devops/storage"
POD_IP:
valueFrom:
fieldRef:
fieldPath: status.podIP
resources:
cpu: ${request_cpu:1}
max_cpu: 1
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/Chronokeeper/anyxml v0.0.0-20160530174208-54457d8e98c6 // indirect
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53 // indirect
github.com/CloudyKit/jet v2.1.2+incompatible // indirect
github.com/googlecloudplatform/flink-operator v0.0.0-00010101000000-000000000000
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20201215015655-2e8b733f5ad0
github.com/Masterminds/semver v1.5.0
github.com/agrison/go-tablib v0.0.0-20160310143025-4930582c22ee // indirect
Expand Down Expand Up @@ -63,7 +64,6 @@ require (
github.com/gogap/stack v0.0.0-20150131034635-fef68dddd4f8 // indirect
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
github.com/google/uuid v1.2.0
github.com/googlecloudplatform/flink-operator v0.0.0-20200909223554-f302312417ee
github.com/gorilla/mux v1.8.0
github.com/gorilla/schema v1.1.0
github.com/gorilla/websocket v1.4.2
Expand Down Expand Up @@ -167,9 +167,9 @@ require (
)

replace (
github.com/googlecloudplatform/flink-operator => github.com/googlecloudplatform/flink-on-k8s-operator v0.0.0-20200909223554-f302312417ee
github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.5
github.com/google/gnostic => github.com/googleapis/gnostic v0.4.0
github.com/googlecloudplatform/flink-operator => github.com/googlecloudplatform/flink-on-k8s-operator v0.0.0-20200909223554-f302312417ee
github.com/influxdata/influxql => github.com/erda-project/influxql v1.1.0-ex
github.com/olivere/elastic v6.2.35+incompatible => github.com/erda-project/elastic v0.0.1-ex
github.com/rancher/remotedialer => github.com/erda-project/remotedialer v0.2.6-0.20210618084817-52c879aadbcb
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgk
github.com/cheggaaa/pb/v3 v3.0.1/go.mod h1:SqqeMF/pMOIu3xgGoxtPYhMNQP258xE4x/XRTYua+KU=
github.com/cheggaaa/pb/v3 v3.0.4 h1:QZEPYOj2ix6d5oEg63fbHmpolrnNiwjUsk+h74Yt4bM=
github.com/cheggaaa/pb/v3 v3.0.4/go.mod h1:7rgWxLrAUcFMkvJuv09+DYi7mMUYi8nO9iOWcvGJPfw=
github.com/chengjoey/flink-on-k8s-operator v0.0.1 h1:a4zrgATUtbQXFrfz0OAH1C9crs4fojzp2k25oNlgfl0=
github.com/chengjoey/flink-on-k8s-operator v0.0.1/go.mod h1:iltjjRPXWoa06VGEyxokBud/9kONhWOyzKm037QEwRQ=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down
8 changes: 8 additions & 0 deletions modules/pipeline/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Conf struct {
CollectorAddr string `env:"COLLECTOR_ADDR" required:"false"`
ClusterManagerAddr string `env:"CLUSTER_MANAGER_ADDR" required:"false"`

// pod ip
PodIP string `env:"POD_IP"`

// public url
GittarPublicURL string `env:"GITTAR_PUBLIC_URL" required:"true"`
OpenAPIPublicURL string `env:"OPENAPI_PUBLIC_URL" required:"true"`
Expand Down Expand Up @@ -315,3 +318,8 @@ func CronCompensateConcurrentNumber() int64 {
func CronFailureCreateIntervalCompensateTimeSecond() int64 {
return cfg.CronFailureCreateIntervalCompensateTimeSecond
}

// self pod ip
func PodIP() string {
return cfg.PodIP
}
6 changes: 3 additions & 3 deletions modules/pipeline/endpoints/clusterinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ import (
func (e *Endpoints) clusterHook(ctx context.Context, r *http.Request, vars map[string]string) (httpserver.Responser, error) {
req := apistructs.ClusterEvent{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
errStr := fmt.Sprintf("decode clusterhook request fail: %v", err)
errStr := fmt.Sprintf("failed to decode clusterhook request err: %v", err)
logrus.Error(errStr)
return httpserver.HTTPResponse{Status: http.StatusBadRequest, Content: errStr}, nil
return httpserver.ErrResp(http.StatusBadRequest, "", errStr)
}
if err := e.pipelineSvc.ClusterHook(req); err != nil {
errStr := fmt.Sprintf("failed to handle cluster event: %v", err)
logrus.Error(errStr)
return httpserver.HTTPResponse{Status: http.StatusBadRequest, Content: errStr}, nil
return httpserver.ErrResp(http.StatusBadRequest, "", errStr)
}
return httpserver.HTTPResponse{Status: http.StatusOK}, nil
}
6 changes: 5 additions & 1 deletion modules/pipeline/endpoints/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ import (
"github.com/erda-project/erda/pkg/http/httpserver"
)

const (
ClusterHookApiPath = "/api/pipeline-clusters/hook"
)

// Endpoints 定义 endpoint 方法
type Endpoints struct {
appSvc *appsvc.AppSvc
Expand Down Expand Up @@ -247,6 +251,6 @@ func (e *Endpoints) Routes() []httpserver.Endpoint {
{Path: "/api/pipeline-reportsets", Method: http.MethodGet, Handler: e.pagingPipelineReportSets},

// cluster info
{Path: "/clusterhook", Method: http.MethodPost, Handler: e.clusterHook},
{Path: ClusterHookApiPath, Method: http.MethodPost, Handler: e.clusterHook},
}
}
3 changes: 1 addition & 2 deletions modules/pipeline/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/erda-project/erda/modules/pipeline/services/reportsvc"
"github.com/erda-project/erda/modules/pipeline/services/snippetsvc"
"github.com/erda-project/erda/modules/pkg/websocket"
"github.com/erda-project/erda/pkg/discover"
"github.com/erda-project/erda/pkg/http/httpserver"
"github.com/erda-project/erda/pkg/jsonstore"
"github.com/erda-project/erda/pkg/jsonstore/etcd"
Expand Down Expand Up @@ -234,7 +233,7 @@ func registerClusterHook(bdl *bundle.Bundle) {
ev := apistructs.CreateHookRequest{
Name: "pipeline_watch_cluster_changed",
Events: []string{bundle.ClusterEvent},
URL: strutil.Concat("http://", discover.Pipeline(), "/clusterhook"),
URL: strutil.Concat("http://", conf.PodIP(), endpoints.ClusterHookApiPath),
Active: true,
HookLocation: apistructs.HookLocation{
Org: "-1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (m *Manager) Initialize(cfgs []apistructs.ClusterInfo) error {
if cfgs[i].Type != apistructs.K8S {
continue
}
if err := m.addExecutor(cfgs[i]); err != nil {
if err := m.updateExecutor(cfgs[i]); err != nil {
continue
}
}
Expand Down Expand Up @@ -82,102 +82,6 @@ func (m *Manager) Get(name types.Name) (types.TaskExecutor, error) {
return e, nil
}

func (m *Manager) addExecutor(cluster apistructs.ClusterInfo) error {
m.Lock()
defer m.Unlock()

switch cluster.Type {
case apistructs.K8S:
k8sjobCreate, ok := m.factory[k8sjob.Kind]
if ok {
name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, k8sjob.Kind))
if _, exist := m.executors[name]; exist {
return errors.Errorf("task executor name: %s already existed", name)
}
k8sjobExecutor, err := k8sjobCreate(name, cluster.Name, cluster)
if err != nil {
logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", k8sjob.Kind, name, err)
return err
}
m.executors[name] = k8sjobExecutor
logrus.Infof("=> kind [%s], name [%s], created", k8sjob.Kind, name)
}

k8sflinkCreate, ok := m.factory[k8sflink.Kind]
if ok {
name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, k8sflink.Kind))
if _, exist := m.executors[name]; exist {
return errors.Errorf("task executor name: %s already existed", name)
}
k8sflinkExecutor, err := k8sflinkCreate(name, cluster.Name, cluster)
if err != nil {
logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", k8sflink.Kind, name, err)
return err
}
m.executors[name] = k8sflinkExecutor
}

k8ssparkCreate, ok := m.factory[k8sspark.Kind]
if ok {
name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, k8sspark.Kind))
if _, exist := m.executors[name]; exist {
return errors.Errorf("task executor name: %s already existed", name)
}
k8ssparkExecutor, err := k8ssparkCreate(name, cluster.Name, cluster)
if err != nil {
logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", k8sspark.Kind, name, err)
return err
}
m.executors[name] = k8ssparkExecutor
}
//case apistructs.ClusterTypeDcos:
// flinkCreate, ok := m.factory[flink.Kind]
// if ok {
// name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, flink.Kind))
// if _, exist := m.executors[name]; exist {
// return errors.Errorf("task executor name: %s already existed", name)
// }
// flinkExecutor, err := flinkCreate(name, cluster.Name, cluster)
// if err != nil {
// logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", flink.Kind, name, err)
// return err
// }
// m.executors[name] = flinkExecutor
// }
//
// sparkCreate, ok := m.factory[spark.Kind]
// if ok {
// name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, flink.Kind))
// if _, exist := m.executors[name]; exist {
// return errors.Errorf("task executor name: %s already existed", name)
// }
// sparkExecutor, err := sparkCreate(name, cluster.Name, cluster)
// if err != nil {
// logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", spark.Kind, name, err)
// return err
// }
// m.executors[name] = sparkExecutor
// }
//
// metronomeCreate, ok := m.factory[flink.Kind]
// if ok {
// name := types.Name(fmt.Sprintf("%sfor%s", cluster.Name, metronome.Kind))
// if _, exist := m.executors[name]; exist {
// return errors.Errorf("task executor name: %s already existed", name)
// }
// metronomeExecutor, err := metronomeCreate(name, cluster.Name, cluster)
// if err != nil {
// logrus.Errorf("=> kind [%s], name [%s], created failed, err: %v", metronome.Kind, name, err)
// return err
// }
// m.executors[name] = metronomeExecutor
// }
default:

}
return nil
}

func (m *Manager) deleteExecutor(cluster apistructs.ClusterInfo) {
m.Lock()
defer m.Unlock()
Expand Down Expand Up @@ -266,7 +170,7 @@ func (m *Manager) listenClusterEventSync(ctx context.Context, eventChan <-chan a
case event := <-eventChan:
switch event.Action {
case apistructs.ClusterActionCreate:
err = m.addExecutor(event.Content)
err = m.updateExecutor(event.Content)
if err != nil {
logrus.Errorf("failed to add task executor, err: %v", err)
}
Expand Down

0 comments on commit 5fba941

Please sign in to comment.