Skip to content

Commit 1e06d88

Browse files
authored
[RRIO] [Throttle] [Cache] Implement Throttle and Cache using an external resource. (#29401)
* WIP: Implement CacheSerializer and providers * wip * Condense Throttle into one class * wip * Implement Throttle and Cache * Update javadoc * Edit per PR comments * Refacter per PR comments
1 parent 00d5863 commit 1e06d88

24 files changed

+1559
-500
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# Configures patch for ../base/configmap.yaml
17+
# See https://kubectl.docs.kubernetes.io/references/kustomize/kustomization/patches/
18+
19+
- op: replace
20+
path: /metadata/labels/quota-id
21+
value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
22+
- op: replace
23+
path: /data/QUOTA_ID
24+
value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
25+
- op: replace
26+
path: /data/QUOTA_SIZE
27+
value: "10"
28+
- op: replace
29+
path: /data/QUOTA_REFRESH_INTERVAL
30+
value: 1s
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# Configures patch for ../base/deployment.yaml
17+
# See https://kubectl.docs.kubernetes.io/references/kustomize/kustomization/patches/
18+
19+
- op: replace
20+
path: /metadata/labels/quota-id
21+
value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
22+
- op: replace
23+
path: /spec/selector/matchLabels/quota-id
24+
value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
25+
- op: replace
26+
path: /spec/template/metadata/labels/quota-id
27+
value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# Configures the overlay for .test-infra/mock-apis/infrastructure/kubernetes/refresher/base
17+
# Using the Quota Id:
18+
# echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
19+
20+
resources:
21+
- ../../base
22+
23+
nameSuffix: -throttle-with-external-resource-test-10-per-1s
24+
25+
patches:
26+
- path: configmap.yaml
27+
target:
28+
kind: ConfigMap
29+
name: refresher
30+
31+
- path: deployment.yaml
32+
target:
33+
kind: Deployment
34+
name: refresher

sdks/java/io/rrio/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ dependencies {
5050
testImplementation platform(library.java.google_cloud_platform_libraries_bom)
5151
testImplementation library.java.google_http_client
5252
testImplementation library.java.junit
53+
testImplementation library.java.hamcrest
5354
testImplementation library.java.testcontainers_base
5455

5556
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.io.requestresponse;
19+
20+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
21+
22+
import java.io.ByteArrayOutputStream;
23+
import java.io.IOException;
24+
import org.apache.beam.sdk.coders.Coder;
25+
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
26+
import org.apache.beam.sdk.coders.KvCoder;
27+
import org.apache.beam.sdk.transforms.PTransform;
28+
import org.apache.beam.sdk.values.KV;
29+
import org.apache.beam.sdk.values.PCollection;
30+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
31+
import org.checkerframework.checker.nullness.qual.Nullable;
32+
import org.joda.time.Duration;
33+
34+
/** Transforms for reading and writing request/response associations to a cache. */
35+
final class Cache {
36+
37+
/**
38+
* Instantiates a {@link Call} {@link PTransform} that reads {@link RequestT} {@link ResponseT}
39+
* associations from a cache. The {@link KV} value is null when no association exists. This method
40+
* does not enforce {@link Coder#verifyDeterministic} and defers to the user to determine whether
41+
* to enforce this given the cache implementation.
42+
*/
43+
static <
44+
RequestT,
45+
@Nullable ResponseT,
46+
CallerSetupTeardownT extends
47+
Caller<RequestT, KV<RequestT, @Nullable ResponseT>> & SetupTeardown>
48+
PTransform<PCollection<RequestT>, Call.Result<KV<RequestT, @Nullable ResponseT>>> read(
49+
CallerSetupTeardownT implementsCallerSetupTeardown,
50+
Coder<RequestT> requestTCoder,
51+
Coder<@Nullable ResponseT> responseTCoder) {
52+
return Call.ofCallerAndSetupTeardown(
53+
implementsCallerSetupTeardown, KvCoder.of(requestTCoder, responseTCoder));
54+
}
55+
56+
/**
57+
* Instantiates a {@link Call} {@link PTransform}, calling {@link #read} with a {@link Caller}
58+
* that employs a redis client.
59+
*
60+
* <p>This method requires both the {@link RequestT} and {@link ResponseT}s' {@link
61+
* Coder#verifyDeterministic}. Otherwise, it throws a {@link NonDeterministicException}.
62+
*
63+
* <p><a href="https://redis.io">Redis</a> is designed for multiple workloads, simultaneously
64+
* reading and writing to a shared instance. See <a
65+
* href="https://redis.io/docs/get-started/faq/">Redis FAQ</a> for more information on important
66+
* considerations when using this method to achieve cache reads.
67+
*/
68+
static <RequestT, @Nullable ResponseT>
69+
PTransform<PCollection<RequestT>, Call.Result<KV<RequestT, @Nullable ResponseT>>>
70+
readUsingRedis(
71+
RedisClient client,
72+
Coder<RequestT> requestTCoder,
73+
Coder<@Nullable ResponseT> responseTCoder)
74+
throws NonDeterministicException {
75+
return read(
76+
new UsingRedis<>(requestTCoder, responseTCoder, client).read(),
77+
requestTCoder,
78+
responseTCoder);
79+
}
80+
81+
/**
82+
* Write a {@link RequestT} {@link ResponseT} association to a cache. This method does not enforce
83+
* {@link Coder#verifyDeterministic} and defers to the user to determine whether to enforce this
84+
* given the cache implementation.
85+
*/
86+
static <
87+
RequestT,
88+
ResponseT,
89+
CallerSetupTeardownT extends
90+
Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>> & SetupTeardown>
91+
PTransform<PCollection<KV<RequestT, ResponseT>>, Call.Result<KV<RequestT, ResponseT>>> write(
92+
CallerSetupTeardownT implementsCallerSetupTeardown,
93+
KvCoder<RequestT, ResponseT> kvCoder) {
94+
return Call.ofCallerAndSetupTeardown(implementsCallerSetupTeardown, kvCoder);
95+
}
96+
97+
/**
98+
* Instantiates a {@link Call} {@link PTransform}, calling {@link #write} with a {@link Caller}
99+
* that employs a redis client.
100+
*
101+
* <p>This method requires both the {@link RequestT} and {@link ResponseT}s' {@link
102+
* Coder#verifyDeterministic}. Otherwise, it throws a {@link NonDeterministicException}.
103+
*
104+
* <p><a href="https://redis.io">Redis</a> is designed for multiple workloads, simultaneously
105+
* reading and writing to a shared instance. See <a
106+
* href="https://redis.io/docs/get-started/faq/">Redis FAQ</a> for more information on important
107+
* considerations when using this method to achieve cache writes.
108+
*/
109+
static <RequestT, ResponseT>
110+
PTransform<PCollection<KV<RequestT, ResponseT>>, Call.Result<KV<RequestT, ResponseT>>>
111+
writeUsingRedis(
112+
Duration expiry,
113+
RedisClient client,
114+
Coder<RequestT> requestTCoder,
115+
Coder<@Nullable ResponseT> responseTCoder)
116+
throws NonDeterministicException {
117+
return write(
118+
new UsingRedis<>(requestTCoder, responseTCoder, client).write(expiry),
119+
KvCoder.of(requestTCoder, responseTCoder));
120+
}
121+
122+
private static class UsingRedis<RequestT, ResponseT> {
123+
private final Coder<RequestT> requestTCoder;
124+
private final Coder<@Nullable ResponseT> responseTCoder;
125+
private final RedisClient client;
126+
127+
private UsingRedis(
128+
Coder<RequestT> requestTCoder,
129+
Coder<@Nullable ResponseT> responseTCoder,
130+
RedisClient client)
131+
throws Coder.NonDeterministicException {
132+
this.client = client;
133+
requestTCoder.verifyDeterministic();
134+
responseTCoder.verifyDeterministic();
135+
this.requestTCoder = requestTCoder;
136+
this.responseTCoder = responseTCoder;
137+
}
138+
139+
private Read<RequestT, @Nullable ResponseT> read() {
140+
return new Read<>(requestTCoder, responseTCoder, client);
141+
}
142+
143+
private Write<RequestT, ResponseT> write(Duration expiry) {
144+
return new Write<>(expiry, requestTCoder, responseTCoder, client);
145+
}
146+
147+
/** Reads associated {@link RequestT} {@link ResponseT} using a {@link RedisClient}. */
148+
private static class Read<RequestT, @Nullable ResponseT>
149+
implements Caller<RequestT, KV<RequestT, @Nullable ResponseT>>, SetupTeardown {
150+
151+
private final Coder<RequestT> requestTCoder;
152+
private final Coder<@Nullable ResponseT> responseTCoder;
153+
private final RedisClient client;
154+
155+
private Read(
156+
Coder<RequestT> requestTCoder,
157+
Coder<@Nullable ResponseT> responseTCoder,
158+
RedisClient client) {
159+
this.requestTCoder = requestTCoder;
160+
this.responseTCoder = responseTCoder;
161+
this.client = client;
162+
}
163+
164+
@Override
165+
public KV<RequestT, @Nullable ResponseT> call(RequestT request)
166+
throws UserCodeExecutionException {
167+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
168+
try {
169+
requestTCoder.encode(request, baos);
170+
byte[] encodedRequest = baos.toByteArray();
171+
byte[] encodedResponse = client.getBytes(encodedRequest);
172+
if (encodedResponse == null) {
173+
return KV.of(request, null);
174+
}
175+
ResponseT response =
176+
checkStateNotNull(
177+
responseTCoder.decode(ByteSource.wrap(encodedResponse).openStream()));
178+
return KV.of(request, response);
179+
} catch (IllegalStateException | IOException e) {
180+
throw new UserCodeExecutionException(e);
181+
}
182+
}
183+
184+
@Override
185+
public void setup() throws UserCodeExecutionException {
186+
client.setup();
187+
}
188+
189+
@Override
190+
public void teardown() throws UserCodeExecutionException {
191+
client.teardown();
192+
}
193+
}
194+
}
195+
196+
private static class Write<RequestT, ResponseT>
197+
implements Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>>, SetupTeardown {
198+
private final Duration expiry;
199+
private final Coder<RequestT> requestTCoder;
200+
private final Coder<@Nullable ResponseT> responseTCoder;
201+
private final RedisClient client;
202+
203+
private Write(
204+
Duration expiry,
205+
Coder<RequestT> requestTCoder,
206+
Coder<@Nullable ResponseT> responseTCoder,
207+
RedisClient client) {
208+
this.expiry = expiry;
209+
this.requestTCoder = requestTCoder;
210+
this.responseTCoder = responseTCoder;
211+
this.client = client;
212+
}
213+
214+
@Override
215+
public KV<RequestT, ResponseT> call(KV<RequestT, ResponseT> request)
216+
throws UserCodeExecutionException {
217+
ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
218+
ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
219+
try {
220+
requestTCoder.encode(request.getKey(), keyStream);
221+
responseTCoder.encode(request.getValue(), valueStream);
222+
} catch (IOException e) {
223+
throw new UserCodeExecutionException(e);
224+
}
225+
client.setex(keyStream.toByteArray(), valueStream.toByteArray(), expiry);
226+
return request;
227+
}
228+
229+
@Override
230+
public void setup() throws UserCodeExecutionException {
231+
client.setup();
232+
}
233+
234+
@Override
235+
public void teardown() throws UserCodeExecutionException {
236+
client.teardown();
237+
}
238+
}
239+
}

0 commit comments

Comments
 (0)