Skip to content

Commit 39b28a9

Browse files
committed
Refactor and refinement.
1 parent c1082b6 commit 39b28a9

23 files changed

+493
-431
lines changed

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/AbstractSharedObjectsOneInputStreamOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222

2323
import java.util.List;
2424

25-
/** The base class for all {@link OneInputStreamOperator} if shared objects are required. */
25+
/** The base class for {@link OneInputStreamOperator}s where shared objects are accessed. */
2626
public abstract class AbstractSharedObjectsOneInputStreamOperator<IN, OUT>
2727
extends AbstractSharedObjectsStreamOperator<OUT>
2828
implements OneInputStreamOperator<IN, OUT> {
2929

30-
public abstract List<ItemReadRequest<?>> registerReadsInProcessElement();
30+
public abstract List<ReadRequest<?>> readRequestsInProcessElement();
3131
}

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/AbstractSharedObjectsStreamOperator.java

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.UUID;
2424

2525
/**
26-
* A base class of stream operators which support shared objects.
26+
* A base class of stream operators where shared objects are required.
2727
*
2828
* <p>Official subclasses, i.e., {@link AbstractSharedObjectsOneInputStreamOperator} and {@link
2929
* AbstractSharedObjectsTwoInputStreamOperator}, are strongly recommended.
@@ -32,31 +32,25 @@
3232
*/
3333
public abstract class AbstractSharedObjectsStreamOperator<OUT> extends AbstractStreamOperator<OUT> {
3434

35-
private final String sharedObjectsAccessorID;
36-
private transient SharedObjectsContext sharedObjectsContext;
35+
/**
36+
* A unique identifier for the instance, which is kept unchanged between client side and
37+
* runtime.
38+
*/
39+
private final String accessorID;
3740

38-
protected AbstractSharedObjectsStreamOperator() {
39-
super();
40-
sharedObjectsAccessorID = getClass().getSimpleName() + "-" + UUID.randomUUID();
41-
}
42-
43-
protected void onSharedObjectsContextSet(SharedObjectsContext context) {
44-
sharedObjectsContext = context;
45-
}
41+
/** The context for shared objects reads/writes. */
42+
protected transient SharedObjectsContext context;
4643

47-
public String getSharedObjectsAccessorID() {
48-
return sharedObjectsAccessorID;
49-
}
50-
51-
public <T> T getSharedItem(ItemReadRequest<T> request) {
52-
return sharedObjectsContext.get(request);
44+
AbstractSharedObjectsStreamOperator() {
45+
super();
46+
accessorID = getClass().getSimpleName() + "-" + UUID.randomUUID();
5347
}
5448

55-
public <T> void setSharedItem(ItemDescriptor<T> key, T value) {
56-
sharedObjectsContext.set(key, value);
49+
void onSharedObjectsContextSet(SharedObjectsContext context) {
50+
this.context = context;
5751
}
5852

59-
public <T> void renewSharedItem(ItemDescriptor<T> key) {
60-
sharedObjectsContext.renew(key);
53+
String getAccessorID() {
54+
return accessorID;
6155
}
6256
}

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/AbstractSharedObjectsTwoInputStreamOperator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222

2323
import java.util.List;
2424

25-
/** The base class for all {@link TwoInputStreamOperator} if shared objects are required. */
25+
/** The base class for {@link TwoInputStreamOperator}s where shared objects are accessed. */
2626
public abstract class AbstractSharedObjectsTwoInputStreamOperator<IN1, IN2, OUT>
2727
extends AbstractSharedObjectsStreamOperator<OUT>
2828
implements TwoInputStreamOperator<IN1, IN2, OUT> {
2929

30-
public abstract List<ItemReadRequest<?>> registerReadsInProcessElement1();
30+
public abstract List<ReadRequest<?>> readRequestsInProcessElement1();
3131

32-
public abstract List<ItemReadRequest<?>> registerReadsInProcessElement2();
32+
public abstract List<ReadRequest<?>> readRequestsInProcessElement2();
3333
}

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/AbstractSharedObjectsWrapperOperator.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ abstract class AbstractSharedObjectsWrapperOperator<
9191
private final int numInputs;
9292
private final TypeSerializer<?>[] inTypeSerializers;
9393
private final ListStateWithCache<CacheElement<?>>[] cachedElements;
94-
private final Queue<ItemReadRequest<?>>[] itemReadRequests;
94+
private final Queue<ReadRequest<?>>[] readRequests;
9595
private final boolean[] hasCachedElements;
9696

9797
protected transient StreamOperatorStateHandler stateHandler;
@@ -131,11 +131,11 @@ abstract class AbstractSharedObjectsWrapperOperator<
131131
numInputs = numNetworkInputs;
132132

133133
inTypeSerializers = new TypeSerializer[numInputs];
134-
itemReadRequests = new Queue[numInputs];
134+
readRequests = new Queue[numInputs];
135135
for (int i = 0; i < numInputs; i++) {
136136
inTypeSerializers[i] =
137137
streamConfig.getTypeSerializerIn(i, containingTask.getUserCodeClassLoader());
138-
itemReadRequests[i] = new ArrayDeque<>(getInputItemReadRequests(i));
138+
readRequests[i] = new ArrayDeque<>(getInputReadRequests(i));
139139
}
140140
cachedElements = new ListStateWithCache[numInputs];
141141
hasCachedElements = new boolean[numInputs];
@@ -162,18 +162,18 @@ private OperatorMetricGroup createOperatorMetricGroup(
162162
}
163163

164164
/**
165-
* Checks if the item read requests are satisfied for the input.
165+
* Checks if the read requests are satisfied for the input.
166166
*
167167
* @param inputId The input id, starting from 0.
168168
* @param wait Whether to wait until all requests satisfied, or not.
169169
* @return If all requests of this input are satisfied.
170170
*/
171-
private boolean checkSharedItemReadsReady(int inputId, boolean wait) {
172-
Queue<ItemReadRequest<?>> requests = itemReadRequests[inputId];
171+
private boolean checkReadRequestsReady(int inputId, boolean wait) {
172+
Queue<ReadRequest<?>> requests = readRequests[inputId];
173173
while (!requests.isEmpty()) {
174-
ItemReadRequest<?> request = requests.poll();
174+
ReadRequest<?> request = requests.poll();
175175
try {
176-
if (null == context.get(request, wait)) {
176+
if (null == context.read(request, wait)) {
177177
requests.add(request);
178178
return false;
179179
}
@@ -185,12 +185,12 @@ private boolean checkSharedItemReadsReady(int inputId, boolean wait) {
185185
}
186186

187187
/**
188-
* Gets {@link ItemReadRequest}s required for processing elements in the input.
188+
* Gets {@link ReadRequest}s required for processing elements in the input.
189189
*
190190
* @param inputId The input id, starting from 0.
191-
* @return The {@link ItemReadRequest}s required for processing elements.
191+
* @return The {@link ReadRequest}s required for processing elements.
192192
*/
193-
protected abstract List<ItemReadRequest<?>> getInputItemReadRequests(int inputId);
193+
protected abstract List<ReadRequest<?>> getInputReadRequests(int inputId);
194194

195195
/**
196196
* Extracts common processing logic in subclasses' processing elements.
@@ -213,7 +213,7 @@ protected void processElementX(
213213
ThrowingConsumer<Watermark, Exception> watermarkConsumer,
214214
ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
215215
throws Exception {
216-
if (checkSharedItemReadsReady(inputId, false)) {
216+
if (checkReadRequestsReady(inputId, false)) {
217217
if (hasCachedElements[inputId]) {
218218
processCachedElements(
219219
inputId, elementConsumer, watermarkConsumer, keyContextSetter);
@@ -248,7 +248,7 @@ protected void processWatermarkX(
248248
ThrowingConsumer<Watermark, Exception> watermarkConsumer,
249249
ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
250250
throws Exception {
251-
if (checkSharedItemReadsReady(inputId, false)) {
251+
if (checkReadRequestsReady(inputId, false)) {
252252
if (hasCachedElements[inputId]) {
253253
processCachedElements(
254254
inputId, elementConsumer, watermarkConsumer, keyContextSetter);
@@ -281,7 +281,7 @@ protected void endInputX(
281281
ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
282282
throws Exception {
283283
if (hasCachedElements[inputId]) {
284-
checkSharedItemReadsReady(inputId, true);
284+
checkReadRequestsReady(inputId, true);
285285
processCachedElements(inputId, elementConsumer, watermarkConsumer, keyContextSetter);
286286
hasCachedElements[inputId] = false;
287287
}
@@ -322,8 +322,8 @@ private void processCachedElements(
322322
}
323323
}
324324
cachedElements[inputId].clear();
325-
Preconditions.checkState(itemReadRequests[inputId].isEmpty());
326-
itemReadRequests[inputId].addAll(getInputItemReadRequests(inputId));
325+
Preconditions.checkState(readRequests[inputId].isEmpty());
326+
readRequests[inputId].addAll(getInputReadRequests(inputId));
327327
}
328328

329329
@Override
@@ -489,7 +489,7 @@ public void onEpochWatermarkIncremented(
489489
for (int i = 0; i < numInputs; i += 1) {
490490
processCachedElementsBeforeEpochIncremented(i);
491491
}
492-
this.context.increaseEpoch(epochWatermark);
492+
this.context.incStep(epochWatermark);
493493
if (wrappedOperator instanceof IterationListener) {
494494
//noinspection unchecked
495495
((IterationListener<T>) wrappedOperator)
@@ -499,7 +499,7 @@ public void onEpochWatermarkIncremented(
499499

500500
@Override
501501
public void onIterationTerminated(Context context, Collector<T> collector) throws Exception {
502-
this.context.increaseEpoch();
502+
this.context.incStep();
503503
if (wrappedOperator instanceof IterationListener) {
504504
//noinspection unchecked
505505
((IterationListener<T>) wrappedOperator).onIterationTerminated(context, collector);

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/ItemDescriptor.java renamed to flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/Descriptor.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@
2727
import java.io.Serializable;
2828

2929
/**
30-
* Descriptor for a shared item.
30+
* Descriptor for a shared object.
3131
*
32-
* @param <T> The type of the shared item.
32+
* <p>A shared object can have a non-null initial value, or have no initial values. If a non-null
33+
* initial value provided, it is set with an initial write-step (See {@link ReadRequest}).
34+
*
35+
* @param <T> The type of the shared object.
3336
*/
3437
@Experimental
35-
public class ItemDescriptor<T> implements Serializable {
38+
public class Descriptor<T> implements Serializable {
3639

37-
/** Name of the item. */
40+
/** Name of the shared object. */
3841
public final String name;
3942

4043
/** Type serializer. */
@@ -43,32 +46,50 @@ public class ItemDescriptor<T> implements Serializable {
4346
/** Initialize value. */
4447
public final @Nullable T initVal;
4548

46-
private ItemDescriptor(String name, TypeSerializer<T> serializer, T initVal) {
49+
private Descriptor(String name, TypeSerializer<T> serializer, T initVal) {
4750
this.name = name;
4851
this.serializer = serializer;
4952
this.initVal = initVal;
5053
}
5154

52-
public static <T> ItemDescriptor<T> of(String name, TypeSerializer<T> serializer, T initVal) {
55+
public static <T> Descriptor<T> of(String name, TypeSerializer<T> serializer, T initVal) {
5356
Preconditions.checkNotNull(
54-
initVal, "Cannot use `null` as the initial value of a shared item.");
55-
return new ItemDescriptor<>(name, serializer, initVal);
57+
initVal, "Cannot use `null` as the initial value of a shared object.");
58+
return new Descriptor<>(name, serializer, initVal);
5659
}
5760

58-
public static <T> ItemDescriptor<T> of(String name, TypeSerializer<T> serializer) {
59-
return new ItemDescriptor<>(name, serializer, null);
61+
public static <T> Descriptor<T> of(String name, TypeSerializer<T> serializer) {
62+
return new Descriptor<>(name, serializer, null);
6063
}
6164

62-
public ItemReadRequest<T> sameEpoch() {
63-
return new ItemReadRequest<>(this, ItemReadRequest.OFFSET.SAME);
65+
/**
66+
* Creates a read request which always reads this shared object with same read-step as the
67+
* operator step.
68+
*
69+
* @return A read request.
70+
*/
71+
public ReadRequest<T> sameStep() {
72+
return new ReadRequest<>(this, ReadRequest.OFFSET.SAME);
6473
}
6574

66-
public ItemReadRequest<T> prevEpoch() {
67-
return new ItemReadRequest<>(this, ItemReadRequest.OFFSET.PREV);
75+
/**
76+
* Creates a read request which always reads this shared object with the read-step be the
77+
* previous item of the operator step.
78+
*
79+
* @return A read request.
80+
*/
81+
public ReadRequest<T> prevStep() {
82+
return new ReadRequest<>(this, ReadRequest.OFFSET.PREV);
6883
}
6984

70-
public ItemReadRequest<T> nextEpoch() {
71-
return new ItemReadRequest<>(this, ItemReadRequest.OFFSET.NEXT);
85+
/**
86+
* Creates a read request which always reads this shared object with the read-step be the next
87+
* item of the operator step.
88+
*
89+
* @return A read request.
90+
*/
91+
public ReadRequest<T> nextStep() {
92+
return new ReadRequest<>(this, ReadRequest.OFFSET.NEXT);
7293
}
7394

7495
@Override
@@ -84,13 +105,13 @@ public boolean equals(Object o) {
84105
if (o == null || getClass() != o.getClass()) {
85106
return false;
86107
}
87-
ItemDescriptor<?> that = (ItemDescriptor<?>) o;
108+
Descriptor<?> that = (Descriptor<?>) o;
88109
return name.equals(that.name);
89110
}
90111

91112
@Override
92113
public String toString() {
93114
return String.format(
94-
"ItemDescriptor{name='%s', serializer=%s, initVal=%s}", name, serializer, initVal);
115+
"Descriptor{name='%s', serializer=%s, initVal=%s}", name, serializer, initVal);
95116
}
96117
}

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/ItemReadRequest.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/OneInputSharedObjectsWrapperOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ class OneInputSharedObjectsWrapperOperator<IN, OUT>
4545
}
4646

4747
@Override
48-
protected List<ItemReadRequest<?>> getInputItemReadRequests(int inputId) {
48+
protected List<ReadRequest<?>> getInputReadRequests(int inputId) {
4949
Preconditions.checkArgument(0 == inputId);
50-
return wrappedOperator.registerReadsInProcessElement();
50+
return wrappedOperator.readRequestsInProcessElement();
5151
}
5252

5353
@Override

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/PoolID.java

Lines changed: 0 additions & 32 deletions
This file was deleted.

0 commit comments

Comments
 (0)