-
Notifications
You must be signed in to change notification settings - Fork 378
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pipeline refactor and cluster watch #731
pipeline refactor and cluster watch #731
Conversation
1fb34d4
to
574c817
Compare
Codecov Report
@@ Coverage Diff @@
## master #731 +/- ##
==========================================
+ Coverage 10.14% 10.18% +0.03%
==========================================
Files 940 937 -3
Lines 88167 87840 -327
==========================================
Hits 8946 8946
+ Misses 78287 77961 -326
+ Partials 934 933 -1
|
apistructs/cluster.go
Outdated
) | ||
|
||
const ( | ||
ClusterTypeDcos = "dcos" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you reuse definitions at line 20~22 in this file.
574c817
to
695a835
Compare
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { | ||
errStr := fmt.Sprintf("decode clusterhook request fail: %v", err) | ||
logrus.Error(errStr) | ||
return httpserver.HTTPResponse{Status: http.StatusBadRequest, Content: errStr}, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use httpserver.ErrResp
@@ -245,5 +245,8 @@ func (e *Endpoints) Routes() []httpserver.Endpoint { | |||
// reports | |||
{Path: "/api/pipeline-reportsets/{pipelineID}", Method: http.MethodGet, Handler: e.queryPipelineReportSet}, | |||
{Path: "/api/pipeline-reportsets", Method: http.MethodGet, Handler: e.pagingPipelineReportSets}, | |||
|
|||
// cluster info | |||
{Path: "/clusterhook", Method: http.MethodPost, Handler: e.clusterHook}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clusterhook for what, you should specified in path.
modules/pipeline/initialize.go
Outdated
ev := apistructs.CreateHookRequest{ | ||
Name: "pipeline_watch_cluster_changed", | ||
Events: []string{bundle.ClusterEvent}, | ||
URL: strutil.Concat("http://", discover.Pipeline(), "/clusterhook"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use same const variable with endpoints.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about multi instance?
func (e *Endpoints) clusterHook(ctx context.Context, r *http.Request, vars map[string]string) (httpserver.Responser, error) { | ||
req := apistructs.ClusterEvent{} | ||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { | ||
errStr := fmt.Sprintf("decode clusterhook request fail: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
failed to xxx
} | ||
} | ||
|
||
func (m *Manager) updateExecutor(cluster apistructs.ClusterInfo) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicated with addExecutor.
695a835
to
5fba941
Compare
@@ -228,6 +228,8 @@ github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgk | |||
github.com/cheggaaa/pb/v3 v3.0.1/go.mod h1:SqqeMF/pMOIu3xgGoxtPYhMNQP258xE4x/XRTYua+KU= | |||
github.com/cheggaaa/pb/v3 v3.0.4 h1:QZEPYOj2ix6d5oEg63fbHmpolrnNiwjUsk+h74Yt4bM= | |||
github.com/cheggaaa/pb/v3 v3.0.4/go.mod h1:7rgWxLrAUcFMkvJuv09+DYi7mMUYi8nO9iOWcvGJPfw= | |||
github.com/chengjoey/flink-on-k8s-operator v0.0.1 h1:a4zrgATUtbQXFrfz0OAH1C9crs4fojzp2k25oNlgfl0= |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (e *Endpoints) clusterHook(ctx context.Context, r *http.Request, vars map[string]string) (httpserver.Responser, error) { | ||
req := apistructs.ClusterEvent{} | ||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { | ||
errStr := fmt.Sprintf("failed to decode clusterhook request err: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
failed to decode clusterhook request, err: %v
return httpserver.ErrResp(http.StatusBadRequest, "", errStr) | ||
} | ||
if err := e.pipelineSvc.ClusterHook(req); err != nil { | ||
errStr := fmt.Sprintf("failed to handle cluster event: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
failed to handle cluster event, err: %v
logrus.Error(errStr) | ||
return httpserver.ErrResp(http.StatusBadRequest, "", errStr) | ||
} | ||
return httpserver.HTTPResponse{Status: http.StatusOK}, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
httpserver.OkResp
@@ -37,6 +37,10 @@ import ( | |||
"github.com/erda-project/erda/pkg/http/httpserver" | |||
) | |||
|
|||
const ( | |||
ClusterHookApiPath = "/api/pipeline-clusters/hook" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/api/{resource}/actions/{concreteAction}
modules/pipeline/initialize.go
Outdated
@@ -194,6 +201,9 @@ func do() (*httpserver.Server, error) { | |||
return nil, err | |||
} | |||
|
|||
// register cluster hook | |||
registerClusterHook(bdl) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put registerClusterHook into pkg clusterinfo is better?
Put these two methods together at least.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or only one method here such as doClusterAbout
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scheduler initialze need cluster info, so clusterinfo.initialize should make first
cluster hook should register after pipeline service start
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move registerClusterHook
into clusterinfo.Initialize
modules/pipeline/initialize.go
Outdated
}, | ||
} | ||
if err := bdl.CreateWebhook(ev); err != nil { | ||
logrus.Warnf("failed to register watch cluster changed event, %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return err and handle err at invoker side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I think pipeline should panic if registerClusterHook failed at bootsstrap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each time the pod ip is different.
How about old pod ips already registered? Maybe eventbox will continue send msgs to them but failed of course. And the subscribers(pod ips) will be more and more ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If so, maybe we should continue register with service ip and all instances listen from etcd.
} | ||
go m.listenClusterEventSync(context.Background(), eventChan) | ||
|
||
logrus.Info("pipengine task executor manager Initialize Done .") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pipeline scheduler task executor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check all others.
) | ||
|
||
const ( | ||
DiceRootDomainKEY = "DICE_ROOT_DOMAIN" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Key
package k8sjob | ||
|
||
var ( | ||
errPullImage = "拉取镜像失败" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Later with i18n
"context" | ||
"fmt" | ||
|
||
"github.com/gogap/errors" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check all error pkg, use fmt.Errorf or unified github.com/pkg/errors.
@@ -0,0 +1,45 @@ | |||
// Copyright (c) 2021 Terminus, Inc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo of file name schedule.
if data == nil { | ||
return nil | ||
} | ||
jobID, ok := data.(string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should check task executor type here.
Not all string type data is job id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only dcos return job id, so remove inJectJobID
@@ -159,6 +164,19 @@ func (s *start) TuneTriggers() taskrun.TaskOpTuneTriggers { | |||
} | |||
} | |||
|
|||
// injectJobID save flink, spark job id after start | |||
func injectJobID(tr *taskrun.TaskRun, data interface{}) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
injectJobIDForFlinkSpark is better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only dcos return str job id,already remove injectJobID
@@ -18,9 +18,11 @@ import ( | |||
"strings" | |||
"time" | |||
|
|||
"github.com/gogap/errors" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same.
|
||
// ClusterHook listen and dispatch cluster event from eventbox | ||
func (s *PipelineSvc) ClusterHook(clusterEvent apistructs.ClusterEvent) error { | ||
if !strutil.Equal(clusterEvent.Content.Type, apistructs.K8S, true) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use switch grammar is more clear here.
@@ -95,6 +95,8 @@ type PipelineTaskExtra struct { | |||
LoopOptions *apistructs.PipelineTaskLoopOptions `json:"loopOptions,omitempty"` // 开始执行后保证不为空 | |||
|
|||
AppliedResources apistructs.PipelineAppliedResources `json:"appliedResources,omitempty"` | |||
|
|||
JobID string `json:"jobID,omitempty"` // flink, spark job id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Concrete field name.
5fba941
to
4c14eb3
Compare
4c14eb3
to
d3b5121
Compare
modules/pipeline/initialize.go
Outdated
@@ -194,6 +201,9 @@ func do() (*httpserver.Server, error) { | |||
return nil, err | |||
} | |||
|
|||
// register cluster hook | |||
registerClusterHook(bdl) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move registerClusterHook
into clusterinfo.Initialize
d3b5121
to
9ce8942
Compare
9ce8942
to
39047d4
Compare
/approve |
What type of this PR
Add one of the following kinds:
/kind feature
What this PR does / why we need it:
pipeline refactor, add three executor in scheduler plugin(k8sjob, k8sflink, k8sspark), dcos and edas will dispath to scheduler module
get cluster from cluster-manager module, and register cluster hook
Which issue(s) this PR fixes:
Specified Reviewers:
/assign @your-reviewer