@@ -61,7 +61,7 @@ static SharedStorageBody.SharedStorageBodyResult sharedStorageBody(List<DataStre
6161
6262 BOperator bOp = new BOperator ();
6363 SingleOutputStreamOperator <Long > afterBOp =
64- data .transform ("b" , TypeInformation .of (Long .class ), bOp );
64+ afterAOp .transform ("b" , TypeInformation .of (Long .class ), bOp );
6565
6666 Map <ItemDescriptor <?>, SharedStorageStreamOperator > ownerMap = new HashMap <>();
6767 ownerMap .put (SUM , aOp );
@@ -89,7 +89,9 @@ public void testSharedStorage() throws Exception {
8989
9090 /** Operator A: add input elements to the shared {@link #SUM}. */
9191 static class AOperator extends AbstractStreamOperator <Long >
92- implements OneInputStreamOperator <Long , Long >, SharedStorageStreamOperator {
92+ implements OneInputStreamOperator <Long , Long >,
93+ SharedStorageStreamOperator ,
94+ BoundedOneInput {
9395
9496 private final String sharedStorageAccessorID ;
9597 private SharedStorageContext sharedStorageContext ;
@@ -115,15 +117,18 @@ public void processElement(StreamRecord<Long> element) throws Exception {
115117 Long currentSum = getter .get (SUM );
116118 setter .set (SUM , currentSum + element .getValue ());
117119 });
118- output .collect (element );
120+ }
121+
122+ @ Override
123+ public void endInput () throws Exception {
124+ // Informs BOperator to get the value from shared {@link #SUM}.
125+ output .collect (new StreamRecord <>(0L ));
119126 }
120127 }
121128
122129 /** Operator B: when input ends, get the value from shared {@link #SUM}. */
123130 static class BOperator extends AbstractStreamOperator <Long >
124- implements OneInputStreamOperator <Long , Long >,
125- SharedStorageStreamOperator ,
126- BoundedOneInput {
131+ implements OneInputStreamOperator <Long , Long >, SharedStorageStreamOperator {
127132
128133 private final String sharedStorageAccessorID ;
129134 private SharedStorageContext sharedStorageContext ;
@@ -143,10 +148,7 @@ public String getSharedStorageAccessorID() {
143148 }
144149
145150 @ Override
146- public void processElement (StreamRecord <Long > element ) throws Exception {}
147-
148- @ Override
149- public void endInput () throws Exception {
151+ public void processElement (StreamRecord <Long > element ) throws Exception {
150152 sharedStorageContext .invoke (
151153 (getter , setter ) -> {
152154 output .collect (new StreamRecord <>(getter .get (SUM )));
0 commit comments