Update with external checkpoints #199

Open · wants to merge 7 commits into base: master
3 changes: 3 additions & 0 deletions deploy/crd.yaml
@@ -78,6 +78,9 @@ spec:
parallelism:
type: integer
minimum: 1
updateMode:
type: string
enum: [Savepoint, Checkpoint, NoStateRestore]
deleteMode:
type: string
enum: [Savepoint, None, ForceCancel]
17 changes: 17 additions & 0 deletions pkg/apis/app/v1beta1/types.go
@@ -54,13 +54,21 @@ type FlinkApplicationSpec struct {
Volumes []apiv1.Volume `json:"volumes,omitempty"`
VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"`
RestartNonce string `json:"restartNonce"`
UpdateMode UpdateMode `json:"updateMode,omitempty"`
DeleteMode DeleteMode `json:"deleteMode,omitempty"`
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
ForceRollback bool `json:"forceRollback"`
MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"`
TearDownVersionHash string `json:"tearDownVersionHash,omitempty"`
}

func (spec FlinkApplicationSpec) SavepointingDisabled() bool {
if spec.SavepointDisabled {
return spec.SavepointDisabled
}
return spec.UpdateMode == UpdateModeNoState
}

type FlinkConfig map[string]interface{}

// Workaround for https://github.com/kubernetes-sigs/kubebuilder/issues/528
@@ -302,6 +310,14 @@ const (
DeleteModeNone DeleteMode = "None"
)

type UpdateMode string

const (
UpdateModeSavepoint UpdateMode = "Savepoint"
UpdateModeCheckpoint UpdateMode = "Checkpoint"
UpdateModeNoState UpdateMode = "NoStateRestore"
)

type HealthStatus string

const (
@@ -353,6 +369,7 @@ const (
GetJobConfig FlinkMethod = "GetJobConfig"
GetTaskManagers FlinkMethod = "GetTaskManagers"
GetCheckpointCounts FlinkMethod = "GetCheckpointCounts"
GetCheckpointConfig FlinkMethod = "GetCheckpointConfig"
GetJobOverview FlinkMethod = "GetJobOverview"
SavepointJob FlinkMethod = "SavepointJob"
)
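For context (not part of this diff), a minimal usage sketch of how the new helper resolves the legacy SavepointDisabled flag against the new UpdateMode; the import path for the operator's API types is assumed here:

```go
package main

import (
	"fmt"

	// Assumed import path for the operator's API types.
	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
)

func main() {
	// A spec that requests a stateless restart via the new field rather
	// than the legacy SavepointDisabled flag.
	spec := v1beta1.FlinkApplicationSpec{
		SavepointDisabled: false,
		UpdateMode:        v1beta1.UpdateModeNoState,
	}
	// SavepointingDisabled() is true if either the legacy flag is set
	// or UpdateMode is NoStateRestore.
	fmt.Println(spec.SavepointingDisabled()) // true
}
```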
101 changes: 63 additions & 38 deletions pkg/controller/flink/client/api.go
@@ -58,6 +58,7 @@ type FlinkAPIInterface interface {
GetJobConfig(ctx context.Context, url string, jobID string) (*JobConfigResponse, error)
GetTaskManagers(ctx context.Context, url string) (*TaskManagersResponse, error)
GetCheckpointCounts(ctx context.Context, url string, jobID string) (*CheckpointResponse, error)
GetCheckpointConfig(ctx context.Context, url string, jobID string) (*CheckpointConfigResponse, error)
GetJobOverview(ctx context.Context, url string, jobID string) (*FlinkJobOverview, error)
}

@@ -66,49 +67,53 @@ type FlinkJobManagerClient struct {
}

type flinkJobManagerClientMetrics struct {
scope promutils.Scope
submitJobSuccessCounter labeled.Counter
submitJobFailureCounter labeled.Counter
cancelJobSuccessCounter labeled.Counter
cancelJobFailureCounter labeled.Counter
forceCancelJobSuccessCounter labeled.Counter
forceCancelJobFailureCounter labeled.Counter
checkSavepointSuccessCounter labeled.Counter
checkSavepointFailureCounter labeled.Counter
getJobsSuccessCounter labeled.Counter
getJobsFailureCounter labeled.Counter
getJobConfigSuccessCounter labeled.Counter
getJobConfigFailureCounter labeled.Counter
getClusterSuccessCounter labeled.Counter
getClusterFailureCounter labeled.Counter
getCheckpointsSuccessCounter labeled.Counter
getCheckpointsFailureCounter labeled.Counter
savepointJobSuccessCounter labeled.Counter
savepointJobFailureCounter labeled.Counter
scope promutils.Scope
submitJobSuccessCounter labeled.Counter
submitJobFailureCounter labeled.Counter
cancelJobSuccessCounter labeled.Counter
cancelJobFailureCounter labeled.Counter
forceCancelJobSuccessCounter labeled.Counter
forceCancelJobFailureCounter labeled.Counter
checkSavepointSuccessCounter labeled.Counter
checkSavepointFailureCounter labeled.Counter
getJobsSuccessCounter labeled.Counter
getJobsFailureCounter labeled.Counter
getJobConfigSuccessCounter labeled.Counter
getJobConfigFailureCounter labeled.Counter
getClusterSuccessCounter labeled.Counter
getClusterFailureCounter labeled.Counter
getCheckpointsSuccessCounter labeled.Counter
getCheckpointsFailureCounter labeled.Counter
savepointJobSuccessCounter labeled.Counter
savepointJobFailureCounter labeled.Counter
getCheckpointsConfigSuccessCounter labeled.Counter
getCheckpointsConfigFailureCounter labeled.Counter
}

func newFlinkJobManagerClientMetrics(scope promutils.Scope) *flinkJobManagerClientMetrics {
flinkJmClientScope := scope.NewSubScope("flink_jm_client")
return &flinkJobManagerClientMetrics{
scope: scope,
submitJobSuccessCounter: labeled.NewCounter("submit_job_success", "Flink job submission successful", flinkJmClientScope),
submitJobFailureCounter: labeled.NewCounter("submit_job_failure", "Flink job submission failed", flinkJmClientScope),
cancelJobSuccessCounter: labeled.NewCounter("cancel_job_success", "Flink job cancellation successful", flinkJmClientScope),
cancelJobFailureCounter: labeled.NewCounter("cancel_job_failure", "Flink job cancellation failed", flinkJmClientScope),
forceCancelJobSuccessCounter: labeled.NewCounter("force_cancel_job_success", "Flink forced job cancellation successful", flinkJmClientScope),
forceCancelJobFailureCounter: labeled.NewCounter("force_cancel_job_failure", "Flink forced job cancellation failed", flinkJmClientScope),
checkSavepointSuccessCounter: labeled.NewCounter("check_savepoint_status_success", "Flink check savepoint status successful", flinkJmClientScope),
checkSavepointFailureCounter: labeled.NewCounter("check_savepoint_status_failure", "Flink check savepoint status failed", flinkJmClientScope),
getJobsSuccessCounter: labeled.NewCounter("get_jobs_success", "Get flink jobs succeeded", flinkJmClientScope),
getJobsFailureCounter: labeled.NewCounter("get_jobs_failure", "Get flink jobs failed", flinkJmClientScope),
getJobConfigSuccessCounter: labeled.NewCounter("get_job_config_success", "Get flink job config succeeded", flinkJmClientScope),
getJobConfigFailureCounter: labeled.NewCounter("get_job_config_failure", "Get flink job config failed", flinkJmClientScope),
getClusterSuccessCounter: labeled.NewCounter("get_cluster_success", "Get cluster overview succeeded", flinkJmClientScope),
getClusterFailureCounter: labeled.NewCounter("get_cluster_failure", "Get cluster overview failed", flinkJmClientScope),
getCheckpointsSuccessCounter: labeled.NewCounter("get_checkpoints_success", "Get checkpoint request succeeded", flinkJmClientScope),
getCheckpointsFailureCounter: labeled.NewCounter("get_checkpoints_failed", "Get checkpoint request failed", flinkJmClientScope),
savepointJobSuccessCounter: labeled.NewCounter("savepoint_job_success", "Savepoint job request succeeded", flinkJmClientScope),
savepointJobFailureCounter: labeled.NewCounter("savepoint_job_failed", "Savepoint job request failed", flinkJmClientScope),
scope: scope,
submitJobSuccessCounter: labeled.NewCounter("submit_job_success", "Flink job submission successful", flinkJmClientScope),
submitJobFailureCounter: labeled.NewCounter("submit_job_failure", "Flink job submission failed", flinkJmClientScope),
cancelJobSuccessCounter: labeled.NewCounter("cancel_job_success", "Flink job cancellation successful", flinkJmClientScope),
cancelJobFailureCounter: labeled.NewCounter("cancel_job_failure", "Flink job cancellation failed", flinkJmClientScope),
forceCancelJobSuccessCounter: labeled.NewCounter("force_cancel_job_success", "Flink forced job cancellation successful", flinkJmClientScope),
forceCancelJobFailureCounter: labeled.NewCounter("force_cancel_job_failure", "Flink forced job cancellation failed", flinkJmClientScope),
checkSavepointSuccessCounter: labeled.NewCounter("check_savepoint_status_success", "Flink check savepoint status successful", flinkJmClientScope),
checkSavepointFailureCounter: labeled.NewCounter("check_savepoint_status_failure", "Flink check savepoint status failed", flinkJmClientScope),
getJobsSuccessCounter: labeled.NewCounter("get_jobs_success", "Get flink jobs succeeded", flinkJmClientScope),
getJobsFailureCounter: labeled.NewCounter("get_jobs_failure", "Get flink jobs failed", flinkJmClientScope),
getJobConfigSuccessCounter: labeled.NewCounter("get_job_config_success", "Get flink job config succeeded", flinkJmClientScope),
getJobConfigFailureCounter: labeled.NewCounter("get_job_config_failure", "Get flink job config failed", flinkJmClientScope),
getClusterSuccessCounter: labeled.NewCounter("get_cluster_success", "Get cluster overview succeeded", flinkJmClientScope),
getClusterFailureCounter: labeled.NewCounter("get_cluster_failure", "Get cluster overview failed", flinkJmClientScope),
getCheckpointsSuccessCounter: labeled.NewCounter("get_checkpoints_success", "Get checkpoint request succeeded", flinkJmClientScope),
getCheckpointsFailureCounter: labeled.NewCounter("get_checkpoints_failed", "Get checkpoint request failed", flinkJmClientScope),
savepointJobSuccessCounter: labeled.NewCounter("savepoint_job_success", "Savepoint job request succeeded", flinkJmClientScope),
savepointJobFailureCounter: labeled.NewCounter("savepoint_job_failed", "Savepoint job request failed", flinkJmClientScope),
getCheckpointsConfigSuccessCounter: labeled.NewCounter("get_checkpoints_config_success", "Get checkpoint config request succeeded", flinkJmClientScope),
getCheckpointsConfigFailureCounter: labeled.NewCounter("get_checkpoints_config_failed", "Get checkpoint config request failed", flinkJmClientScope),
}
Author:

I love go fmt, but it can also be disruptive: formatting-only changes bloat the PR and hide the actual diff.

I could add a line break to separate out the new, longer struct members and minimize the line-change count on things like this.

Contributor:

Agreed. Not a problem, ok to leave as is :)
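For illustration (not part of this PR), the line-break trick mentioned above: gofmt only aligns fields within contiguous groups, so a blank line before the new, longer member keeps the existing lines untouched:

```go
package metricsdemo

type counter struct{}

// Without a separating blank line, gofmt aligns the whole block to the
// longest field name, so adding one long member re-indents every line.
type alignedTogether struct {
	getCheckpointsSuccessCounter       counter
	getCheckpointsConfigSuccessCounter counter
}

// A blank line starts a new alignment group: existing fields keep their
// original columns and the diff only shows the added lines.
type alignedInGroups struct {
	getCheckpointsSuccessCounter counter

	getCheckpointsConfigSuccessCounter counter
}
```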

}

@@ -369,6 +374,26 @@ func (c *FlinkJobManagerClient) GetCheckpointCounts(ctx context.Context, url str
c.metrics.getCheckpointsSuccessCounter.Inc(ctx)
return &checkpointResponse, nil
}
func (c *FlinkJobManagerClient) GetCheckpointConfig(ctx context.Context, url string, jobID string) (*CheckpointConfigResponse, error) {
endpoint := fmt.Sprintf(url+checkpointsURL+"/config", jobID)
response, err := c.executeRequest(ctx, httpGet, endpoint, nil)
if err != nil {
c.metrics.getCheckpointsConfigFailureCounter.Inc(ctx)
return nil, GetRetryableError(err, v1beta1.GetCheckpointConfig, GlobalFailure, DefaultRetries)
}
if response != nil && !response.IsSuccess() {
c.metrics.getCheckpointsConfigFailureCounter.Inc(ctx)
return nil, GetRetryableError(err, v1beta1.GetCheckpointConfig, response.Status(), DefaultRetries)
}

var checkpointConfigResponse CheckpointConfigResponse
if err = json.Unmarshal(response.Body(), &checkpointConfigResponse); err != nil {
logger.Errorf(ctx, "Failed to unmarshal checkpointConfigResponse %v, err %v", response, err)
}

c.metrics.getCheckpointsConfigSuccessCounter.Inc(ctx)
return &checkpointConfigResponse, nil
}

func (c *FlinkJobManagerClient) GetJobOverview(ctx context.Context, url string, jobID string) (*FlinkJobOverview, error) {
endpoint := fmt.Sprintf(url+GetJobsOverviewURL, jobID)
9 changes: 9 additions & 0 deletions pkg/controller/flink/client/entities.go
@@ -139,12 +139,21 @@ type LatestCheckpoints struct {
Restored *CheckpointStatistics `json:"restored,omitempty"`
}

type ExternalizedCheckpoints struct {
Enabled bool `json:"enable,omitempty"`
DeleteOnCancellation bool `json:"delete_on_cancellation,omitempty"`
}

type CheckpointResponse struct {
Counts map[string]int32 `json:"counts"`
Latest LatestCheckpoints `json:"latest"`
History []CheckpointStatistics `json:"history"`
}

type CheckpointConfigResponse struct {
Externalization *ExternalizedCheckpoints `json:"externalization"`
}

type TaskManagerStats struct {
Path string `json:"path"`
DataPort int32 `json:"dataPort"`
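A quick sketch (not in this PR) of how a checkpoint-config payload decodes into the new types; the JSON here is shaped to match the struct tags declared above, and the real Flink REST response may differ slightly:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the new types, with the JSON tags as declared in this PR.
type ExternalizedCheckpoints struct {
	Enabled              bool `json:"enable,omitempty"`
	DeleteOnCancellation bool `json:"delete_on_cancellation,omitempty"`
}

type CheckpointConfigResponse struct {
	Externalization *ExternalizedCheckpoints `json:"externalization"`
}

func main() {
	// Example payload shaped to match the struct tags above.
	body := []byte(`{"externalization":{"enable":true,"delete_on_cancellation":false}}`)

	var cfg CheckpointConfigResponse
	if err := json.Unmarshal(body, &cfg); err != nil {
		panic(err)
	}
	// Externalized checkpoints retained on cancellation are safe to restore from.
	fmt.Println(cfg.Externalization.Enabled, cfg.Externalization.DeleteOnCancellation) // true false
}
```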
9 changes: 9 additions & 0 deletions pkg/controller/flink/client/mock/mock_api.go
@@ -16,6 +16,7 @@ type GetLatestCheckpointFunc func(ctx context.Context, url string, jobID string)
type GetJobConfigFunc func(ctx context.Context, url string, jobID string) (*client.JobConfigResponse, error)
type GetTaskManagersFunc func(ctx context.Context, url string) (*client.TaskManagersResponse, error)
type GetCheckpointCountsFunc func(ctx context.Context, url string, jobID string) (*client.CheckpointResponse, error)
type GetCheckpointConfigFunc func(ctx context.Context, url string, jobID string) (*client.CheckpointConfigResponse, error)
type GetJobOverviewFunc func(ctx context.Context, url string, jobID string) (*client.FlinkJobOverview, error)
type SavepointJobFunc func(ctx context.Context, url string, jobID string) (string, error)
type JobManagerClient struct {
@@ -29,6 +30,7 @@ type JobManagerClient struct {
GetLatestCheckpointFunc GetLatestCheckpointFunc
GetTaskManagersFunc GetTaskManagersFunc
GetCheckpointCountsFunc GetCheckpointCountsFunc
GetCheckpointConfigFunc GetCheckpointConfigFunc
GetJobOverviewFunc GetJobOverviewFunc
SavepointJobFunc SavepointJobFunc
}
@@ -103,6 +105,13 @@ func (m *JobManagerClient) GetCheckpointCounts(ctx context.Context, url string,
return nil, nil
}

func (m *JobManagerClient) GetCheckpointConfig(ctx context.Context, url string, jobID string) (*client.CheckpointConfigResponse, error) {
if m.GetCheckpointConfigFunc != nil {
return m.GetCheckpointConfigFunc(ctx, url, jobID)
}
return nil, nil
}

func (m *JobManagerClient) GetJobOverview(ctx context.Context, url string, jobID string) (*client.FlinkJobOverview, error) {
if m.GetJobOverviewFunc != nil {
return m.GetJobOverviewFunc(ctx, url, jobID)
1 change: 0 additions & 1 deletion pkg/controller/flink/config.go
@@ -71,7 +71,6 @@ func getInternalMetricsQueryPort(app *v1beta1.FlinkApplication) int32 {
func getMaxCheckpointRestoreAgeSeconds(app *v1beta1.FlinkApplication) int32 {
return firstNonNil(app.Spec.MaxCheckpointRestoreAgeSeconds, MaxCheckpointRestoreAgeSeconds)
}

func getTaskManagerMemory(application *v1beta1.FlinkApplication) int64 {
tmResources := application.Spec.TaskManagerConfig.Resources
if tmResources == nil {
25 changes: 22 additions & 3 deletions pkg/controller/flink/flink.go
@@ -85,6 +85,9 @@ type ControllerInterface interface {
// able to savepoint for some reason.
FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error)

// Ensures that application is configured to externalize and *not* delete checkpoints on cancel.
FindExternalizedCheckpointForSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error)

// Logs an event to the FlinkApplication resource and to the operator log
LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, reason string, message string)

@@ -468,8 +471,8 @@ func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1beta1.
return nil
}

func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
checkpoint, err := f.flinkClient.GetLatestCheckpoint(ctx, f.getURLFromApp(application, hash), f.GetLatestJobID(ctx, application))
func (f *Controller) findExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, checkpointMaxAge int32) (string, error) {
checkpoint, err := f.flinkClient.GetLatestCheckpoint(ctx, f.getURLFromApp(application, hash), application.Status.JobStatus.JobID)
var checkpointPath string
var checkpointTime int64
if err != nil {
@@ -490,12 +493,28 @@ func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application
return "", nil
}

if isCheckpointOldToRecover(checkpointTime, getMaxCheckpointRestoreAgeSeconds(application)) {
if isCheckpointOldToRecover(checkpointTime, checkpointMaxAge) {
logger.Info(ctx, "Found checkpoint to restore from, but was too old")
return "", nil
}

return checkpointPath, nil

}

func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
return f.findExternalizedCheckpoint(ctx, application, hash, getMaxCheckpointRestoreAgeSeconds(application))
}

func (f *Controller) FindExternalizedCheckpointForSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
checkpointConfig, err := f.flinkClient.GetCheckpointConfig(ctx, f.getURLFromApp(application, hash), application.Status.JobStatus.JobID)
Contributor:

Is there opportunity to combine FindExternalizedCheckpoint and FindExternalizedCheckpointForSavepoint into a single method? I believe that the checkpoint configuration check (to ensure it's RETAIN_ON_CANCELLATION) is applicable to both cases.
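One possible shape for a combined method (a rough sketch only, reusing the helpers already in this file):

```go
// Sketch: always validate the checkpoint configuration before looking up
// the latest externalized checkpoint, for both the savepoint-fallback and
// checkpoint-update paths.
func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
	config, err := f.flinkClient.GetCheckpointConfig(ctx, f.getURLFromApp(application, hash), application.Status.JobStatus.JobID)
	if err != nil {
		return "", err
	}
	// Only externalized checkpoints retained on cancellation are safe to
	// restore from once the old job has been cancelled.
	if !config.Externalization.Enabled || config.Externalization.DeleteOnCancellation {
		return "", fmt.Errorf("checkpoint configuration is not compatible with restoring from checkpoints")
	}
	return f.findExternalizedCheckpoint(ctx, application, hash, getMaxCheckpointRestoreAgeSeconds(application))
}
```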

if err != nil {
return "", err
}
if !checkpointConfig.Externalization.Enabled || checkpointConfig.Externalization.DeleteOnCancellation {
return "", fmt.Errorf("checkpoint configuration is not compatible with restoring from checkpoints")
}
return f.findExternalizedCheckpoint(ctx, application, hash, getMaxCheckpointRestoreAgeSeconds(application))
}

func isCheckpointOldToRecover(checkpointTime int64, maxCheckpointRecoveryAgeSec int32) bool {
10 changes: 10 additions & 0 deletions pkg/controller/flink/flink_test.go
@@ -826,6 +826,16 @@ func TestJobStatusUpdated(t *testing.T) {
}, nil
}

mockJmClient.GetCheckpointConfigFunc = func(ctx context.Context, url string, jobID string) (*client.CheckpointConfigResponse, error) {
assert.Equal(t, url, "http://app-name-hash.ns:8081")
return &client.CheckpointConfigResponse{
Externalization: &client.ExternalizedCheckpoints{
Enabled: true,
DeleteOnCancellation: false,
},
}, nil
}

flinkApp.Status.JobStatus.JobID = "abc"
expectedTime := metaV1.NewTime(time.Unix(startTime/1000, 0))
_, err = flinkControllerForTest.CompareAndUpdateJobStatus(context.Background(), &flinkApp, "hash")
64 changes: 37 additions & 27 deletions pkg/controller/flink/mock/mock_flink.go
@@ -22,6 +22,7 @@ type GetJobsForApplicationFunc func(ctx context.Context, application *v1beta1.Fl
type GetJobForApplicationFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error)
type GetCurrentDeploymentsForAppFunc func(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error)
type FindExternalizedCheckpointFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error)
type FindExternalizedCheckpointForSavepointFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error)
type CompareAndUpdateClusterStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)
type CompareAndUpdateJobStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)
type GetLatestClusterStatusFunc func(ctx context.Context, app *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus
@@ -36,34 +37,36 @@ type DeleteStatusPostTeardownFunc func(ctx context.Context, application *v1beta1
type GetJobToDeleteForApplicationFunc func(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error)
type GetVersionAndJobIDForHashFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, string, error)
type GetVersionAndHashPostTeardownFunc func(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string)

type FlinkController struct {
CreateClusterFunc CreateClusterFunc
DeleteOldResourcesForAppFunc DeleteOldResourcesForApp
SavepointFunc SavepointFunc
ForceCancelFunc ForceCancelFunc
StartFlinkJobFunc StartFlinkJobFunc
GetSavepointStatusFunc GetSavepointStatusFunc
IsClusterReadyFunc IsClusterReadyFunc
IsServiceReadyFunc IsServiceReadyFunc
GetJobsForApplicationFunc GetJobsForApplicationFunc
GetJobForApplicationFunc GetJobForApplicationFunc
GetCurrentDeploymentsForAppFunc GetCurrentDeploymentsForAppFunc
FindExternalizedCheckpointFunc FindExternalizedCheckpointFunc
Events []corev1.Event
CompareAndUpdateClusterStatusFunc CompareAndUpdateClusterStatusFunc
CompareAndUpdateJobStatusFunc CompareAndUpdateJobStatusFunc
GetLatestClusterStatusFunc GetLatestClusterStatusFunc
GetLatestJobStatusFunc GetLatestJobStatusFunc
GetLatestJobIDFunc GetLatestJobIDFunc
UpdateLatestJobIDFunc UpdateLatestJobIDFunc
UpdateLatestJobStatusFunc UpdateLatestJobStatusFunc
UpdateLatestClusterStatusFunc UpdateLatestClusterStatusFunc
UpdateLatestVersionAndHashFunc UpdateLatestVersionAndHashFunc
DeleteResourcesForAppWithHashFunc DeleteResourcesForAppWithHashFunc
DeleteStatusPostTeardownFunc DeleteStatusPostTeardownFunc
GetJobToDeleteForApplicationFunc GetJobToDeleteForApplicationFunc
GetVersionAndJobIDForHashFunc GetVersionAndJobIDForHashFunc
GetVersionAndHashPostTeardownFunc GetVersionAndHashPostTeardownFunc
CreateClusterFunc CreateClusterFunc
DeleteOldResourcesForAppFunc DeleteOldResourcesForApp
SavepointFunc SavepointFunc
ForceCancelFunc ForceCancelFunc
StartFlinkJobFunc StartFlinkJobFunc
GetSavepointStatusFunc GetSavepointStatusFunc
IsClusterReadyFunc IsClusterReadyFunc
IsServiceReadyFunc IsServiceReadyFunc
GetJobsForApplicationFunc GetJobsForApplicationFunc
GetJobForApplicationFunc GetJobForApplicationFunc
GetCurrentDeploymentsForAppFunc GetCurrentDeploymentsForAppFunc
FindExternalizedCheckpointFunc FindExternalizedCheckpointFunc
FindExternalizedCheckpointForSavepointFunc FindExternalizedCheckpointForSavepointFunc
Events []corev1.Event
CompareAndUpdateClusterStatusFunc CompareAndUpdateClusterStatusFunc
CompareAndUpdateJobStatusFunc CompareAndUpdateJobStatusFunc
GetLatestClusterStatusFunc GetLatestClusterStatusFunc
GetLatestJobStatusFunc GetLatestJobStatusFunc
GetLatestJobIDFunc GetLatestJobIDFunc
UpdateLatestJobIDFunc UpdateLatestJobIDFunc
UpdateLatestJobStatusFunc UpdateLatestJobStatusFunc
UpdateLatestClusterStatusFunc UpdateLatestClusterStatusFunc
UpdateLatestVersionAndHashFunc UpdateLatestVersionAndHashFunc
DeleteResourcesForAppWithHashFunc DeleteResourcesForAppWithHashFunc
DeleteStatusPostTeardownFunc DeleteStatusPostTeardownFunc
GetJobToDeleteForApplicationFunc GetJobToDeleteForApplicationFunc
GetVersionAndJobIDForHashFunc GetVersionAndJobIDForHashFunc
GetVersionAndHashPostTeardownFunc GetVersionAndHashPostTeardownFunc
}

func (m *FlinkController) GetCurrentDeploymentsForApp(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error) {
@@ -151,6 +154,13 @@ func (m *FlinkController) FindExternalizedCheckpoint(ctx context.Context, applic
return "", nil
}

func (m *FlinkController) FindExternalizedCheckpointForSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
if m.FindExternalizedCheckpointForSavepointFunc != nil {
return m.FindExternalizedCheckpointForSavepointFunc(ctx, application, hash)
}
return "", nil
}

func (m *FlinkController) LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, reason string, message string) {
m.Events = append(m.Events, corev1.Event{
InvolvedObject: corev1.ObjectReference{
29 changes: 27 additions & 2 deletions pkg/controller/flinkapplication/flink_state_machine.go
@@ -233,6 +233,7 @@ func (s *FlinkStateMachine) IsTimeToHandlePhase(application *v1beta1.FlinkApplic

// In this state we create a new cluster, either due to an entirely new FlinkApplication or due to an update.
func (s *FlinkStateMachine) handleNewOrUpdating(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {

// TODO: add up-front validation on the FlinkApplication resource
if rollback, reason := s.shouldRollback(ctx, application); rollback {
// we've failed to make progress; move to deploy failed
@@ -305,16 +306,34 @@ func (s *FlinkStateMachine) handleClusterStarting(ctx context.Context, applicati

logger.Infof(ctx, "Flink cluster has started successfully")
// TODO: in single mode move to submitting job
if application.Spec.SavepointDisabled && !v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) {
if application.Spec.SavepointingDisabled() && !v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) {
s.updateApplicationPhase(application, v1beta1.FlinkApplicationCancelling)
} else if application.Spec.SavepointDisabled && v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) {
} else if application.Spec.SavepointingDisabled() && v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) {
// Blue Green deployment and no savepoint required implies, we directly transition to submitting job
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
} else {
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSavepointing)
}
return statusChanged, nil
}
func (s *FlinkStateMachine) handleApplicationSavepointingWithCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
Contributor:

I think there's scope to abstract out some common code between this method and handleApplicationRecovering()?
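For discussion, a rough sketch of a shared helper both paths could call (assuming handleApplicationRecovering follows the same find-checkpoint / force-cancel / resubmit pattern):

```go
// Sketch only: cancel the running job and fall back to an externalized
// checkpoint, then move on to submitting the new job.
func (s *FlinkStateMachine) cancelAndRestoreFromCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, checkpointPath string) (bool, error) {
	jobID := s.flinkController.GetLatestJobID(ctx, application)
	if err := s.flinkController.ForceCancel(ctx, application, application.Status.DeployHash, jobID); err != nil {
		return statusUnchanged, err
	}
	s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CancellingJob",
		fmt.Sprintf("Cancelling job %s with a final checkpoint", jobID))
	application.Status.JobStatus.JobID = ""
	application.Status.SavepointPath = checkpointPath
	s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
	return statusChanged, nil
}
```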

Contributor:

Also, minor suggestion: could we rename this method to not include the term Savepoint (since it's not really using a savepoint)?

checkpointPath, err := s.flinkController.FindExternalizedCheckpointForSavepoint(ctx, application, application.Status.DeployHash)
if err != nil {
return statusUnchanged, err
}

jobID := s.flinkController.GetLatestJobID(ctx, application)
if err := s.flinkController.ForceCancel(ctx, application, application.Status.DeployHash, jobID); err != nil {
return statusUnchanged, err
}

s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CancellingJob",
fmt.Sprintf("Cancelling job job %s with a final checkpoint", jobID))
application.Status.JobStatus.JobID = ""
application.Status.SavepointPath = checkpointPath
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
return statusChanged, nil
}

func (s *FlinkStateMachine) initializeAppStatusIfEmpty(application *v1beta1.FlinkApplication) {
if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) {
@@ -331,6 +350,7 @@ func (s *FlinkStateMachine) initializeAppStatusIfEmpty(application *v1beta1.Flin
func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
// we've already savepointed (or this is our first deploy), continue on
if application.Status.SavepointPath != "" || application.Status.DeployHash == "" {
logger.Debugf(ctx, "Using SavepointPath: %s", application.Status.SavepointPath)
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
return statusChanged, nil
}
@@ -342,6 +362,11 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
s.updateApplicationPhase(application, v1beta1.FlinkApplicationRecovering)
return statusChanged, nil
}
// use checkpoints in place of savepoints
if application.Spec.UpdateMode == v1beta1.UpdateModeCheckpoint {
return s.handleApplicationSavepointingWithCheckpoint(ctx, application)
Contributor:

Do you think it would make sense to make the restoration from a checkpoint an explicit state in the state machine?

Author:

Hmm, my gut was that it doesn't warrant it, but I suppose that since there's a max age and a window where the controller is polling to find the next, fresher checkpoint, the controller might do well to expose that state and have a clearer exit condition. I can prototype it if you'd like.

Contributor:

Yeah I agree that it would be clearer to expose it as a state. Happy to help review the prototype!
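A rough sketch of what an explicit phase might look like (names are hypothetical; assumes the phase type in v1beta1 is FlinkApplicationPhase):

```go
// Hypothetical phase, which would live alongside the other phases in v1beta1:
//   FlinkApplicationRestoringFromCheckpoint FlinkApplicationPhase = "RestoringFromCheckpoint"

// handleRestoringFromCheckpoint would poll for a fresh-enough externalized
// checkpoint and give the phase a clear exit condition: either a usable
// checkpoint is found or the max checkpoint age is exceeded.
func (s *FlinkStateMachine) handleRestoringFromCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
	checkpointPath, err := s.flinkController.FindExternalizedCheckpointForSavepoint(ctx, application, application.Status.DeployHash)
	if err != nil {
		return statusUnchanged, err
	}
	if checkpointPath == "" {
		// No acceptable checkpoint yet; stay in this phase and retry.
		return statusUnchanged, nil
	}
	application.Status.SavepointPath = checkpointPath
	s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
	return statusChanged, nil
}
```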

}

cancelFlag := getCancelFlag(application)
// we haven't started savepointing yet; do so now
// TODO: figure out the idempotence of this