-
Notifications
You must be signed in to change notification settings - Fork 13.7k
[FLINK-37985][conf] Refactor public config options from StreamConfig to JobConfiguration #26713
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-37985][conf] Refactor public config options from StreamConfig to JobConfiguration #26713
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI: After this PR, there are still 2 checkpoint related config options are stored in StreamConfig: CHECKPOINTING_ENABLED and CHECKPOINT_MODE .
The reason for keeping CHECKPOINTING_ENABLED is : Flink doesn't have public config options to indicate CHECKPOINTING_ENABLED, the checkpoint is enable when checkpoint interval > 0 [1].
The reason for keeping CHECKPOINT_MODE is : the checkpoint mode will be AT_LEAST_ONCE when checkpoint is disabled [2] , and it does't update the value in Configuration.
Even so, I still agree to move them outof StreamConfig in the future, we could introduce some static methods in CheckpointingOptions to address these cases, such as:
public static boolean isCheckpointingEnabled(Configuration config) {
return config
.getOptional(CheckpointingOptions.CHECKPOINTING_INTERVAL)
.map(Duration::toMillis)
.orElse(-1L)
> 0;
}
public static CheckpointingMode getCheckpointingMode(Configuration config) {
if (isCheckpointingEnabled(config)) {
return config.get(CHECKPOINTING_CONSISTENCY_MODE);
} else {
// the "at-least-once" input handler is slightly cheaper (in the absence of
// checkpoints), so we use that one if checkpointing is not enabled
return CheckpointingMode.AT_LEAST_ONCE;
}
}
It works similarly to some scheduler config options[3].
[1]
flink/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
Line 100 in df5bdc6
public boolean isCheckpointingEnabled() { |
[2]
flink/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
Line 254 in df5bdc6
public static CheckpointingMode getCheckpointingMode(CheckpointConfig checkpointConfig) { |
[3]
flink/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
Line 177 in df5bdc6
public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) { |
…to JobConfiguration
b6e1eb2
to
490aa23
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @1996fanrui, the changes LGTM.
Could you explain why don't you replace the two remaining options though?
I think it would be more consistent to have the same way of handling all checkpointing options.
Even so, I still agree to move them outof StreamConfig in the future, we could introduce some static methods in CheckpointingOptions to address these cases
Or would it be a much bigger change?
b62293e
to
aa4e580
Compare
Thanks @rkhachatryan for the quick review!
It has 2 reasons: bigger change and need to introduce some static methods to read these config options(And ensure no other callers and no new callers to read them from Configuration directly)
Yes, I agree with you. I have updated this PR, and solved all of above problems. Besides of that, I introduced |
…ns to simplify read config options
7763899
to
9e109b7
Compare
9e109b7
to
7699291
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the PR @1996fanrui
I think that's a good idea to add arch tests.
I have a couple of minor remarks, otherwise LGTM.
@@ -290,7 +290,7 @@ private AbstractStreamOperatorTestHarness( | |||
|
|||
Configuration underlyingConfig = env.getTaskConfiguration(); | |||
this.config = new StreamConfig(underlyingConfig); | |||
this.config.setCheckpointingEnabled(true); | |||
// todo ? check the CI result if removing this.config.setCheckpointingEnabled(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be removed now?:)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, and removed!
"ENABLE_UNALIGNED", | ||
"CHECKPOINTING_CONSISTENCY_MODE", | ||
"ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems possible to use code constants here (and for class names), right?
So that renaming doesn't make this test a no_op
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion!
Class can be easily converted into a String name, for example: CheckpointingOptions.class.getName();
. But Java variable and method names are difficult to convert directly into String names. I used reflection to load variables and methods, and an exception will be thrown if the loading fails. (This arch test will be failed if these variables or methods are removed in the future.)
Following is a demo:
private static Set<String> getProhibitedFieldNames() {
try {
return new HashSet<>(
Arrays.asList(
CheckpointingOptions.class.getField("ENABLE_UNALIGNED").getName(),
CheckpointingOptions.class
.getField("CHECKPOINTING_CONSISTENCY_MODE")
.getName(),
CheckpointingOptions.class
.getField("ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS")
.getName()));
} catch (NoSuchFieldException e) {
// This makes the test class fail to load if a field is ever renamed.
throw new ExceptionInInitializerError(e);
}
}
8989fc8
to
ae5c665
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…o JobConfiguration
ae5c665
to
c00e007
Compare
@flinkbot run azure |
…nt config options are got directly from Configuration.get() or Configuration.getOptional()
c00e007
to
1650bfb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review, merging
What is the purpose of the change
Some public config options are stored in StreamConfig, StreamConfig stores some operator related information, such as class serialization result, vertex id, etc. But storing public config options are confusing.
It's better to refactor them from StreamConfig to JobConfiguration. And all callers could read these config options from JobConfigruration instead of StreamConfig.
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation