Skip to content

Commit a92eae9

Browse files
committed
Rename shared storage to shared objects and change according to comments
1 parent 0885794 commit a92eae9

21 files changed

+372
-330
lines changed
Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.ml.common.sharedstorage;
19+
package org.apache.flink.ml.common.sharedobjects;
2020

2121
import org.apache.flink.api.common.typeutils.TypeSerializer;
2222
import org.apache.flink.core.memory.ManagedMemoryUseCase;
@@ -56,12 +56,12 @@
5656
import java.util.Objects;
5757
import java.util.Optional;
5858

59-
/** Base class for the shared storage wrapper operators. */
60-
abstract class AbstractSharedStorageWrapperOperator<T, S extends StreamOperator<T>>
59+
/** Base class for the shared objects wrapper operators. */
60+
abstract class AbstractSharedObjectsWrapperOperator<T, S extends StreamOperator<T>>
6161
implements StreamOperator<T>, IterationListener<T>, CheckpointedStreamOperator {
6262

6363
private static final Logger LOG =
64-
LoggerFactory.getLogger(AbstractSharedStorageWrapperOperator.class);
64+
LoggerFactory.getLogger(AbstractSharedObjectsWrapperOperator.class);
6565

6666
protected final StreamOperatorParameters<T> parameters;
6767

@@ -72,18 +72,18 @@ abstract class AbstractSharedStorageWrapperOperator<T, S extends StreamOperator<
7272
protected final Output<StreamRecord<T>> output;
7373

7474
protected final StreamOperatorFactory<T> operatorFactory;
75-
private final SharedStorageContextImpl context;
75+
private final SharedObjectsContextImpl context;
7676
protected final OperatorMetricGroup metrics;
7777
protected final S wrappedOperator;
7878
protected transient StreamOperatorStateHandler stateHandler;
7979

8080
protected transient InternalTimeServiceManager<?> timeServiceManager;
8181

8282
@SuppressWarnings({"unchecked", "rawtypes"})
83-
AbstractSharedStorageWrapperOperator(
83+
AbstractSharedObjectsWrapperOperator(
8484
StreamOperatorParameters<T> parameters,
8585
StreamOperatorFactory<T> operatorFactory,
86-
SharedStorageContextImpl context) {
86+
SharedObjectsContextImpl context) {
8787
this.parameters = Objects.requireNonNull(parameters);
8888
this.streamConfig = Objects.requireNonNull(parameters.getStreamConfig());
8989
this.containingTask = Objects.requireNonNull(parameters.getContainingTask());
@@ -101,11 +101,11 @@ abstract class AbstractSharedStorageWrapperOperator<T, S extends StreamOperator<
101101
parameters.getOperatorEventDispatcher())
102102
.f0;
103103
Preconditions.checkArgument(
104-
wrappedOperator instanceof SharedStorageStreamOperator,
104+
wrappedOperator instanceof SharedObjectsStreamOperator,
105105
String.format(
106106
"The wrapped operator is not an instance of %s.",
107-
SharedStorageStreamOperator.class.getSimpleName()));
108-
((SharedStorageStreamOperator) wrappedOperator).onSharedStorageContextSet(context);
107+
SharedObjectsStreamOperator.class.getSimpleName()));
108+
((SharedObjectsStreamOperator) wrappedOperator).onSharedObjectsContextSet(context);
109109
}
110110

111111
private OperatorMetricGroup createOperatorMetricGroup(

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/ItemDescriptor.java renamed to flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/ItemDescriptor.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.ml.common.sharedstorage;
19+
package org.apache.flink.ml.common.sharedobjects;
2020

2121
import org.apache.flink.annotation.Experimental;
2222
import org.apache.flink.api.common.typeutils.TypeSerializer;
23+
import org.apache.flink.util.Preconditions;
2324

2425
import java.io.Serializable;
2526

@@ -32,27 +33,29 @@
3233
public class ItemDescriptor<T> implements Serializable {
3334

3435
/** Name of the item. */
35-
public String key;
36+
public final String name;
3637

3738
/** Type serializer. */
38-
public TypeSerializer<T> serializer;
39+
public final TypeSerializer<T> serializer;
3940

4041
/** Initialize value. */
41-
public T initVal;
42+
public final T initVal;
4243

43-
private ItemDescriptor(String key, TypeSerializer<T> serializer, T initVal) {
44-
this.key = key;
44+
private ItemDescriptor(String name, TypeSerializer<T> serializer, T initVal) {
45+
Preconditions.checkNotNull(
46+
initVal, "Cannot use `null` as the initial value of a shared item.");
47+
this.name = name;
4548
this.serializer = serializer;
4649
this.initVal = initVal;
4750
}
4851

49-
public static <T> ItemDescriptor<T> of(String key, TypeSerializer<T> serializer, T initVal) {
50-
return new ItemDescriptor<>(key, serializer, initVal);
52+
public static <T> ItemDescriptor<T> of(String name, TypeSerializer<T> serializer, T initVal) {
53+
return new ItemDescriptor<>(name, serializer, initVal);
5154
}
5255

5356
@Override
5457
public int hashCode() {
55-
return key.hashCode();
58+
return name.hashCode();
5659
}
5760

5861
@Override
@@ -64,12 +67,12 @@ public boolean equals(Object o) {
6467
return false;
6568
}
6669
ItemDescriptor<?> that = (ItemDescriptor<?>) o;
67-
return key.equals(that.key);
70+
return name.equals(that.name);
6871
}
6972

7073
@Override
7174
public String toString() {
7275
return String.format(
73-
"ItemDescriptor{key='%s', serializer=%s, initVal=%s}", key, serializer, initVal);
76+
"ItemDescriptor{name='%s', serializer=%s, initVal=%s}", name, serializer, initVal);
7477
}
7578
}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.ml.common.sharedstorage;
19+
package org.apache.flink.ml.common.sharedobjects;
2020

2121
import org.apache.flink.iteration.operator.OperatorUtils;
2222
import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -29,14 +29,14 @@
2929
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
3030

3131
/** Wrapper for {@link OneInputStreamOperator}. */
32-
class OneInputSharedStorageWrapperOperator<IN, OUT>
33-
extends AbstractSharedStorageWrapperOperator<OUT, OneInputStreamOperator<IN, OUT>>
32+
class OneInputSharedObjectsWrapperOperator<IN, OUT>
33+
extends AbstractSharedObjectsWrapperOperator<OUT, OneInputStreamOperator<IN, OUT>>
3434
implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
3535

36-
OneInputSharedStorageWrapperOperator(
36+
OneInputSharedObjectsWrapperOperator(
3737
StreamOperatorParameters<OUT> parameters,
3838
StreamOperatorFactory<OUT> operatorFactory,
39-
SharedStorageContextImpl context) {
39+
SharedObjectsContextImpl context) {
4040
super(parameters, operatorFactory, context);
4141
}
4242

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/StorageID.java renamed to flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/PoolID.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,17 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.ml.common.sharedstorage;
19+
package org.apache.flink.ml.common.sharedobjects;
2020

2121
import org.apache.flink.util.AbstractID;
2222

23-
/** ID of a shared storage. */
24-
class StorageID extends AbstractID {
23+
/** ID of a pool for shared objects. */
24+
class PoolID extends AbstractID {
2525
private static final long serialVersionUID = 1L;
2626

27-
public StorageID(byte[] bytes) {
27+
public PoolID(byte[] bytes) {
2828
super(bytes);
2929
}
3030

31-
public StorageID() {}
31+
public PoolID() {}
3232
}

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorageBody.java renamed to flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/SharedObjectsBody.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,76 +16,75 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.ml.common.sharedstorage;
19+
package org.apache.flink.ml.common.sharedobjects;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.api.dag.Transformation;
2223
import org.apache.flink.streaming.api.datastream.DataStream;
2324

2425
import java.io.Serializable;
2526
import java.util.List;
2627
import java.util.Map;
2728

2829
/**
29-
* The builder of the subgraph that will be executed with a common shared storage. Users can only
30+
* The builder of the subgraph that will be executed with a common shared objects. Users can only
3031
* create data streams from {@code inputs}. Users can not refer to data streams outside, and can not
3132
* add sources/sinks.
3233
*
33-
* <p>The shared storage body requires all streams accessing the shared storage, i.e., {@link
34-
* SharedStorageBodyResult#accessors} have same parallelism and can be co-located.
34+
* <p>The shared objects body requires all transformations accessing the shared objects, i.e.,
35+
* {@link SharedObjectsBodyResult#coLocatedTransformations}, to have same parallelism and can be
36+
* co-located.
3537
*/
3638
@Experimental
3739
@FunctionalInterface
38-
public interface SharedStorageBody extends Serializable {
40+
public interface SharedObjectsBody extends Serializable {
3941

4042
/**
41-
* This method creates the subgraph for the shared storage body.
43+
* This method creates the subgraph for the shared objects body.
4244
*
4345
* @param inputs Input data streams.
4446
* @return Result of the subgraph, including output data streams, data streams with access to
45-
* the shared storage, and a mapping from share items to their owners.
47+
* the shared objects, and a mapping from share items to their owners.
4648
*/
47-
SharedStorageBodyResult process(List<DataStream<?>> inputs);
49+
SharedObjectsBodyResult process(List<DataStream<?>> inputs);
4850

4951
/**
50-
* The result of a {@link SharedStorageBody}, including output data streams, data streams with
51-
* access to the shared storage, and a mapping from descriptors of share items to their owners.
52+
* The result of a {@link SharedObjectsBody}, including output data streams, data streams with
53+
* access to the shared objects, and a mapping from descriptors of share items to their owners.
5254
*/
5355
@Experimental
54-
class SharedStorageBodyResult {
56+
class SharedObjectsBodyResult {
5557
/** A list of output streams. */
5658
private final List<DataStream<?>> outputs;
5759

58-
/**
59-
* A list of data streams which access to the shared storage. All data streams in the list
60-
* should implement {@link SharedStorageStreamOperator}.
61-
*/
62-
private final List<DataStream<?>> accessors;
60+
/** A list of {@link Transformation}s that should be co-located. */
61+
private final List<Transformation<?>> coLocatedTransformations;
6362

6463
/**
6564
* A mapping from descriptors of shared items to their owners. The owner is specified by
66-
* {@link SharedStorageStreamOperator#getSharedStorageAccessorID()}, which must be kept
67-
* unchanged for an instance of {@link SharedStorageStreamOperator}.
65+
* {@link SharedObjectsStreamOperator#getSharedObjectsAccessorID()}, which must be kept
66+
* unchanged for an instance of {@link SharedObjectsStreamOperator}.
6867
*/
69-
private final Map<ItemDescriptor<?>, SharedStorageStreamOperator> ownerMap;
68+
private final Map<ItemDescriptor<?>, SharedObjectsStreamOperator> ownerMap;
7069

71-
public SharedStorageBodyResult(
70+
public SharedObjectsBodyResult(
7271
List<DataStream<?>> outputs,
73-
List<DataStream<?>> accessors,
74-
Map<ItemDescriptor<?>, SharedStorageStreamOperator> ownerMap) {
72+
List<Transformation<?>> coLocatedTransformations,
73+
Map<ItemDescriptor<?>, SharedObjectsStreamOperator> ownerMap) {
7574
this.outputs = outputs;
76-
this.accessors = accessors;
75+
this.coLocatedTransformations = coLocatedTransformations;
7776
this.ownerMap = ownerMap;
7877
}
7978

8079
public List<DataStream<?>> getOutputs() {
8180
return outputs;
8281
}
8382

84-
public List<DataStream<?>> getAccessors() {
85-
return accessors;
83+
public List<Transformation<?>> getCoLocatedTransformations() {
84+
return coLocatedTransformations;
8685
}
8786

88-
public Map<ItemDescriptor<?>, SharedStorageStreamOperator> getOwnerMap() {
87+
public Map<ItemDescriptor<?>, SharedObjectsStreamOperator> getOwnerMap() {
8988
return ownerMap;
9089
}
9190
}

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorageContext.java renamed to flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/SharedObjectsContext.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,22 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.ml.common.sharedstorage;
19+
package org.apache.flink.ml.common.sharedobjects;
2020

2121
import org.apache.flink.annotation.Experimental;
2222
import org.apache.flink.util.function.BiConsumerWithException;
2323

2424
/**
25-
* Context for shared storage. Every operator implementing {@link SharedStorageStreamOperator} will
26-
* have an instance of this context set by {@link
27-
* SharedStorageStreamOperator#onSharedStorageContextSet} in runtime. User defined logic can be
25+
* Context for shared objects. Every operator implementing {@link SharedObjectsStreamOperator} will
26+
* get an instance of this context set by {@link
27+
* SharedObjectsStreamOperator#onSharedObjectsContextSet} in runtime. User-defined logic can be
2828
* invoked through {@link #invoke} with the access to shared items.
2929
*/
3030
@Experimental
31-
public interface SharedStorageContext {
31+
public interface SharedObjectsContext {
3232

3333
/**
34-
* Invoke user defined function with provided getters/setters of the shared storage.
34+
* Invoke user defined function with provided getters/setters of the shared objects.
3535
*
3636
* @param func User defined function where share items can be accessed through getters/setters.
3737
* @throws Exception Possible exception.

0 commit comments

Comments
 (0)