File tree 2 files changed +15
-4
lines changed
main/java/org/apache/flink/api/dag
test/java/org/apache/flink/api/dag
2 files changed +15
-4
lines changed Original file line number Diff line number Diff line change 33
33
import javax .annotation .Nullable ;
34
34
35
35
import java .util .Collections ;
36
+ import java .util .EnumMap ;
36
37
import java .util .HashMap ;
37
38
import java .util .HashSet ;
38
39
import java .util .List ;
42
43
import java .util .concurrent .atomic .AtomicInteger ;
43
44
44
45
import static org .apache .flink .util .Preconditions .checkArgument ;
45
- import static org .apache .flink .util .Preconditions .checkNotNull ;
46
46
47
47
/**
48
48
* A {@code Transformation} represents the operation that creates a DataStream. Every DataStream has
@@ -160,7 +160,7 @@ public static int getNewNodeId() {
160
160
* will be shared by all the declaring transformations within a slot according to this weight.
161
161
*/
162
162
private final Map <ManagedMemoryUseCase , Integer > managedMemoryOperatorScopeUseCaseWeights =
163
- new HashMap <>();
163
+ new EnumMap <>(ManagedMemoryUseCase . class );
164
164
165
165
/**
166
166
* This map is a cache that stores transitive predecessors and used in {@code
@@ -306,8 +306,8 @@ public void setMaxParallelism(int maxParallelism) {
306
306
*/
307
307
public void setResources (ResourceSpec minResources , ResourceSpec preferredResources ) {
308
308
OperatorValidationUtils .validateMinAndPreferredResources (minResources , preferredResources );
309
- this .minResources = checkNotNull ( minResources ) ;
310
- this .preferredResources = checkNotNull ( preferredResources ) ;
309
+ this .minResources = minResources ;
310
+ this .preferredResources = preferredResources ;
311
311
}
312
312
313
313
/**
Original file line number Diff line number Diff line change 18
18
19
19
package org .apache .flink .api .dag ;
20
20
21
+ import org .apache .flink .api .common .operators .ResourceSpec ;
21
22
import org .apache .flink .api .common .typeinfo .TypeInformation ;
22
23
import org .apache .flink .core .memory .ManagedMemoryUseCase ;
23
24
import org .apache .flink .core .testutils .CheckedThread ;
@@ -134,6 +135,16 @@ void testDeclareManagedMemorySlotScopeUseCaseFailWrongScope() {
134
135
.isInstanceOf (IllegalArgumentException .class );
135
136
}
136
137
138
+ @ Test
139
+ void testSetResourcesUseCaseFailNullResources () {
140
+ ResourceSpec resourceSpec = ResourceSpec .newBuilder (1.0 , 100 ).build ();
141
+
142
+ assertThatThrownBy (() -> transformation .setResources (null , resourceSpec ))
143
+ .isInstanceOf (NullPointerException .class );
144
+ assertThatThrownBy (() -> transformation .setResources (resourceSpec , null ))
145
+ .isInstanceOf (NullPointerException .class );
146
+ }
147
+
137
148
/** A test implementation of {@link Transformation}. */
138
149
private static class TestTransformation <T > extends Transformation <T > {
139
150
You can’t perform that action at this time.
0 commit comments