Skip to content

Commit 105ae30

Browse files
committed
GH-661: [Flight] JDBC: cache failed locations
Instead of repeatedly trying to access locations which have failed before, cache them and move them to the end of the list of locations to attempt on subsequent accesses. Fixes #661.
1 parent 222f30e commit 105ae30

11 files changed

+315
-9
lines changed

flight/flight-sql-jdbc-core/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ under the License.
147147
<groupId>org.checkerframework</groupId>
148148
<artifactId>checker-qual</artifactId>
149149
</dependency>
150+
151+
<dependency>
152+
<groupId>com.github.ben-manes.caffeine</groupId>
153+
<artifactId>caffeine</artifactId>
154+
<version>3.1.8</version>
155+
</dependency>
150156
</dependencies>
151157

152158
<build>

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.ExecutorService;
2525
import java.util.concurrent.Executors;
2626
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
27+
import org.apache.arrow.driver.jdbc.client.utils.FlightClientCache;
2728
import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl;
2829
import org.apache.arrow.flight.FlightClient;
2930
import org.apache.arrow.memory.BufferAllocator;
@@ -113,6 +114,7 @@ private static ArrowFlightSqlClientHandler createNewClientHandler(
113114
.withRetainCookies(config.retainCookies())
114115
.withRetainAuth(config.retainAuth())
115116
.withCatalog(config.getCatalog())
117+
.withClientCache(config.useClientCache() ? new FlightClientCache() : null)
116118
.withConnectTimeout(config.getConnectTimeout())
117119
.build();
118120
} catch (final SQLException e) {

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java

+50-5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.Optional;
3434
import java.util.Set;
3535
import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils;
36+
import org.apache.arrow.driver.jdbc.client.utils.FlightClientCache;
37+
import org.apache.arrow.driver.jdbc.client.utils.FlightLocationQueue;
3638
import org.apache.arrow.flight.CallOption;
3739
import org.apache.arrow.flight.CallStatus;
3840
import org.apache.arrow.flight.CloseSessionRequest;
@@ -75,21 +77,27 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
7577
// JDBC connection string query parameter
7678
private static final String CATALOG = "catalog";
7779

80+
private final String cacheKey;
7881
private final FlightSqlClient sqlClient;
7982
private final Set<CallOption> options = new HashSet<>();
8083
private final Builder builder;
8184
private final Optional<String> catalog;
85+
private final @Nullable FlightClientCache flightClientCache;
8286

8387
ArrowFlightSqlClientHandler(
88+
final String cacheKey,
8489
final FlightSqlClient sqlClient,
8590
final Builder builder,
8691
final Collection<CallOption> credentialOptions,
87-
final Optional<String> catalog) {
92+
final Optional<String> catalog,
93+
final @Nullable FlightClientCache flightClientCache) {
8894
this.options.addAll(builder.options);
8995
this.options.addAll(credentialOptions);
96+
this.cacheKey = Preconditions.checkNotNull(cacheKey);
9097
this.sqlClient = Preconditions.checkNotNull(sqlClient);
9198
this.builder = builder;
9299
this.catalog = catalog;
100+
this.flightClientCache = flightClientCache;
93101
}
94102

95103
/**
@@ -101,12 +109,15 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
101109
* @return a new {@link ArrowFlightSqlClientHandler}.
102110
*/
103111
static ArrowFlightSqlClientHandler createNewHandler(
112+
final String cacheKey,
104113
final FlightClient client,
105114
final Builder builder,
106115
final Collection<CallOption> options,
107-
final Optional<String> catalog) {
116+
final Optional<String> catalog,
117+
final @Nullable FlightClientCache flightClientCache) {
108118
final ArrowFlightSqlClientHandler handler =
109-
new ArrowFlightSqlClientHandler(new FlightSqlClient(client), builder, options, catalog);
119+
new ArrowFlightSqlClientHandler(
120+
cacheKey, new FlightSqlClient(client), builder, options, catalog, flightClientCache);
110121
handler.setSetCatalogInSessionIfPresent();
111122
return handler;
112123
}
@@ -148,9 +159,14 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
148159
// location
149160
// is the same as the original connection's Location and skip creating a FlightClient in
150161
// that scenario.
162+
// Also copy the cache to the client so we can share a cache. Cache needs to cache
163+
// negative attempts too.
151164
List<Exception> exceptions = new ArrayList<>();
152165
CloseableEndpointStreamPair stream = null;
153-
for (Location location : endpoint.getLocations()) {
166+
FlightLocationQueue locations =
167+
new FlightLocationQueue(flightClientCache, endpoint.getLocations());
168+
while (locations.hasNext()) {
169+
Location location = locations.next();
154170
final URI endpointUri = location.getUri();
155171
if (endpointUri.getScheme().equals(LocationSchemes.REUSE_CONNECTION)) {
156172
stream =
@@ -163,6 +179,7 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
163179
.withHost(endpointUri.getHost())
164180
.withPort(endpointUri.getPort())
165181
.withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS))
182+
.withClientCache(flightClientCache)
166183
.withConnectTimeout(builder.connectTimeout);
167184

168185
ArrowFlightSqlClientHandler endpointHandler = null;
@@ -177,12 +194,29 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
177194
stream.getStream().getSchema();
178195
} catch (Exception ex) {
179196
if (endpointHandler != null) {
197+
// If the exception is related to connectivity, mark the client as a dud.
198+
if (flightClientCache != null) {
199+
if (ex instanceof FlightRuntimeException
200+
&& ((FlightRuntimeException) ex).status().code()
201+
== FlightStatusCode.UNAVAILABLE
202+
&&
203+
// IOException covers SocketException and Netty's (private)
204+
// AnnotatedSocketException
205+
// We are looking for things like "Network is unreachable"
206+
ex.getCause() instanceof IOException) {
207+
flightClientCache.markLocationAsDud(location.toString());
208+
}
209+
}
210+
180211
AutoCloseables.close(endpointHandler);
181212
}
182213
exceptions.add(ex);
183214
continue;
184215
}
185216

217+
if (flightClientCache != null) {
218+
flightClientCache.markLocationAsReachable(location.toString());
219+
}
186220
break;
187221
}
188222
if (stream != null) {
@@ -549,6 +583,8 @@ public static final class Builder {
549583

550584
@VisibleForTesting Optional<String> catalog = Optional.empty();
551585

586+
@VisibleForTesting @Nullable FlightClientCache flightClientCache;
587+
552588
@VisibleForTesting @Nullable Duration connectTimeout;
553589

554590
// These two middleware are for internal use within build() and should not be exposed by builder
@@ -833,11 +869,20 @@ public Builder withCatalog(@Nullable final String catalog) {
833869
return this;
834870
}
835871

872+
public Builder withClientCache(FlightClientCache flightClientCache) {
873+
this.flightClientCache = flightClientCache;
874+
return this;
875+
}
876+
836877
public Builder withConnectTimeout(Duration connectTimeout) {
837878
this.connectTimeout = connectTimeout;
838879
return this;
839880
}
840881

882+
public String getCacheKey() {
883+
return getLocation().toString();
884+
}
885+
841886
/** Get the location that this client will connect to. */
842887
public Location getLocation() {
843888
if (useEncryption) {
@@ -931,7 +976,7 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
931976
options.toArray(new CallOption[0])));
932977
}
933978
return ArrowFlightSqlClientHandler.createNewHandler(
934-
client, this, credentialOptions, catalog);
979+
getCacheKey(), client, this, credentialOptions, catalog, flightClientCache);
935980

936981
} catch (final IllegalArgumentException
937982
| GeneralSecurityException
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.driver.jdbc.client.utils;
18+
19+
import com.github.benmanes.caffeine.cache.Cache;
20+
import com.github.benmanes.caffeine.cache.Caffeine;
21+
import java.time.Duration;
22+
import org.apache.arrow.util.VisibleForTesting;
23+
24+
/**
25+
* A cache for Flight clients.
26+
*
27+
* <p>The intent is to avoid constantly recreating clients to the same locations. gRPC can multiplex
28+
* multiple requests over a single TCP connection, and a cache would let us take advantage of that.
29+
*
30+
* <p>At the time being it only tracks whether a location is reachable or not. To actually cache
31+
* clients, we would need a way to incorporate other connection parameters (authentication, etc.)
32+
* into the cache key.
33+
*/
34+
public final class FlightClientCache {
35+
@VisibleForTesting Cache<String, ClientCacheEntry> clientCache;
36+
37+
public FlightClientCache() {
38+
this.clientCache = Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(600)).build();
39+
}
40+
41+
public boolean isDud(String key) {
42+
return clientCache.getIfPresent(key) != null;
43+
}
44+
45+
public void markLocationAsDud(String key) {
46+
clientCache.put(key, new ClientCacheEntry());
47+
}
48+
49+
public void markLocationAsReachable(String key) {
50+
clientCache.invalidate(key);
51+
}
52+
53+
/** A cache entry (empty because we only track reachability, see outer class docstring). */
54+
public static final class ClientCacheEntry {}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.driver.jdbc.client.utils;
18+
19+
import java.util.ArrayDeque;
20+
import java.util.Deque;
21+
import java.util.Iterator;
22+
import java.util.List;
23+
import java.util.NoSuchElementException;
24+
import org.apache.arrow.flight.Location;
25+
import org.checkerframework.checker.nullness.qual.Nullable;
26+
27+
/**
28+
* A queue of Flight locations to connect to for an endpoint.
29+
*
30+
* <p>This helper class is intended to encapsulate the retry logic in a testable manner.
31+
*/
32+
public final class FlightLocationQueue implements Iterator<Location> {
33+
private final Deque<Location> locations;
34+
private final Deque<Location> badLocations;
35+
36+
/**
37+
* Create a new queue.
38+
*
39+
* @param flightClientCache An optional cache used to sort previously unreachable locations to the
40+
* end.
41+
* @param locations The locations to try.
42+
*/
43+
public FlightLocationQueue(
44+
@Nullable FlightClientCache flightClientCache, List<Location> locations) {
45+
this.locations = new ArrayDeque<>();
46+
this.badLocations = new ArrayDeque<>();
47+
48+
for (Location location : locations) {
49+
if (flightClientCache != null && flightClientCache.isDud(location.toString())) {
50+
this.badLocations.add(location);
51+
} else {
52+
this.locations.add(location);
53+
}
54+
}
55+
}
56+
57+
@Override
58+
public boolean hasNext() {
59+
return !locations.isEmpty() || !badLocations.isEmpty();
60+
}
61+
62+
@Override
63+
public Location next() {
64+
if (!locations.isEmpty()) {
65+
return locations.pop();
66+
} else if (!badLocations.isEmpty()) {
67+
return badLocations.pop();
68+
}
69+
throw new NoSuchElementException();
70+
}
71+
}

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java

+6
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ public Duration getConnectTimeout() {
174174
return Duration.ofMillis(timeout);
175175
}
176176

177+
/** Whether to enable the client cache. */
178+
public boolean useClientCache() {
179+
return ArrowFlightConnectionProperty.USE_CLIENT_CACHE.getBoolean(properties);
180+
}
181+
177182
/**
178183
* Gets the {@link CallOption}s from this {@link ConnectionConfig}.
179184
*
@@ -226,6 +231,7 @@ public enum ArrowFlightConnectionProperty implements ConnectionProperty {
226231
RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false),
227232
CATALOG("catalog", null, Type.STRING, false),
228233
CONNECT_TIMEOUT_MILLIS("connectTimeoutMs", 10000, Type.NUMBER, false),
234+
USE_CLIENT_CACHE("useClientCache", true, Type.BOOLEAN, false),
229235
;
230236

231237
private final String camelName;

flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -699,10 +699,9 @@ public void testFallbackUnresolvableFlightServer() throws Exception {
699699
}
700700
attempt1 = System.nanoTime();
701701
elapsedMs = (attempt1 - start) / 1_000_000.;
702-
// TODO(GH-661): this assertion should be flipped to assertTrue.
703-
assertFalse(
702+
assertTrue(
704703
elapsedMs < 5000.,
705-
String.format("Expected second attempt to be the same, but %f ms elapsed", elapsedMs));
704+
String.format("Expected second attempt to be faster, but %f ms elapsed", elapsedMs));
706705
}
707706
}
708707
}

flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public void testDefaults() {
147147
assertNull(builder.clientCertificatePath);
148148
assertNull(builder.clientKeyPath);
149149
assertEquals(Optional.empty(), builder.catalog);
150+
assertNull(builder.flightClientCache);
150151
assertNull(builder.connectTimeout);
151152
}
152153

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.driver.jdbc.client.utils;
18+
19+
import static org.junit.jupiter.api.Assertions.*;
20+
21+
import org.apache.arrow.flight.Location;
22+
import org.junit.jupiter.api.Test;
23+
24+
class FlightClientCacheTest {
25+
@Test
26+
void basicOperation() {
27+
FlightClientCache cache = new FlightClientCache();
28+
29+
Location location1 = Location.forGrpcInsecure("localhost", 8080);
30+
Location location2 = Location.forGrpcInsecure("localhost", 8081);
31+
32+
assertFalse(cache.isDud(location1.toString()));
33+
assertFalse(cache.isDud(location2.toString()));
34+
35+
cache.markLocationAsReachable(location1.toString());
36+
assertFalse(cache.isDud(location1.toString()));
37+
assertFalse(cache.isDud(location2.toString()));
38+
39+
cache.markLocationAsDud(location1.toString());
40+
assertTrue(cache.isDud(location1.toString()));
41+
assertFalse(cache.isDud(location2.toString()));
42+
43+
cache.markLocationAsDud(location2.toString());
44+
assertTrue(cache.isDud(location1.toString()));
45+
assertTrue(cache.isDud(location2.toString()));
46+
47+
cache.markLocationAsReachable(location1.toString());
48+
assertFalse(cache.isDud(location1.toString()));
49+
assertTrue(cache.isDud(location2.toString()));
50+
}
51+
}

0 commit comments

Comments
 (0)