Update with external checkpoints #199
base: master
Conversation
pkg/apis/app/v1beta1/types.go
@@ -58,6 +59,7 @@ type FlinkApplicationSpec struct {
	AllowNonRestoredState          bool   `json:"allowNonRestoredState,omitempty"`
	ForceRollback                  bool   `json:"forceRollback"`
	MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"`
	MaxCheckpointDeployAgeSeconds  *int32 `json:"maxCheckpointDeployAgeSeconds,omitempty"`
Is MaxCheckpointDeployAgeSeconds functionally different from MaxCheckpointRestoreAgeSeconds?
Not functionally, no. My thinking was that these checkpoint age values could have very different acceptability thresholds, since MaxCheckpointDeployAgeSeconds would be looking for an ideal, possibly short, window. By failing this age check, the operator would effectively poll the JM API waiting for the next checkpoint to run and complete (we have ours checkpointing frequently). I may have read the state machine handler incorrectly, though.
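To make the age check concrete, here is a minimal, self-contained sketch of the freshness test being described, assuming the JobManager reports a checkpoint's trigger timestamp in epoch milliseconds; the function and variable names are illustrative, not the PR's actual helpers:

package main

import (
	"fmt"
	"time"
)

// isCheckpointFreshEnough reports whether a checkpoint triggered at
// triggerMillis is younger than maxAgeSeconds. Failing this check is not a
// hard error: the operator would simply reconcile again and re-query the JM
// API, which effectively polls for the next completed checkpoint.
func isCheckpointFreshEnough(triggerMillis int64, maxAgeSeconds int32, now time.Time) bool {
	if maxAgeSeconds <= 0 {
		return true // no limit configured
	}
	triggered := time.Unix(0, triggerMillis*int64(time.Millisecond))
	return now.Sub(triggered) <= time.Duration(maxAgeSeconds)*time.Second
}

func main() {
	tenMinutesAgo := time.Now().Add(-10 * time.Minute).UnixMilli()
	fmt.Println(isCheckpointFreshEnough(tenMinutesAgo, 300, time.Now()))  // false: older than 5 minutes
	fmt.Println(isCheckpointFreshEnough(tenMinutesAgo, 3600, time.Now())) // true: within the last hour
}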
Hi @davidbirdsong! Thank you for creating this PR. I have one overall comment/suggestion on the approach. Including the functionality in this PR, we have three ways to perform an update today: with savepoint, without savepoint, and with checkpoint. I'm wondering if we can consolidate these options into one field (maybe something called UpdateMode). We will still have to support the existing flags for backward compatibility. Thoughts?
I figured an additive approach was much easier with respect to backwards compatibility: savepoints are part of job cancellation while checkpoints are not, so the behavior supporting savepoints versus checkpoints would be very different. I'm open to unifying the functionality/implementation, but I'd want some guidance/collaboration, since expanding features into existing ones while maintaining backwards compatibility is trickier, and I'd rather not guess at what's palatable to the core devs.
	savepointJobSuccessCounter:         labeled.NewCounter("savepoint_job_success", "Savepoint job request succeeded", flinkJmClientScope),
	savepointJobFailureCounter:         labeled.NewCounter("savepoint_job_failed", "Savepoint job request failed", flinkJmClientScope),
	getCheckpointsConfigSuccessCounter: labeled.NewCounter("get_checkpoints_config_success", "Get checkpoint config request succeeded", flinkJmClientScope),
	getCheckpointsConfigFailureCounter: labeled.NewCounter("get_checkpoints_config_failed", "Get checkpoint config request failed", flinkJmClientScope),
}
I love go fmt, but it can also be disruptive: it bloats PRs with formatting changes and hides the actual diff. I could add some line breaks to separate out the new, longer struct member and minimize the line-change count on things like this.
Agreed. Not a problem, ok to leave as is :)
Sorry for the delay here @davidbirdsong! Thanks for addressing the review comments. I am leaning towards the approach where we introduce an UpdateMode field.
Can you advise on how to handle the coupling of needing both of the following to be true:
Without these, any checkpoint-based update will fail. For this PR, I kept it simple by returning from the handler early.
UpdateMode will allow for three different actions to be taken before the old application is cancelled: savepoint, checkpoint, or no state saved (taking over the existing SavepointDisabled flag).
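For reference, a sketch of what the consolidated field could look like; UpdateModeCheckpoint matches the value referenced later in this diff, but the other names and the JSON tag are assumptions, not the final API:

package v1beta1

// UpdateMode selects what happens to job state before the old application is cancelled.
type UpdateMode string

const (
	// UpdateModeSavepoint cancels the old job with a savepoint (the current default behavior).
	UpdateModeSavepoint UpdateMode = "Savepoint"
	// UpdateModeCheckpoint restores the new job from a recent externalized checkpoint.
	UpdateModeCheckpoint UpdateMode = "Checkpoint"
	// UpdateModeNone saves no state, subsuming the existing SavepointDisabled flag.
	UpdateModeNone UpdateMode = "None"
)

// In FlinkApplicationSpec this would appear roughly as:
//	UpdateMode UpdateMode `json:"updateMode,omitempty"`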
Hey @davidbirdsong! Thanks for the updates. I will review this PR in the next couple of days :)
savepointPath can be supplied or found dynamically; a debug path helps observe which one is used
Thanks for the PR @davidbirdsong ! I took a stab at an initial review and left a few comments. Once we work through some design discussions on the PR, it would also be nice to have an integration test that tests this feature.
		// Blue Green deployment and no savepoint required implies, we directly transition to submitting job
		s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
	} else {
		s.updateApplicationPhase(application, v1beta1.FlinkApplicationSavepointing)
	}
	return statusChanged, nil
}

func (s *FlinkStateMachine) handleApplicationSavepointingWithCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
I think there's scope to abstract out some common code between this method and handleApplicationRecovering()?
Also, a minor suggestion: could we rename this method to not include the term Savepoint (since it's not really using a savepoint)?
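For context, here is a rough sketch of what the body of the handleApplicationSavepointingWithCheckpoint handler quoted above might do. It assumes the restore location is carried in application.Status.SavepointPath, that FindExternalizedCheckpointForSavepoint returns the checkpoint's external path, and that flink.HashForApplication and the statusChanged/statusUnchanged constants are available; none of this is the PR's exact code:

// Illustrative only; helper and field names are assumptions.
func (s *FlinkStateMachine) handleApplicationSavepointingWithCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
	hash := flink.HashForApplication(application)
	path, err := s.flinkController.FindExternalizedCheckpointForSavepoint(ctx, application, hash)
	if err != nil {
		return statusUnchanged, err
	}
	if path == "" {
		// No retained, recent-enough checkpoint yet: stay in this phase so the
		// next reconcile queries the JM API again.
		return statusUnchanged, nil
	}
	// Reuse the savepoint plumbing: the new job is submitted from this path.
	application.Status.SavepointPath = path
	s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
	return statusChanged, nil
}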
}

func (f *Controller) FindExternalizedCheckpointForSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
	checkpointConfig, err := f.flinkClient.GetCheckpointConfig(ctx, f.getURLFromApp(application, hash), application.Status.JobStatus.JobID)
Is there an opportunity to combine FindExternalizedCheckpoint and FindExternalizedCheckpointForSavepoint into a single method? I believe the checkpoint configuration check (to ensure it's RETAIN_ON_CANCELLATION) is applicable to both cases.
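A rough sketch of what such a unified lookup could look like; it assumes the checkpoint-config response exposes a delete-on-cancellation flag and that the client has a GetLatestCheckpoint call returning an ExternalPath — those shapes are guesses, not verified signatures from the repository:

// Sketch only; cfg.Externalization.DeleteOnCancellation, GetLatestCheckpoint,
// and ExternalPath are assumed shapes for the JM REST responses.
func (f *Controller) findExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
	url := f.getURLFromApp(application, hash)
	jobID := application.Status.JobStatus.JobID

	cfg, err := f.flinkClient.GetCheckpointConfig(ctx, url, jobID)
	if err != nil {
		return "", err
	}
	// Applies to both callers: a checkpoint is only usable for a new deploy if
	// it survives cancellation of the old job (RETAIN_ON_CANCELLATION).
	if cfg.Externalization.DeleteOnCancellation {
		return "", errors.New("externalized checkpoints are not retained on cancellation")
	}

	latest, err := f.flinkClient.GetLatestCheckpoint(ctx, url, jobID)
	if err != nil {
		return "", err
	}
	if latest == nil {
		return "", nil // nothing completed yet; the caller can retry on the next reconcile
	}
	return latest.ExternalPath, nil
}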
@@ -342,6 +362,11 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
		s.updateApplicationPhase(application, v1beta1.FlinkApplicationRecovering)
		return statusChanged, nil
	}
	// use of checkpoints in the place of savepoints
	if application.Spec.UpdateMode == v1beta1.UpdateModeCheckpoint {
		return s.handleApplicationSavepointingWithCheckpoint(ctx, application)
Do you think it would make sense to make the restoration from a checkpoint an explicit state in the state machine?
Hmm, my gut was that it doesn't warrant it, but since there's a max age and a window where the controller is polling to find the next, fresher checkpoint, the controller might do well to expose that state and have a clearer exit condition. I can prototype it, if you'd like.
Yeah I agree that it would be clearer to expose it as a state. Happy to help review the prototype!
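To make that concrete, here is one hedged sketch of what the explicit state could look like; the phase name, its handler, and the dispatch wiring shown in the comment are all hypothetical, not part of this PR:

// Hypothetical new phase in pkg/apis/app/v1beta1; the name is illustrative only.
const FlinkApplicationRestoringFromCheckpoint FlinkApplicationPhase = "RestoringFromCheckpoint"

// In the state machine's phase dispatch, checkpoint-based updates would then
// get their own state with an explicit exit condition instead of piggybacking
// on Savepointing, e.g.:
//
//	case v1beta1.FlinkApplicationRestoringFromCheckpoint:
//		// Poll the JM API for an externalized checkpoint newer than
//		// maxCheckpointDeployAgeSeconds; stay here until one appears (or a
//		// retry budget is exhausted), then move to SubmittingJob.
//		return s.handleApplicationRestoringFromCheckpoint(ctx, application)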
Another thing that occurred to me: how would a non-empty savepointPath interact with this?
That's a very good point! I don't believe we'd want the two to interact. In the first deploy of a job, we use the provided savepoint (if any) while submitting the job; during an update, however, the field is not used. So I think we should be okay with keeping these states/transitions isolated. Let me know what you think. Another thought that came to mind is that, since v0.5, we have a
A draft to implement new functionality proposed in #197.
This adds values to the CRD to convey that the savepoint normally taken during an upgrade can be replaced with a checkpoint instead.
Checkpoints run constantly in the background, so a 'recent enough' checkpoint must exist, and that recency is configurable.