From d92696a1bc058c7da8164f62c752d53156bfa76b Mon Sep 17 00:00:00 2001 From: David Birdsong Date: Fri, 1 May 2020 15:49:32 -0700 Subject: [PATCH 1/6] first pass for external-checkpoint updates: --- deploy/crd.yaml | 5 + pkg/apis/app/v1beta1/types.go | 3 + pkg/controller/flink/client/api.go | 93 ++++++++++++------- pkg/controller/flink/client/entities.go | 9 ++ pkg/controller/flink/client/mock/mock_api.go | 9 ++ pkg/controller/flink/config.go | 4 + pkg/controller/flink/flink.go | 22 ++++- pkg/controller/flink/flink_test.go | 10 ++ .../flinkapplication/flink_state_machine.go | 16 ++++ .../flink_state_machine_test.go | 2 +- 10 files changed, 136 insertions(+), 37 deletions(-) diff --git a/deploy/crd.yaml b/deploy/crd.yaml index 4be01739..b9b496a5 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -116,6 +116,11 @@ spec: type: string savepointDisabled: type: boolean + savepointWithCheckpoint: + type: boolean + maxCheckpointDeployAgeSeconds: + type: integer + minimum: 1 maxCheckpointRestoreAgeSeconds: type: integer minimum: 1 diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go index 35a20dd1..f725deec 100644 --- a/pkg/apis/app/v1beta1/types.go +++ b/pkg/apis/app/v1beta1/types.go @@ -45,6 +45,7 @@ type FlinkApplicationSpec struct { SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"` SavepointPath string `json:"savepointPath,omitempty"` SavepointDisabled bool `json:"savepointDisabled"` + SavepointWithCheckpoint bool `json:"savepointWithCheckpoint,omitempty"` DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"` RPCPort *int32 `json:"rpcPort,omitempty"` BlobPort *int32 `json:"blobPort,omitempty"` @@ -58,6 +59,7 @@ type FlinkApplicationSpec struct { AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"` ForceRollback bool `json:"forceRollback"` MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"` + MaxCheckpointDeployAgeSeconds *int32 `json:"maxCheckpointDeployAgeSeconds,omitempty"` 
} type FlinkConfig map[string]interface{} @@ -312,5 +314,6 @@ const ( GetJobConfig FlinkMethod = "GetJobConfig" GetTaskManagers FlinkMethod = "GetTaskManagers" GetCheckpointCounts FlinkMethod = "GetCheckpointCounts" + GetCheckpointConfig FlinkMethod = "GetCheckpointConfig" GetJobOverview FlinkMethod = "GetJobOverview" ) diff --git a/pkg/controller/flink/client/api.go b/pkg/controller/flink/client/api.go index 76048d0b..bdc88c8f 100644 --- a/pkg/controller/flink/client/api.go +++ b/pkg/controller/flink/client/api.go @@ -57,6 +57,7 @@ type FlinkAPIInterface interface { GetJobConfig(ctx context.Context, url string, jobID string) (*JobConfigResponse, error) GetTaskManagers(ctx context.Context, url string) (*TaskManagersResponse, error) GetCheckpointCounts(ctx context.Context, url string, jobID string) (*CheckpointResponse, error) + GetCheckpointConfig(ctx context.Context, url string, jobID string) (*CheckpointConfigResponse, error) GetJobOverview(ctx context.Context, url string, jobID string) (*FlinkJobOverview, error) } @@ -65,45 +66,49 @@ type FlinkJobManagerClient struct { } type flinkJobManagerClientMetrics struct { - scope promutils.Scope - submitJobSuccessCounter labeled.Counter - submitJobFailureCounter labeled.Counter - cancelJobSuccessCounter labeled.Counter - cancelJobFailureCounter labeled.Counter - forceCancelJobSuccessCounter labeled.Counter - forceCancelJobFailureCounter labeled.Counter - checkSavepointSuccessCounter labeled.Counter - checkSavepointFailureCounter labeled.Counter - getJobsSuccessCounter labeled.Counter - getJobsFailureCounter labeled.Counter - getJobConfigSuccessCounter labeled.Counter - getJobConfigFailureCounter labeled.Counter - getClusterSuccessCounter labeled.Counter - getClusterFailureCounter labeled.Counter - getCheckpointsSuccessCounter labeled.Counter - getCheckpointsFailureCounter labeled.Counter + scope promutils.Scope + submitJobSuccessCounter labeled.Counter + submitJobFailureCounter labeled.Counter + cancelJobSuccessCounter 
labeled.Counter + cancelJobFailureCounter labeled.Counter + forceCancelJobSuccessCounter labeled.Counter + forceCancelJobFailureCounter labeled.Counter + checkSavepointSuccessCounter labeled.Counter + checkSavepointFailureCounter labeled.Counter + getJobsSuccessCounter labeled.Counter + getJobsFailureCounter labeled.Counter + getJobConfigSuccessCounter labeled.Counter + getJobConfigFailureCounter labeled.Counter + getClusterSuccessCounter labeled.Counter + getClusterFailureCounter labeled.Counter + getCheckpointsSuccessCounter labeled.Counter + getCheckpointsFailureCounter labeled.Counter + getCheckpointsConfigSuccessCounter labeled.Counter + getCheckpointsConfigFailureCounter labeled.Counter } func newFlinkJobManagerClientMetrics(scope promutils.Scope) *flinkJobManagerClientMetrics { flinkJmClientScope := scope.NewSubScope("flink_jm_client") return &flinkJobManagerClientMetrics{ - scope: scope, - submitJobSuccessCounter: labeled.NewCounter("submit_job_success", "Flink job submission successful", flinkJmClientScope), - submitJobFailureCounter: labeled.NewCounter("submit_job_failure", "Flink job submission failed", flinkJmClientScope), - cancelJobSuccessCounter: labeled.NewCounter("cancel_job_success", "Flink job cancellation successful", flinkJmClientScope), - cancelJobFailureCounter: labeled.NewCounter("cancel_job_failure", "Flink job cancellation failed", flinkJmClientScope), - forceCancelJobSuccessCounter: labeled.NewCounter("force_cancel_job_success", "Flink forced job cancellation successful", flinkJmClientScope), - forceCancelJobFailureCounter: labeled.NewCounter("force_cancel_job_failure", "Flink forced job cancellation failed", flinkJmClientScope), - checkSavepointSuccessCounter: labeled.NewCounter("check_savepoint_status_success", "Flink check savepoint status successful", flinkJmClientScope), - checkSavepointFailureCounter: labeled.NewCounter("check_savepoint_status_failure", "Flink check savepoint status failed", flinkJmClientScope), - 
getJobsSuccessCounter: labeled.NewCounter("get_jobs_success", "Get flink jobs succeeded", flinkJmClientScope), - getJobsFailureCounter: labeled.NewCounter("get_jobs_failure", "Get flink jobs failed", flinkJmClientScope), - getJobConfigSuccessCounter: labeled.NewCounter("get_job_config_success", "Get flink job config succeeded", flinkJmClientScope), - getJobConfigFailureCounter: labeled.NewCounter("get_job_config_failure", "Get flink job config failed", flinkJmClientScope), - getClusterSuccessCounter: labeled.NewCounter("get_cluster_success", "Get cluster overview succeeded", flinkJmClientScope), - getClusterFailureCounter: labeled.NewCounter("get_cluster_failure", "Get cluster overview failed", flinkJmClientScope), - getCheckpointsSuccessCounter: labeled.NewCounter("get_checkpoints_success", "Get checkpoint request succeeded", flinkJmClientScope), - getCheckpointsFailureCounter: labeled.NewCounter("get_checkpoints_failed", "Get checkpoint request failed", flinkJmClientScope), + scope: scope, + submitJobSuccessCounter: labeled.NewCounter("submit_job_success", "Flink job submission successful", flinkJmClientScope), + submitJobFailureCounter: labeled.NewCounter("submit_job_failure", "Flink job submission failed", flinkJmClientScope), + cancelJobSuccessCounter: labeled.NewCounter("cancel_job_success", "Flink job cancellation successful", flinkJmClientScope), + cancelJobFailureCounter: labeled.NewCounter("cancel_job_failure", "Flink job cancellation failed", flinkJmClientScope), + forceCancelJobSuccessCounter: labeled.NewCounter("force_cancel_job_success", "Flink forced job cancellation successful", flinkJmClientScope), + forceCancelJobFailureCounter: labeled.NewCounter("force_cancel_job_failure", "Flink forced job cancellation failed", flinkJmClientScope), + checkSavepointSuccessCounter: labeled.NewCounter("check_savepoint_status_success", "Flink check savepoint status successful", flinkJmClientScope), + checkSavepointFailureCounter: 
labeled.NewCounter("check_savepoint_status_failure", "Flink check savepoint status failed", flinkJmClientScope), + getJobsSuccessCounter: labeled.NewCounter("get_jobs_success", "Get flink jobs succeeded", flinkJmClientScope), + getJobsFailureCounter: labeled.NewCounter("get_jobs_failure", "Get flink jobs failed", flinkJmClientScope), + getJobConfigSuccessCounter: labeled.NewCounter("get_job_config_success", "Get flink job config succeeded", flinkJmClientScope), + getJobConfigFailureCounter: labeled.NewCounter("get_job_config_failure", "Get flink job config failed", flinkJmClientScope), + getClusterSuccessCounter: labeled.NewCounter("get_cluster_success", "Get cluster overview succeeded", flinkJmClientScope), + getClusterFailureCounter: labeled.NewCounter("get_cluster_failure", "Get cluster overview failed", flinkJmClientScope), + getCheckpointsSuccessCounter: labeled.NewCounter("get_checkpoints_success", "Get checkpoint request succeeded", flinkJmClientScope), + getCheckpointsFailureCounter: labeled.NewCounter("get_checkpoints_failed", "Get checkpoint request failed", flinkJmClientScope), + getCheckpointsConfigSuccessCounter: labeled.NewCounter("get_checkpoints_config_success", "Get checkpoint config request succeeded", flinkJmClientScope), + getCheckpointsConfigFailureCounter: labeled.NewCounter("get_checkpoints_config_failed", "Get checkpoint config request failed", flinkJmClientScope), } } @@ -365,6 +370,26 @@ func (c *FlinkJobManagerClient) GetCheckpointCounts(ctx context.Context, url str c.metrics.getCheckpointsSuccessCounter.Inc(ctx) return &checkpointResponse, nil } +func (c *FlinkJobManagerClient) GetCheckpointConfig(ctx context.Context, url string, jobID string) (*CheckpointConfigResponse, error) { + endpoint := fmt.Sprintf(url+checkpointsURL+"/config", jobID) + response, err := c.executeRequest(ctx, httpGet, endpoint, nil) + if err != nil { + c.metrics.getCheckpointsConfigFailureCounter.Inc(ctx) + return nil, GetRetryableError(err, 
v1beta1.GetCheckpointConfig, GlobalFailure, DefaultRetries) + } + if response != nil && !response.IsSuccess() { + c.metrics.getCheckpointsConfigFailureCounter.Inc(ctx) + return nil, GetRetryableError(err, v1beta1.GetCheckpointConfig, response.Status(), DefaultRetries) + } + + var checkpointConfigResponse CheckpointConfigResponse + if err = json.Unmarshal(response.Body(), &checkpointConfigResponse); err != nil { + logger.Errorf(ctx, "Failed to unmarshal checkpointConfigResponse %v, err %v", response, err) + } + + c.metrics.getCheckpointsSuccessCounter.Inc(ctx) + return &checkpointConfigResponse, nil +} func (c *FlinkJobManagerClient) GetJobOverview(ctx context.Context, url string, jobID string) (*FlinkJobOverview, error) { endpoint := fmt.Sprintf(url+GetJobsOverviewURL, jobID) diff --git a/pkg/controller/flink/client/entities.go b/pkg/controller/flink/client/entities.go index afcb87fa..33ff43e7 100644 --- a/pkg/controller/flink/client/entities.go +++ b/pkg/controller/flink/client/entities.go @@ -139,12 +139,21 @@ type LatestCheckpoints struct { Restored *CheckpointStatistics `json:"restored,omitempty"` } +type ExternalizedCheckpoints struct { + Enabled bool `json:"enable,omitempty"` + DeleteOnCancellation bool `json:"delete_on_cancellation,omitempty"` +} + type CheckpointResponse struct { Counts map[string]int32 `json:"counts"` Latest LatestCheckpoints `json:"latest"` History []CheckpointStatistics `json:"history"` } +type CheckpointConfigResponse struct { + Externalization *ExternalizedCheckpoints `json:"externalization"` +} + type TaskManagerStats struct { Path string `json:"path"` DataPort int32 `json:"dataPort"` diff --git a/pkg/controller/flink/client/mock/mock_api.go b/pkg/controller/flink/client/mock/mock_api.go index 67b7b773..51a727e1 100644 --- a/pkg/controller/flink/client/mock/mock_api.go +++ b/pkg/controller/flink/client/mock/mock_api.go @@ -16,6 +16,7 @@ type GetLatestCheckpointFunc func(ctx context.Context, url string, jobID string) type 
GetJobConfigFunc func(ctx context.Context, url string, jobID string) (*client.JobConfigResponse, error) type GetTaskManagersFunc func(ctx context.Context, url string) (*client.TaskManagersResponse, error) type GetCheckpointCountsFunc func(ctx context.Context, url string, jobID string) (*client.CheckpointResponse, error) +type GetCheckpointConfigFunc func(ctx context.Context, url string, jobID string) (*client.CheckpointConfigResponse, error) type GetJobOverviewFunc func(ctx context.Context, url string, jobID string) (*client.FlinkJobOverview, error) type JobManagerClient struct { @@ -29,6 +30,7 @@ type JobManagerClient struct { GetLatestCheckpointFunc GetLatestCheckpointFunc GetTaskManagersFunc GetTaskManagersFunc GetCheckpointCountsFunc GetCheckpointCountsFunc + GetCheckpointConfigFunc GetCheckpointConfigFunc GetJobOverviewFunc GetJobOverviewFunc } @@ -102,6 +104,13 @@ func (m *JobManagerClient) GetCheckpointCounts(ctx context.Context, url string, return nil, nil } +func (m *JobManagerClient) GetCheckpointConfig(ctx context.Context, url string, jobID string) (*client.CheckpointConfigResponse, error) { + if m.GetCheckpointConfigFunc != nil { + return m.GetCheckpointConfigFunc(ctx, url, jobID) + } + return nil, nil +} + func (m *JobManagerClient) GetJobOverview(ctx context.Context, url string, jobID string) (*client.FlinkJobOverview, error) { if m.GetJobOverviewFunc != nil { return m.GetJobOverviewFunc(ctx, url, jobID) diff --git a/pkg/controller/flink/config.go b/pkg/controller/flink/config.go index 1e375b32..4e878c37 100644 --- a/pkg/controller/flink/config.go +++ b/pkg/controller/flink/config.go @@ -20,6 +20,7 @@ const ( OffHeapMemoryDefaultFraction = 0.5 HighAvailabilityKey = "high-availability" MaxCheckpointRestoreAgeSeconds = 3600 + MaxCheckpointDeployAgeSeconds = 90 ) func firstNonNil(x *int32, y int32) int32 { @@ -71,6 +72,9 @@ func getInternalMetricsQueryPort(app *v1beta1.FlinkApplication) int32 { func getMaxCheckpointRestoreAgeSeconds(app 
*v1beta1.FlinkApplication) int32 { return firstNonNil(app.Spec.MaxCheckpointRestoreAgeSeconds, MaxCheckpointRestoreAgeSeconds) } +func getMaxCheckpointDeployAgeSeconds(app *v1beta1.FlinkApplication) int32 { + return firstNonNil(app.Spec.MaxCheckpointDeployAgeSeconds, MaxCheckpointDeployAgeSeconds) +} func getTaskManagerMemory(application *v1beta1.FlinkApplication) int64 { tmResources := application.Spec.TaskManagerConfig.Resources diff --git a/pkg/controller/flink/flink.go b/pkg/controller/flink/flink.go index 8470b94c..9afda4b3 100644 --- a/pkg/controller/flink/flink.go +++ b/pkg/controller/flink/flink.go @@ -82,6 +82,9 @@ type ControllerInterface interface { // able to savepoint for some reason. FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) + // Ensures that application is configured to externalize and *not* delete checkpoints on cancel. + FindExternalizedCheckpointForSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) + // Logs an event to the FlinkApplication resource and to the operator log LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, reason string, message string) @@ -445,7 +448,7 @@ func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1beta1. 
return nil } -func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { +func (f *Controller) findExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, checkpointMaxAge int32) (string, error) { checkpoint, err := f.flinkClient.GetLatestCheckpoint(ctx, getURLFromApp(application, hash), application.Status.JobStatus.JobID) var checkpointPath string var checkpointTime int64 @@ -466,12 +469,27 @@ func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application return "", nil } - if isCheckpointOldToRecover(checkpointTime, getMaxCheckpointRestoreAgeSeconds(application)) { + if isCheckpointOldToRecover(checkpointTime, checkpointMaxAge) { logger.Info(ctx, "Found checkpoint to restore from, but was too old") return "", nil } return checkpointPath, nil + +} +func (f *Controller) FindExternalizedCheckpointForSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { + checkpointConfig, err := f.flinkClient.GetCheckpointConfig(ctx, getURLFromApp(application, hash), application.Status.JobStatus.JobID) + if err != nil { + return "", err + } + if !(checkpointConfig.Externalization.Enabled && !checkpointConfig.Externalization.DeleteOnCancellation) { + return "", fmt.Errorf("Checkpoint configuration not compatable for starting from checkpoints") + } + return f.findExternalizedCheckpoint(ctx, application, hash, getMaxCheckpointDeployAgeSeconds(application)) +} + +func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { + return f.findExternalizedCheckpoint(ctx, application, hash, getMaxCheckpointRestoreAgeSeconds(application)) } func isCheckpointOldToRecover(checkpointTime int64, maxCheckpointRecoveryAgeSec int32) bool { diff --git a/pkg/controller/flink/flink_test.go b/pkg/controller/flink/flink_test.go index 
52d15127..c5dc5646 100644 --- a/pkg/controller/flink/flink_test.go +++ b/pkg/controller/flink/flink_test.go @@ -826,6 +826,16 @@ func TestJobStatusUpdated(t *testing.T) { }, nil } + mockJmClient.GetCheckpointConfigFunc = func(ctx context.Context, url string, jobID string) (*client.CheckpointConfigResponse, error) { + assert.Equal(t, url, "http://app-name-hash.ns:8081") + return &client.CheckpointConfigResponse{ + Externalization: &client.ExternalizedCheckpoints{ + Enabled: true, + DeleteOnCancellation: false, + }, + }, nil + } + flinkApp.Status.JobStatus.JobID = "abc" expectedTime := metaV1.NewTime(time.Unix(startTime/1000, 0)) _, err = flinkControllerForTest.CompareAndUpdateJobStatus(context.Background(), &flinkApp, "hash") diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 668927eb..f72361ff 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -292,6 +292,18 @@ func (s *FlinkStateMachine) handleClusterStarting(ctx context.Context, applicati } return statusChanged, nil } +func (s *FlinkStateMachine) handleApplicationSavepointingWithCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { + checkpointPath, err := s.flinkController.FindExternalizedCheckpointForSavepoint(ctx, application, application.Status.DeployHash) + if err != nil { + return statusUnchanged, err + } + + application.Status.SavepointPath = checkpointPath + application.Status.JobStatus.JobID = "" + s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) + return statusChanged, nil + +} func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { // we've already savepointed (or this is our first deploy), continue on @@ -307,6 +319,10 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a 
s.updateApplicationPhase(application, v1beta1.FlinkApplicationRecovering) return statusChanged, nil } + // use of checkpoints in the place of savepoints + if application.Spec.SavepointWithCheckpoint { + return s.handleApplicationSavepointingWithCheckpoint(ctx, application) + } // we haven't started savepointing yet; do so now // TODO: figure out the idempotence of this diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index a8534233..270a2770 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -50,7 +50,7 @@ func TestHandleNewOrCreate(t *testing.T) { stateMachineForTest := getTestStateMachine() mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8ClusteBasagoitia’sr.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, v1beta1.FlinkApplicationClusterStarting, application.Status.Phase) return nil From c108c8e4d2d280a92c637128b3192aa1b5dd86a8 Mon Sep 17 00:00:00 2001 From: David Birdsong Date: Mon, 4 May 2020 16:32:41 -0700 Subject: [PATCH 2/6] fixups for tests --- pkg/controller/flink/flink.go | 8 +-- pkg/controller/flink/mock/mock_flink.go | 64 +++++++++++-------- .../flink_state_machine_test.go | 2 +- 3 files changed, 40 insertions(+), 34 deletions(-) diff --git a/pkg/controller/flink/flink.go b/pkg/controller/flink/flink.go index 2517b4ab..8f2cd914 100644 --- a/pkg/controller/flink/flink.go +++ b/pkg/controller/flink/flink.go @@ -472,7 +472,7 @@ func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1beta1. 
} func (f *Controller) findExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, checkpointMaxAge int32) (string, error) { - checkpoint, err := f.flinkClient.GetLatestCheckpoint(ctx, getURLFromApp(application, hash), application.Status.JobStatus.JobID) + checkpoint, err := f.flinkClient.GetLatestCheckpoint(ctx, f.getURLFromApp(application, hash), application.Status.JobStatus.JobID) var checkpointPath string var checkpointTime int64 if err != nil { @@ -507,7 +507,7 @@ func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application } func (f *Controller) FindExternalizedCheckpointForSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { - checkpointConfig, err := f.flinkClient.GetCheckpointConfig(ctx, getURLFromApp(application, hash), application.Status.JobStatus.JobID) + checkpointConfig, err := f.flinkClient.GetCheckpointConfig(ctx, f.getURLFromApp(application, hash), application.Status.JobStatus.JobID) if err != nil { return "", err } @@ -517,10 +517,6 @@ func (f *Controller) FindExternalizedCheckpointForSavepoint(ctx context.Context, return f.findExternalizedCheckpoint(ctx, application, hash, getMaxCheckpointDeployAgeSeconds(application)) } -func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { - return f.findExternalizedCheckpoint(ctx, application, hash, getMaxCheckpointRestoreAgeSeconds(application)) -} - func isCheckpointOldToRecover(checkpointTime int64, maxCheckpointRecoveryAgeSec int32) bool { return time.Since(time.Unix(checkpointTime, 0)) > (time.Duration(maxCheckpointRecoveryAgeSec) * time.Second) } diff --git a/pkg/controller/flink/mock/mock_flink.go b/pkg/controller/flink/mock/mock_flink.go index 830bcfa2..fd01e39a 100644 --- a/pkg/controller/flink/mock/mock_flink.go +++ b/pkg/controller/flink/mock/mock_flink.go @@ -22,6 +22,7 @@ type GetJobsForApplicationFunc 
func(ctx context.Context, application *v1beta1.Fl type GetJobForApplicationFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) type GetCurrentDeploymentsForAppFunc func(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error) type FindExternalizedCheckpointFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) +type FindExternalizedCheckpointForSavepointFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) type CompareAndUpdateClusterStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) type CompareAndUpdateJobStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) type GetLatestClusterStatusFunc func(ctx context.Context, app *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus @@ -36,34 +37,36 @@ type DeleteStatusPostTeardownFunc func(ctx context.Context, application *v1beta1 type GetJobToDeleteForApplicationFunc func(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) type GetVersionAndJobIDForHashFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, string, error) type GetVersionAndHashPostTeardownFunc func(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string) + type FlinkController struct { - CreateClusterFunc CreateClusterFunc - DeleteOldResourcesForAppFunc DeleteOldResourcesForApp - SavepointFunc SavepointFunc - ForceCancelFunc ForceCancelFunc - StartFlinkJobFunc StartFlinkJobFunc - GetSavepointStatusFunc GetSavepointStatusFunc - IsClusterReadyFunc IsClusterReadyFunc - IsServiceReadyFunc IsServiceReadyFunc - GetJobsForApplicationFunc GetJobsForApplicationFunc - GetJobForApplicationFunc GetJobForApplicationFunc - GetCurrentDeploymentsForAppFunc 
GetCurrentDeploymentsForAppFunc - FindExternalizedCheckpointFunc FindExternalizedCheckpointFunc - Events []corev1.Event - CompareAndUpdateClusterStatusFunc CompareAndUpdateClusterStatusFunc - CompareAndUpdateJobStatusFunc CompareAndUpdateJobStatusFunc - GetLatestClusterStatusFunc GetLatestClusterStatusFunc - GetLatestJobStatusFunc GetLatestJobStatusFunc - GetLatestJobIDFunc GetLatestJobIDFunc - UpdateLatestJobIDFunc UpdateLatestJobIDFunc - UpdateLatestJobStatusFunc UpdateLatestJobStatusFunc - UpdateLatestClusterStatusFunc UpdateLatestClusterStatusFunc - UpdateLatestVersionAndHashFunc UpdateLatestVersionAndHashFunc - DeleteResourcesForAppWithHashFunc DeleteResourcesForAppWithHashFunc - DeleteStatusPostTeardownFunc DeleteStatusPostTeardownFunc - GetJobToDeleteForApplicationFunc GetJobToDeleteForApplicationFunc - GetVersionAndJobIDForHashFunc GetVersionAndJobIDForHashFunc - GetVersionAndHashPostTeardownFunc GetVersionAndHashPostTeardownFunc + CreateClusterFunc CreateClusterFunc + DeleteOldResourcesForAppFunc DeleteOldResourcesForApp + SavepointFunc SavepointFunc + ForceCancelFunc ForceCancelFunc + StartFlinkJobFunc StartFlinkJobFunc + GetSavepointStatusFunc GetSavepointStatusFunc + IsClusterReadyFunc IsClusterReadyFunc + IsServiceReadyFunc IsServiceReadyFunc + GetJobsForApplicationFunc GetJobsForApplicationFunc + GetJobForApplicationFunc GetJobForApplicationFunc + GetCurrentDeploymentsForAppFunc GetCurrentDeploymentsForAppFunc + FindExternalizedCheckpointFunc FindExternalizedCheckpointFunc + FindExternalizedCheckpointForSavepointFunc FindExternalizedCheckpointForSavepointFunc + Events []corev1.Event + CompareAndUpdateClusterStatusFunc CompareAndUpdateClusterStatusFunc + CompareAndUpdateJobStatusFunc CompareAndUpdateJobStatusFunc + GetLatestClusterStatusFunc GetLatestClusterStatusFunc + GetLatestJobStatusFunc GetLatestJobStatusFunc + GetLatestJobIDFunc GetLatestJobIDFunc + UpdateLatestJobIDFunc UpdateLatestJobIDFunc + UpdateLatestJobStatusFunc UpdateLatestJobStatusFunc 
+ UpdateLatestClusterStatusFunc UpdateLatestClusterStatusFunc + UpdateLatestVersionAndHashFunc UpdateLatestVersionAndHashFunc + DeleteResourcesForAppWithHashFunc DeleteResourcesForAppWithHashFunc + DeleteStatusPostTeardownFunc DeleteStatusPostTeardownFunc + GetJobToDeleteForApplicationFunc GetJobToDeleteForApplicationFunc + GetVersionAndJobIDForHashFunc GetVersionAndJobIDForHashFunc + GetVersionAndHashPostTeardownFunc GetVersionAndHashPostTeardownFunc } func (m *FlinkController) GetCurrentDeploymentsForApp(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error) { @@ -151,6 +154,13 @@ func (m *FlinkController) FindExternalizedCheckpoint(ctx context.Context, applic return "", nil } +func (m *FlinkController) FindExternalizedCheckpointForSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { + if m.FindExternalizedCheckpointForSavepointFunc != nil { + return m.FindExternalizedCheckpointFunc(ctx, application, hash) + } + return "", nil +} + func (m *FlinkController) LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, reason string, message string) { m.Events = append(m.Events, corev1.Event{ InvolvedObject: corev1.ObjectReference{ diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index eda59b00..74f4d9ad 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -50,7 +50,7 @@ func TestHandleNewOrCreate(t *testing.T) { stateMachineForTest := getTestStateMachine() mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8ClusteBasagoitia’sr.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, 
v1beta1.FlinkApplicationClusterStarting, application.Status.Phase) return nil From 0969f4941d3c8abb688cb2888aad0e81304c8e8a Mon Sep 17 00:00:00 2001 From: David Birdsong Date: Tue, 12 May 2020 17:06:59 -0700 Subject: [PATCH 3/6] handle job cancellation on checkpoint upgrade --- pkg/controller/flinkapplication/flink_state_machine.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 34dc040b..6793b0c5 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -321,11 +321,17 @@ func (s *FlinkStateMachine) handleApplicationSavepointingWithCheckpoint(ctx cont return statusUnchanged, err } - application.Status.SavepointPath = checkpointPath + jobID := s.flinkController.GetLatestJobID(ctx, application) + if err := s.flinkController.ForceCancel(ctx, application, application.Status.DeployHash, jobID); err != nil { + return statusUnchanged, err + } + + s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CancellingJob", + fmt.Sprintf("Cancelling job job %s with a final checkpoint", jobID)) application.Status.JobStatus.JobID = "" + application.Status.SavepointPath = checkpointPath s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) return statusChanged, nil - } func (s *FlinkStateMachine) initializeAppStatusIfEmpty(application *v1beta1.FlinkApplication) { From 5c15785e2c63f4959e1b7a96428d339bb421a335 Mon Sep 17 00:00:00 2001 From: David Birdsong Date: Tue, 12 May 2020 20:03:41 -0700 Subject: [PATCH 4/6] fix boolean logic on checkpoint config check --- pkg/controller/flink/flink.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/flink/flink.go b/pkg/controller/flink/flink.go index 8f2cd914..afae4745 100644 --- a/pkg/controller/flink/flink.go +++ b/pkg/controller/flink/flink.go @@ -511,7
+511,7 @@ func (f *Controller) FindExternalizedCheckpointForSavepoint(ctx context.Context, if err != nil { return "", err } - if !(checkpointConfig.Externalization.Enabled && !checkpointConfig.Externalization.DeleteOnCancellation) { + if checkpointConfig.Externalization.Enabled && !checkpointConfig.Externalization.DeleteOnCancellation { return "", fmt.Errorf("Checkpoint configuration not compatable for starting from checkpoints") } return f.findExternalizedCheckpoint(ctx, application, hash, getMaxCheckpointDeployAgeSeconds(application)) From 84e650eebd3c2ce4a07c56c6ac4dc328d861cbad Mon Sep 17 00:00:00 2001 From: David Birdsong Date: Wed, 1 Jul 2020 16:14:57 -0700 Subject: [PATCH 5/6] implement feedback on UpdateMode UpdateMode will allow for 3 different actions taken before the old application is cancelled: - savepoint - checkpoint - no state saved (taking over existing SavepointDisabled flag) --- deploy/crd.yaml | 8 +++----- pkg/apis/app/v1beta1/types.go | 18 ++++++++++++++++-- pkg/controller/flink/config.go | 5 ----- pkg/controller/flink/flink.go | 2 +- .../flinkapplication/flink_state_machine.go | 7 ++++--- 5 files changed, 24 insertions(+), 16 deletions(-) diff --git a/deploy/crd.yaml b/deploy/crd.yaml index 411979f6..cae2170f 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -78,6 +78,9 @@ spec: parallelism: type: integer minimum: 1 + updateMode: + type: string + enum: [Savepoint, Checkpoint, NoStateRestore] deleteMode: type: string enum: [Savepoint, None, ForceCancel] @@ -116,11 +119,6 @@ spec: type: string savepointDisabled: type: boolean - savepointWithCheckpoint: - type: boolean - maxCheckpointDeployAgeSeconds: - type: integer - minimum: 1 maxCheckpointRestoreAgeSeconds: type: integer minimum: 1 diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go index e9f48226..9412131b 100644 --- a/pkg/apis/app/v1beta1/types.go +++ b/pkg/apis/app/v1beta1/types.go @@ -45,7 +45,6 @@ type FlinkApplicationSpec struct { SavepointInfo SavepointInfo
`json:"savepointInfo,omitempty"` SavepointPath string `json:"savepointPath,omitempty"` SavepointDisabled bool `json:"savepointDisabled"` - SavepointWithCheckpoint bool `json:"savepointWithCheckpoint,omitempty"` DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"` RPCPort *int32 `json:"rpcPort,omitempty"` BlobPort *int32 `json:"blobPort,omitempty"` @@ -55,14 +54,21 @@ type FlinkApplicationSpec struct { Volumes []apiv1.Volume `json:"volumes,omitempty"` VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"` RestartNonce string `json:"restartNonce"` + UpdateMode UpdateMode `json:"updateMode,omitempty"` DeleteMode DeleteMode `json:"deleteMode,omitempty"` AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"` ForceRollback bool `json:"forceRollback"` MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"` - MaxCheckpointDeployAgeSeconds *int32 `json:"maxCheckpointDeployAgeSeconds,omitempty"` TearDownVersionHash string `json:"tearDownVersionHash,omitempty"` } +func (spec FlinkApplicationSpec) SavepointingDisabled() bool { + if spec.SavepointDisabled { + return spec.SavepointDisabled + } + return spec.UpdateMode == UpdateModeNoState +} + type FlinkConfig map[string]interface{} // Workaround for https://github.com/kubernetes-sigs/kubebuilder/issues/528 @@ -304,6 +310,14 @@ const ( DeleteModeNone DeleteMode = "None" ) +type UpdateMode string + +const ( + UpdateModeSavepoint UpdateMode = "Savepoint" + UpdateModeCheckpoint UpdateMode = "Checkpoint" + UpdateModeNoState UpdateMode = "NoStateRestore" +) + type HealthStatus string const ( diff --git a/pkg/controller/flink/config.go b/pkg/controller/flink/config.go index 4e878c37..c43b7a0c 100644 --- a/pkg/controller/flink/config.go +++ b/pkg/controller/flink/config.go @@ -20,7 +20,6 @@ const ( OffHeapMemoryDefaultFraction = 0.5 HighAvailabilityKey = "high-availability" MaxCheckpointRestoreAgeSeconds = 3600 - MaxCheckpointDeployAgeSeconds = 90 ) func firstNonNil(x 
*int32, y int32) int32 { @@ -72,10 +71,6 @@ func getInternalMetricsQueryPort(app *v1beta1.FlinkApplication) int32 { func getMaxCheckpointRestoreAgeSeconds(app *v1beta1.FlinkApplication) int32 { return firstNonNil(app.Spec.MaxCheckpointRestoreAgeSeconds, MaxCheckpointRestoreAgeSeconds) } -func getMaxCheckpointDeployAgeSeconds(app *v1beta1.FlinkApplication) int32 { - return firstNonNil(app.Spec.MaxCheckpointDeployAgeSeconds, MaxCheckpointDeployAgeSeconds) -} - func getTaskManagerMemory(application *v1beta1.FlinkApplication) int64 { tmResources := application.Spec.TaskManagerConfig.Resources if tmResources == nil { diff --git a/pkg/controller/flink/flink.go b/pkg/controller/flink/flink.go index afae4745..0e011d7d 100644 --- a/pkg/controller/flink/flink.go +++ b/pkg/controller/flink/flink.go @@ -514,7 +514,7 @@ func (f *Controller) FindExternalizedCheckpointForSavepoint(ctx context.Context, if checkpointConfig.Externalization.Enabled && !checkpointConfig.Externalization.DeleteOnCancellation { return "", fmt.Errorf("Checkpoint configuration not compatable for starting from checkpoints") } - return f.findExternalizedCheckpoint(ctx, application, hash, getMaxCheckpointDeployAgeSeconds(application)) + return f.findExternalizedCheckpoint(ctx, application, hash, getMaxCheckpointRestoreAgeSeconds(application)) } func isCheckpointOldToRecover(checkpointTime int64, maxCheckpointRecoveryAgeSec int32) bool { diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 6793b0c5..0675495c 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -233,6 +233,7 @@ func (s *FlinkStateMachine) IsTimeToHandlePhase(application *v1beta1.FlinkApplic // In this state we create a new cluster, either due to an entirely new FlinkApplication or due to an update. 
func (s *FlinkStateMachine) handleNewOrUpdating(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { + // TODO: add up-front validation on the FlinkApplication resource if rollback, reason := s.shouldRollback(ctx, application); rollback { // we've failed to make progress; move to deploy failed @@ -305,9 +306,9 @@ func (s *FlinkStateMachine) handleClusterStarting(ctx context.Context, applicati logger.Infof(ctx, "Flink cluster has started successfully") // TODO: in single mode move to submitting job - if application.Spec.SavepointDisabled && !v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + if application.Spec.SavepointingDisabled() && !v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { s.updateApplicationPhase(application, v1beta1.FlinkApplicationCancelling) - } else if application.Spec.SavepointDisabled && v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + } else if application.Spec.SavepointingDisabled() && v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { // Blue Green deployment and no savepoint required implies, we directly transition to submitting job s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) } else { @@ -361,7 +362,7 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a return statusChanged, nil } // use of checkpoints in the place of savepoints - if application.Spec.SavepointWithCheckpoint { + if application.Spec.UpdateMode == v1beta1.UpdateModeCheckpoint { return s.handleApplicationSavepointingWithCheckpoint(ctx, application) } From 7b4e12d7e929a28f261083b627992d1e55927a82 Mon Sep 17 00:00:00 2001 From: David Birdsong Date: Tue, 14 Jul 2020 16:18:33 -0700 Subject: [PATCH 6/6] log savepointpath savepointpath can be supplied or found dynamically, helps with debugpath to observe which one is used --- pkg/controller/flinkapplication/flink_state_machine.go | 1 + 1 file changed, 1 
insertion(+) diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 0675495c..3292cf67 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -350,6 +350,7 @@ func (s *FlinkStateMachine) initializeAppStatusIfEmpty(application *v1beta1.Flin func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { // we've already savepointed (or this is our first deploy), continue on if application.Status.SavepointPath != "" || application.Status.DeployHash == "" { + logger.Debugf(ctx, "Using SavepointPath: %s", application.Status.SavepointPath) s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) return statusChanged, nil }