Skip to content

Commit f38d8ca

Browse files
Zakelly authored and masteryhx committed
[FLINK-34978][State] Introduce Asynchronous State APIs (apache#24595)
1 parent d271495 commit f38d8ca

File tree

9 files changed

+594
-0
lines changed

9 files changed

+594
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.common.state.v2;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
/**
24+
* Base interface for partitioned state that supports adding elements and inspecting the current
25+
* state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
26+
*
27+
* <p>The state is accessed and modified by user functions, and checkpointed consistently by the
28+
* system as part of the distributed snapshots.
29+
*
30+
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
31+
* automatically supplied by the system, so the function always sees the value mapped to the key of
32+
* the current element. That way, the system can handle stream and state partitioning consistently
33+
* together.
34+
*
35+
* @param <IN> Type of the value that can be added to the state.
36+
* @param <OUT> Type of the value that can be retrieved from the state.
37+
*/
38+
@Experimental
39+
public interface AppendingState<IN, OUT> extends State {
40+
41+
/**
42+
* Returns the current value for the state. When the state is not partitioned the returned value
43+
* is the same for all inputs in a given operator instance. If state partitioning is applied,
44+
* the value returned depends on the current operator input, as the operator maintains an
45+
* independent state for each partition.
46+
*
47+
* <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method should return {@code
48+
* null} wrapped by a {@link StateFuture}.
49+
*
50+
* @return The operator state value corresponding to the current input, or {@code null} wrapped
51+
* by a {@link StateFuture} if the state is empty.
52+
*/
53+
StateFuture<OUT> asyncGet();
54+
55+
/**
56+
* Updates the operator state accessible by {@link #asyncGet()} by adding the given value to the
57+
* list of values. The next time {@link #asyncGet()} is called (for the same state partition)
58+
* the returned state will represent the updated list.
59+
*
60+
* <p>A {@code null} value must not be passed in.
61+
*
62+
* @param value The value to add to the state; must not be {@code null}.
63+
*/
64+
StateFuture<Void> asyncAdd(IN value);
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.common.state.v2;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
import java.util.List;
24+
25+
/**
26+
* {@link State} interface for partitioned list state in Operations. The state is accessed and
27+
* modified by user functions, and checkpointed consistently by the system as part of the
28+
* distributed snapshots.
29+
*
30+
* <p>The state can be a keyed list state or an operator list state.
31+
*
32+
* <p>When it is a keyed list state, it is accessed by functions applied on a {@code KeyedStream}.
33+
* The key is automatically supplied by the system, so the function always sees the value mapped to
34+
* the key of the current element. That way, the system can handle stream and state partitioning
35+
* consistently together.
36+
*
37+
* <p>When it is an operator list state, the list is a collection of state items that are
38+
* independent from each other and eligible for redistribution across operator instances in case of
39+
* changed operator parallelism.
40+
*
41+
* @param <T> Type of values that this list state keeps.
42+
*/
43+
@Experimental
44+
public interface ListState<T> extends MergingState<T, StateIterator<T>> {
45+
46+
/**
47+
* Updates the operator state accessible by {@link #asyncGet()} by replacing the existing values
48+
* with the given list of values. The next time {@link #asyncGet()} is called (for the same state
49+
* partition) the returned state will represent the updated list.
50+
*
51+
* <p>If an empty list is passed in, the state value will be {@code null}.
52+
*
53+
* <p>Passing {@code null}, or a list containing any {@code null} element, is not allowed.
54+
*
55+
* @param values The new values for the state; must not be {@code null} or contain {@code null}.
56+
*/
57+
StateFuture<Void> asyncUpdate(List<T> values);
58+
59+
/**
60+
* Updates the operator state accessible by {@link #asyncGet()} by adding the given values to
61+
* the existing list of values. The next time {@link #asyncGet()} is called (for the same state
62+
* partition) the returned state will represent the updated list.
63+
*
64+
* <p>If an empty list is passed in, the state value remains unchanged.
65+
*
66+
* <p>Passing {@code null}, or a list containing any {@code null} element, is not allowed.
67+
*
68+
* @param values The values to be added to the state; must not be {@code null} or contain {@code null}.
69+
*/
70+
StateFuture<Void> asyncAddAll(List<T> values);
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.common.state.v2;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
import java.util.Map;
24+
25+
/**
25+
* {@link State} interface for partitioned key-value state. The key-value pair can be added, updated
26+
* and retrieved.
27+
*
28+
* <p>The state is accessed and modified by user functions, and checkpointed consistently by the
29+
* system as part of the distributed snapshots.
30+
*
31+
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
32+
* automatically supplied by the system, so the function always sees the value mapped to the key of
33+
* the current element. That way, the system can handle stream and state partitioning consistently
34+
* together.
35+
*
36+
* @param <UK> Type of the keys in the state.
37+
* @param <UV> Type of the values in the state.
38+
*/
39+
@Experimental
40+
public interface MapState<UK, UV> extends State {
41+
42+
/**
43+
* Returns the current value associated with the given key asynchronously. When the state is not
44+
* partitioned the returned value is the same for all inputs in a given operator instance. If
45+
* state partitioning is applied, the value returned depends on the current operator input, as
46+
* the operator maintains an independent state for each partition.
47+
*
48+
* @return The {@link StateFuture} that will return the value corresponding to the current input.
49+
* When there is no corresponding value for this key, the future will return {@code null}.
50+
*/
51+
StateFuture<UV> asyncGet(UK key);
52+
53+
/**
54+
* Updates the current value associated with the given key asynchronously. When the state is not
55+
* partitioned the value is updated for all inputs in a given operator instance. If state
56+
* partitioning is applied, the updated value depends on the current operator input, as the
57+
* operator maintains an independent state for each partition. When a {@code null} value is
58+
* provided, the state for the given key will be removed.
59+
*
60+
* @param key The key that will be updated.
61+
* @param value The new value for the key.
62+
* @return The {@link StateFuture} that will trigger the callback when the update finishes.
63+
*/
64+
StateFuture<Void> asyncPut(UK key, UV value);
65+
66+
/**
67+
* Updates all of the mappings from the given map into the state asynchronously. When the state
68+
* is not partitioned the value is updated for all inputs in a given operator instance. If state
69+
* partitioning is applied, the updated mapping depends on the current operator input, as the
70+
* operator maintains an independent state for each partition. When a {@code null} value is
71+
* provided within the map, the state for the corresponding key will be removed.
72+
*
73+
* <p>If an empty map is passed in, the state value remains unchanged.
74+
*
75+
* <p>A {@code null} map is not allowed.
76+
*
77+
* @param map The mappings to be stored in this state.
78+
* @return The {@link StateFuture} that will trigger the callback when the update finishes.
79+
*/
80+
StateFuture<Void> asyncPutAll(Map<UK, UV> map);
81+
82+
/**
83+
* Deletes the mapping of the given key from the state asynchronously. When the state is not
84+
* partitioned the deleted value is the same for all inputs in a given operator instance. If
85+
* state partitioning is applied, the value deleted depends on the current operator input, as
86+
* the operator maintains an independent state for each partition.
87+
*
88+
* @param key The key of the mapping.
89+
* @return The {@link StateFuture} that will trigger the callback when the removal finishes.
90+
*/
91+
StateFuture<Void> asyncRemove(UK key);
92+
93+
/**
94+
* Returns whether a mapping exists for the given key asynchronously. When the state is not
95+
* partitioned the returned value is the same for all inputs in a given operator instance. If
96+
* state partitioning is applied, the value returned depends on the current operator input, as
97+
* the operator maintains an independent state for each partition.
98+
*
99+
* @param key The key of the mapping.
100+
* @return The {@link StateFuture} that will return true if there exists a mapping whose key
101+
* is equal to the given key.
102+
*/
103+
StateFuture<Boolean> asyncContains(UK key);
104+
105+
/**
106+
* Returns the current iterator for all the mappings of this state asynchronously. When the
107+
* state is not partitioned the returned iterator is the same for all inputs in a given operator
108+
* instance. If state partitioning is applied, the iterator returned depends on the current
109+
* operator input, as the operator maintains an independent state for each partition.
110+
*
111+
* @return The {@link StateFuture} that will return the mapping iterator corresponding to the
112+
* current input.
113+
*/
114+
StateFuture<StateIterator<Map.Entry<UK, UV>>> asyncEntries();
115+
116+
/**
117+
* Returns the current iterator for all the keys of this state asynchronously. When the state is
118+
* not partitioned the returned iterator is the same for all inputs in a given operator
119+
* instance. If state partitioning is applied, the iterator returned depends on the current
120+
* operator input, as the operator maintains an independent state for each partition.
121+
*
122+
* @return The {@link StateFuture} that will return the key iterator corresponding to the current
123+
* input.
124+
*/
125+
StateFuture<StateIterator<UK>> asyncKeys();
126+
127+
/**
128+
* Returns the current iterator for all the values of this state asynchronously. When the state
129+
* is not partitioned the returned iterator is the same for all inputs in a given operator
130+
* instance. If state partitioning is applied, the iterator returned depends on the current
131+
* operator input, as the operator maintains an independent state for each partition.
132+
*
133+
* @return The {@link StateFuture} that will return the value iterator corresponding to the current
134+
* input.
135+
*/
136+
StateFuture<StateIterator<UV>> asyncValues();
137+
138+
/**
139+
* Returns whether this state contains no key-value mappings asynchronously. When the state is
140+
* not partitioned the returned value is the same for all inputs in a given operator instance.
141+
* If state partitioning is applied, the value returned depends on the current operator input,
142+
* as the operator maintains an independent state for each partition.
143+
*
144+
* @return The {@link StateFuture} that will return true if there is no key-value mapping,
145+
* otherwise false.
146+
*/
147+
StateFuture<Boolean> asyncIsEmpty();
148+
149+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.common.state.v2;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
/**
24+
* Extension of {@link AppendingState} that allows merging of state. That is, two instances of
25+
* {@link MergingState} can be combined into a single instance that contains all the information of
26+
* the two merged states. This interface declares no members of its own.
27+
*
28+
* @param <IN> Type of the value that can be added to the state.
29+
* @param <OUT> Type of the value that can be retrieved from the state.
30+
*/
31+
@Experimental
32+
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.common.state.v2;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
/**
24+
* Interface that different types of partitioned state must implement.
25+
*
26+
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
27+
* automatically supplied by the system, so the function always sees the value mapped to the key of
28+
* the current element. That way, the system can handle stream and state partitioning consistently
29+
* together.
30+
*/
31+
@Experimental
32+
public interface State {
33+
34+
/** Asynchronously removes the value mapped under the current key, returning a {@link StateFuture} with no result value. */
35+
StateFuture<Void> asyncClear();
36+
}

0 commit comments

Comments
 (0)