@@ -232,21 +232,17 @@ void snapshotState(
232
232
try {
233
233
if (timeServiceManager .isPresent ()) {
234
234
boolean requiresLegacyRawKeyedStateSnapshots ;
235
- final InternalTimeServiceManager <?> manager ;
236
235
if (useAsyncState ) {
237
236
checkState (
238
237
asyncKeyedStateBackend != null ,
239
- "keyedStateBackend should be available with timeServiceManager" );
240
- manager = timeServiceManager .get ();
238
+ "asyncKeyedStateBackend should be available with timeServiceManager" );
241
239
requiresLegacyRawKeyedStateSnapshots =
242
240
asyncKeyedStateBackend .requiresLegacySynchronousTimerSnapshots (
243
241
checkpointOptions .getCheckpointType ());
244
242
} else {
245
243
checkState (
246
244
keyedStateBackend != null ,
247
245
"keyedStateBackend should be available with timeServiceManager" );
248
- manager = timeServiceManager .get ();
249
-
250
246
requiresLegacyRawKeyedStateSnapshots =
251
247
keyedStateBackend instanceof AbstractKeyedStateBackend
252
248
&& ((AbstractKeyedStateBackend <?>) keyedStateBackend )
@@ -257,6 +253,7 @@ void snapshotState(
257
253
checkState (
258
254
!isUsingCustomRawKeyedState ,
259
255
"Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write." );
256
+ final InternalTimeServiceManager <?> manager = timeServiceManager .get ();
260
257
manager .snapshotToRawKeyedState (
261
258
snapshotContext .getRawKeyedOperatorStateOutput (), operatorName );
262
259
}
0 commit comments