33
33
import java .util .Optional ;
34
34
import java .util .Set ;
35
35
import org .apache .arrow .driver .jdbc .client .utils .ClientAuthenticationUtils ;
36
+ import org .apache .arrow .driver .jdbc .client .utils .FlightClientCache ;
37
+ import org .apache .arrow .driver .jdbc .client .utils .FlightLocationQueue ;
36
38
import org .apache .arrow .flight .CallOption ;
37
39
import org .apache .arrow .flight .CallStatus ;
38
40
import org .apache .arrow .flight .CloseSessionRequest ;
@@ -75,21 +77,27 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
75
77
// JDBC connection string query parameter
76
78
private static final String CATALOG = "catalog" ;
77
79
80
+ private final String cacheKey ;
78
81
private final FlightSqlClient sqlClient ;
79
82
private final Set <CallOption > options = new HashSet <>();
80
83
private final Builder builder ;
81
84
private final Optional <String > catalog ;
85
+ private final @ Nullable FlightClientCache flightClientCache ;
82
86
83
87
ArrowFlightSqlClientHandler (
88
+ final String cacheKey ,
84
89
final FlightSqlClient sqlClient ,
85
90
final Builder builder ,
86
91
final Collection <CallOption > credentialOptions ,
87
- final Optional <String > catalog ) {
92
+ final Optional <String > catalog ,
93
+ final @ Nullable FlightClientCache flightClientCache ) {
88
94
this .options .addAll (builder .options );
89
95
this .options .addAll (credentialOptions );
96
+ this .cacheKey = Preconditions .checkNotNull (cacheKey );
90
97
this .sqlClient = Preconditions .checkNotNull (sqlClient );
91
98
this .builder = builder ;
92
99
this .catalog = catalog ;
100
+ this .flightClientCache = flightClientCache ;
93
101
}
94
102
95
103
/**
@@ -101,12 +109,15 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
101
109
* @return a new {@link ArrowFlightSqlClientHandler}.
102
110
*/
103
111
static ArrowFlightSqlClientHandler createNewHandler (
112
+ final String cacheKey ,
104
113
final FlightClient client ,
105
114
final Builder builder ,
106
115
final Collection <CallOption > options ,
107
- final Optional <String > catalog ) {
116
+ final Optional <String > catalog ,
117
+ final @ Nullable FlightClientCache flightClientCache ) {
108
118
final ArrowFlightSqlClientHandler handler =
109
- new ArrowFlightSqlClientHandler (new FlightSqlClient (client ), builder , options , catalog );
119
+ new ArrowFlightSqlClientHandler (
120
+ cacheKey , new FlightSqlClient (client ), builder , options , catalog , flightClientCache );
110
121
handler .setSetCatalogInSessionIfPresent ();
111
122
return handler ;
112
123
}
@@ -148,9 +159,14 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
148
159
// location
149
160
// is the same as the original connection's Location and skip creating a FlightClient in
150
161
// that scenario.
162
+ // Also copy the cache to the client so we can share a cache. Cache needs to cache
163
+ // negative attempts too.
151
164
List <Exception > exceptions = new ArrayList <>();
152
165
CloseableEndpointStreamPair stream = null ;
153
- for (Location location : endpoint .getLocations ()) {
166
+ FlightLocationQueue locations =
167
+ new FlightLocationQueue (flightClientCache , endpoint .getLocations ());
168
+ while (locations .hasNext ()) {
169
+ Location location = locations .next ();
154
170
final URI endpointUri = location .getUri ();
155
171
if (endpointUri .getScheme ().equals (LocationSchemes .REUSE_CONNECTION )) {
156
172
stream =
@@ -163,6 +179,7 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
163
179
.withHost (endpointUri .getHost ())
164
180
.withPort (endpointUri .getPort ())
165
181
.withEncryption (endpointUri .getScheme ().equals (LocationSchemes .GRPC_TLS ))
182
+ .withClientCache (flightClientCache )
166
183
.withConnectTimeout (builder .connectTimeout );
167
184
168
185
ArrowFlightSqlClientHandler endpointHandler = null ;
@@ -177,12 +194,29 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
177
194
stream .getStream ().getSchema ();
178
195
} catch (Exception ex ) {
179
196
if (endpointHandler != null ) {
197
+ // If the exception is related to connectivity, mark the client as a dud.
198
+ if (flightClientCache != null ) {
199
+ if (ex instanceof FlightRuntimeException
200
+ && ((FlightRuntimeException ) ex ).status ().code ()
201
+ == FlightStatusCode .UNAVAILABLE
202
+ &&
203
+ // IOException covers SocketException and Netty's (private)
204
+ // AnnotatedSocketException
205
+ // We are looking for things like "Network is unreachable"
206
+ ex .getCause () instanceof IOException ) {
207
+ flightClientCache .markLocationAsDud (location .toString ());
208
+ }
209
+ }
210
+
180
211
AutoCloseables .close (endpointHandler );
181
212
}
182
213
exceptions .add (ex );
183
214
continue ;
184
215
}
185
216
217
+ if (flightClientCache != null ) {
218
+ flightClientCache .markLocationAsReachable (location .toString ());
219
+ }
186
220
break ;
187
221
}
188
222
if (stream != null ) {
@@ -549,6 +583,8 @@ public static final class Builder {
549
583
550
584
@ VisibleForTesting Optional <String > catalog = Optional .empty ();
551
585
586
+ @ VisibleForTesting @ Nullable FlightClientCache flightClientCache ;
587
+
552
588
@ VisibleForTesting @ Nullable Duration connectTimeout ;
553
589
554
590
// These two middleware are for internal use within build() and should not be exposed by builder
@@ -833,11 +869,20 @@ public Builder withCatalog(@Nullable final String catalog) {
833
869
return this ;
834
870
}
835
871
872
+ public Builder withClientCache (FlightClientCache flightClientCache ) {
873
+ this .flightClientCache = flightClientCache ;
874
+ return this ;
875
+ }
876
+
836
877
public Builder withConnectTimeout (Duration connectTimeout ) {
837
878
this .connectTimeout = connectTimeout ;
838
879
return this ;
839
880
}
840
881
882
+ public String getCacheKey () {
883
+ return getLocation ().toString ();
884
+ }
885
+
841
886
/** Get the location that this client will connect to. */
842
887
public Location getLocation () {
843
888
if (useEncryption ) {
@@ -931,7 +976,7 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
931
976
options .toArray (new CallOption [0 ])));
932
977
}
933
978
return ArrowFlightSqlClientHandler .createNewHandler (
934
- client , this , credentialOptions , catalog );
979
+ getCacheKey (), client , this , credentialOptions , catalog , flightClientCache );
935
980
936
981
} catch (final IllegalArgumentException
937
982
| GeneralSecurityException
0 commit comments