|
| 1 | +# **RFC-0013 for Java Connector Federation in C++ Workers** |
| 2 | + |
| 3 | +## Java Connector Federation via Arrow Flight |
| 4 | + |
| 5 | +Proposers |
| 6 | + |
| 7 | +* Tim Meehan |
| 8 | +* Bryan Cutler |
| 9 | +* Pratik Dabre |
| 10 | + |
| 11 | +## Related Issues |
| 12 | + |
| 13 | +* [PR #25242](https://github.com/prestodb/presto/pull/25242): Custom Serialization Framework (ConnectorCodecProvider) |
| 14 | +* Arrow Flight Connector in presto-native-execution |
| 15 | + |
| 16 | +## Summary |
| 17 | + |
| 18 | +Enable C++ workers to execute queries using existing single-connection Java connector implementations through Arrow Flight RPC, providing a migration path for connectors not yet ported to C++. |
| 19 | + |
| 20 | +## Background |
| 21 | + |
| 22 | +Presto's C++ workers provide performance improvements but require connectors to be rewritten in C++. Many connectors (JDBC, MongoDB, Elasticsearch) are single-connection systems where the data transfer, not computation, is the bottleneck. Rewriting these connectors in C++ does not improve performance, yet they require a complete reimplementation. |
| 23 | + |
| 24 | +The recent addition of `ConnectorCodecProvider` ([PR #25242](https://github.com/prestodb/presto/pull/25242)) enables custom serialization of connector data structures. Combined with the existing Arrow Flight connector in C++, this provides a path to federation. |
| 25 | + |
| 26 | +### Goals |
| 27 | + |
| 28 | +* Enable single-connection Java connectors to work with C++ workers without code duplication |
| 29 | +* Optimize for connectors where all splits share a single database connection |
| 30 | + |
| 31 | +### Non-goals |
| 32 | + |
| 33 | +* Multi-split connectors (Hive, Iceberg) where parallelism and C++ execution improve performance |
| 34 | +* Modifying the coordinator-worker protocol |
| 35 | + |
| 36 | +## Proposed Implementation |
| 37 | + |
| 38 | +### 1. Architecture Overview |
| 39 | + |
| 40 | +```mermaid |
| 41 | +graph LR |
| 42 | + subgraph "Coordinator (Java)" |
| 43 | + A[JDBC Connector<br/>generates split] |
| 44 | + end |
| 45 | + |
| 46 | + subgraph "C++ Worker" |
| 47 | + B[Arrow Flight<br/>Connector<br/>forwards ticket] |
| 48 | + end |
| 49 | + |
| 50 | + subgraph "Flight Server (Java)" |
| 51 | + C[JDBC Connector<br/>execute split] |
| 52 | + end |
| 53 | + |
| 54 | + A -->|ArrowSplit<br/>with ticket| B |
| 55 | + B -->|gRPC/Flight<br/>protocol| C |
| 56 | + C -->|Arrow<br/>RecordBatches| B |
| 57 | +``` |
| 58 | + |
| 59 | +### 2. Module Changes |
| 60 | + |
| 61 | +#### presto-connector-flight-common (new) |
| 62 | +Common library providing Arrow Flight wrapping (example pseudocode): |
| 63 | + |
| 64 | +```java |
| 65 | +// One-line integration for any connector |
| 66 | +public class ArrowFlightCodecProvider implements ConnectorCodecProvider { |
| 67 | + public static ConnectorCodecProvider wrap( |
| 68 | + ConnectorCodecProvider original, |
| 69 | + ArrowFlightFederationConfig config) { |
| 70 | + if (!config.isEnabled()) { |
| 71 | + return original; |
| 72 | + } |
| 73 | + return new ArrowFlightCodecProvider(original, config); |
| 74 | + } |
| 75 | + |
| 76 | + public Optional<ConnectorCodec<ConnectorSplit>> getConnectorSplitCodec() { |
| 77 | + return Optional.of(new ArrowFlightWrapperCodec( |
| 78 | + originalCodec, |
| 79 | + connectorId)); |
| 80 | + } |
| 81 | +} |
| 82 | + |
| 83 | +public class ArrowFlightWrapperCodec implements ConnectorCodec<ConnectorSplit> { |
| 84 | + private final String connectorId; |
| 85 | + |
| 86 | + public byte[] serialize(ConnectorSplit split) { |
| 87 | + byte[] originalBytes = originalCodec.serialize(split); |
| 88 | + |
| 89 | + // Create ticket with connector ID prefix to identify the connector |
| 90 | + ByteBuffer ticketBuffer = ByteBuffer.allocate( |
| 91 | + 4 + connectorId.length() + originalBytes.length); |
| 92 | + ticketBuffer.putInt(connectorId.length()); |
| 93 | + ticketBuffer.put(connectorId.getBytes(UTF_8)); |
| 94 | + ticketBuffer.put(originalBytes); |
| 95 | + |
| 96 | + // No locations added - C++ worker will use locally configured Flight server |
| 97 | + FlightEndpoint.Builder endpoint = FlightEndpoint.newBuilder() |
| 98 | + .setTicket(FlightTicket.newBuilder() |
| 99 | + .setTicket(ByteString.copyFrom(ticketBuffer.array()))); |
| 100 | + |
| 101 | + ArrowSplit arrowSplit = new ArrowSplit(); |
| 102 | + arrowSplit.setFlightEndpointBytes(Base64.encode(endpoint.build().toByteArray())); |
| 103 | + return arrowFlightCodec.serialize(arrowSplit); |
| 104 | + } |
| 105 | +} |
| 106 | + |
| 107 | +``` |
| 108 | + |
| 109 | +#### Integration for any single-connection Connector |
| 110 | +```java |
| 111 | +// JDBC Connector |
| 112 | +public class JdbcConnector implements Connector { |
| 113 | + @Inject ArrowFlightFederationConfig flightConfig; |
| 114 | + |
| 115 | + @Override |
| 116 | + public ConnectorCodecProvider getConnectorCodecProvider() { |
| 117 | + // Enable federation |
| 118 | + return ArrowFlightCodecProvider.wrap( |
| 119 | + getDefaultJdbcCodecProvider(), |
| 120 | + flightConfig); |
| 121 | + } |
| 122 | +} |
| 123 | + |
| 124 | +// MongoDB Connector - identical pattern |
| 125 | +public class MongoConnector implements Connector { |
| 126 | + @Inject ArrowFlightFederationConfig flightConfig; |
| 127 | + |
| 128 | + @Override |
| 129 | + public ConnectorCodecProvider getConnectorCodecProvider() { |
| 130 | + return ArrowFlightCodecProvider.wrap( |
| 131 | + getDefaultMongoCodecProvider(), |
| 132 | + flightConfig); |
| 133 | + } |
| 134 | +} |
| 135 | +``` |
| 136 | + |
| 137 | +#### presto-native-execution changes |
| 138 | + |
| 139 | +##### Custom Serialization Support |
| 140 | + |
| 141 | +Override the `deserialize` method to parse the binary format that matches the Java ArrowSplit codec's `serialize` method: |
| 142 | + |
| 143 | +```cpp |
| 144 | +class ArrowConnectorProtocol : public ConnectorProtocolTemplate< |
| 145 | + ArrowTableHandle, |
| 146 | + ArrowTableLayoutHandle, |
| 147 | + ArrowColumnHandle, |
| 148 | + NotImplemented, |
| 149 | + NotImplemented, |
| 150 | + ArrowSplit, |
| 151 | + NotImplemented, |
| 152 | + ArrowTransactionHandle, |
| 153 | + NotImplemented, |
| 154 | + NotImplemented> { |
| 155 | +public: |
| 156 | + void deserialize( |
| 157 | + const std::string& thrift, |
| 158 | + std::shared_ptr<ConnectorSplit>& proto) const override { |
| 159 | + // Parse binary format matching Java's ArrowSplit codec serialize method |
| 160 | + auto arrowSplit = std::make_shared<ArrowSplit>(); |
| 161 | + // Binary format: [schemaName_len][schemaName][tableName_len][tableName][flightEndpointBytes] |
| 162 | + // Implementation details... |
| 163 | + proto = arrowSplit; |
| 164 | + } |
| 165 | +}; |
| 166 | +``` |
| 167 | +
|
| 168 | +The deserialization path in `ProtocolToThrift.cpp` already calls `getConnectorProtocol(connectorId).deserialize()`. |
| 169 | +
|
| 170 | +The existing Arrow Flight connector already uses the locally configured `arrow-flight.server` and `arrow-flight.server.port` properties. When no locations are provided in the FlightEndpoint, it automatically falls back to these configured values. |
| 171 | +
|
| 172 | +##### Colocated Flight Server Process Management |
| 173 | +
|
| 174 | +When `arrow-flight.colocated=true`, the C++ worker manages a Flight server subprocess: |
| 175 | +
|
| 176 | +- Automatically configures `arrow-flight.server=localhost` and uses `arrow-flight.colocated.port` |
| 177 | +- Spawns a Java subprocess running `presto-flight-server` with JVM options from `${etc.dir}/jvm.config` |
| 178 | +- Restarts the Flight server if it crashes |
| 179 | +- Terminates the subprocess on worker shutdown |
| 180 | +
|
| 181 | +#### presto-flight-server (new) |
| 182 | +Flight server that loads connectors using Presto's plugin loading approach, adapted from the main Presto server implementation. |
| 183 | +
|
| 184 | +The Flight server uses standard Presto configuration structure: |
| 185 | +- `jvm.config`: JVM heap size and flags (e.g., `-Xmx4G -XX:+UseG1GC`) |
| 186 | +- `config.properties`: Flight server configuration including: |
| 187 | + - `catalog.config-dir`: Path to catalog configurations (default: `etc/catalog`) |
| 188 | + - `plugin.dir`: Path to plugin directory |
| 189 | + - Other Flight server-specific properties |
| 190 | +- Catalog directory (as specified by `catalog.config-dir`): Connector configurations (jdbc.properties, mongodb.properties, etc.) |
| 191 | +
|
| 192 | +```java |
| 193 | +public class FlightServerPluginManager { |
| 194 | + private final Map<String, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>(); |
| 195 | + private final Map<String, Connector> connectors = new ConcurrentHashMap<>(); |
| 196 | +
|
| 197 | + public void loadPlugins() throws Exception { |
| 198 | + // Use same approach as PluginManager to load from installed plugins dir |
| 199 | + for (File pluginDir : listFiles(installedPluginsDir)) { |
| 200 | + loadPlugin(pluginDir); |
| 201 | + } |
| 202 | + } |
| 203 | +
|
| 204 | + private void loadPlugin(File pluginDir) { |
| 205 | + URLClassLoader pluginClassLoader = buildClassLoader(pluginDir); |
| 206 | + ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader); |
| 207 | +
|
| 208 | + for (Plugin plugin : serviceLoader) { |
| 209 | + for (ConnectorFactory factory : plugin.getConnectorFactories()) { |
| 210 | + log.info("Registering connector %s", factory.getName()); |
| 211 | + connectorFactories.put(factory.getName(), factory); |
| 212 | + } |
| 213 | + } |
| 214 | + } |
| 215 | +
|
| 216 | + public Connector getConnector(String connectorName) { |
| 217 | + // Create connector instances from factories as needed |
| 218 | + return connectors.computeIfAbsent(connectorName, name -> { |
| 219 | + ConnectorFactory factory = connectorFactories.get(name); |
| 220 | + return factory.create(name, config, connectorContext); |
| 221 | + }); |
| 222 | + } |
| 223 | +} |
| 224 | +
|
| 225 | +public class ConnectorFlightProducer extends FlightProducer { |
| 226 | + private final FlightServerPluginManager pluginManager; |
| 227 | +
|
| 228 | + @Override |
| 229 | + public void getStream(CallContext context, Ticket ticket, |
| 230 | + ServerStreamListener listener) { |
| 231 | + // Ticket format: [connectorId_length][connectorId][split_bytes] |
| 232 | + ByteBuffer buffer = ByteBuffer.wrap(ticket.getBytes()); |
| 233 | + int connectorIdLength = buffer.getInt(); |
| 234 | + byte[] connectorIdBytes = new byte[connectorIdLength]; |
| 235 | + buffer.get(connectorIdBytes); |
| 236 | + String connectorId = new String(connectorIdBytes, UTF_8); |
| 237 | +
|
| 238 | + // Get connector from plugin manager |
| 239 | + Connector connector = pluginManager.getConnector(connectorId); |
| 240 | + ConnectorCodec<ConnectorSplit> codec = connector.getConnectorCodecProvider() |
| 241 | + .getConnectorSplitCodec() |
| 242 | + .orElse(defaultJsonCodec); |
| 243 | +
|
| 244 | + // Deserialize and execute |
| 245 | + byte[] splitBytes = new byte[buffer.remaining()]; |
| 246 | + buffer.get(splitBytes); |
| 247 | + ConnectorSplit split = codec.deserialize(splitBytes); |
| 248 | +
|
| 249 | + ConnectorPageSource pageSource = connector.getPageSource(split); |
| 250 | + streamPagesAsArrow(pageSource, listener); |
| 251 | + } |
| 252 | +} |
| 253 | +``` |
| 254 | + |
| 255 | +### 3. Configuration |
| 256 | + |
| 257 | +#### Authentication |
| 258 | + |
| 259 | +Arrow Flight supports pluggable authentication. For the initial implementation, authentication is not required for colocated deployments since the Flight server only accepts localhost connections. Remote deployments requiring authentication would need to implement custom authenticators based on their security requirements. |
| 260 | + |
| 261 | +#### Colocated Deployment Mode |
| 262 | + |
| 263 | +```properties |
| 264 | +# C++ worker configuration |
| 265 | +arrow-flight.colocated=true |
| 266 | +arrow-flight.colocated.port=8815 # Port for the colocated Flight server |
| 267 | +# arrow-flight.server is automatically set to localhost |
| 268 | +# Setting arrow-flight.server is an error when colocated=true |
| 269 | + |
| 270 | +# Flight server configuration directory |
| 271 | +arrow-flight.federation.etc.dir=/path/to/flight-server/etc |
| 272 | +``` |
| 273 | + |
| 274 | +When `arrow-flight.federation.etc.dir` is configured, it points to a directory containing the configuration files for the presto-flight-server process (see presto-flight-server section above for details). |
| 275 | + |
| 276 | +#### Remote Deployment Mode |
| 277 | + |
| 278 | +```properties |
| 279 | +# C++ worker configuration |
| 280 | +arrow-flight.colocated=false # Default |
| 281 | +arrow-flight.server=flight-server.internal # Should be set for federation |
| 282 | +arrow-flight.server.port=8815 # Should be set for federation |
| 283 | + |
| 284 | +# Custom authenticator configuration would be added here |
| 285 | +# based on the authenticator implementation |
| 286 | +``` |
| 287 | + |
| 288 | +When `arrow-flight.colocated=true`, setting `arrow-flight.server` throws a configuration error since the server is automatically set to localhost and the port is configured via `arrow-flight.colocated.port`. For this federation architecture to work when `colocated=false`, both `arrow-flight.server` and `arrow-flight.server.port` should be configured since the FlightEndpoints sent by the coordinator do not include locations. |
| 289 | + |
| 290 | +### 4. Protocol Flow |
| 291 | + |
| 292 | +1. Coordinator's JDBC connector creates `JdbcSplit` |
| 293 | +2. `ArrowFlightWrapperCodec` serializes as `ArrowSplit` with embedded ticket (no locations) |
| 294 | +3. C++ worker receives `ArrowSplit` via custom serialization protocol |
| 295 | +4. Arrow Flight connector extracts ticket, sends to locally configured Flight server |
| 296 | +5. Flight server deserializes ticket as `JdbcSplit`, executes query |
| 297 | +6. Results stream back as Arrow RecordBatches |
| 298 | + |
| 299 | +## Metrics |
| 300 | + |
| 301 | +* Flight protocol overhead (latency comparison between C++ worker and Flight server metrics) |
| 302 | +* Data transfer throughput between C++ worker and Flight server |
| 303 | + |
| 304 | +## Other Approaches Considered |
| 305 | + |
| 306 | +**JNI Integration**: Direct JVM embedding in C++ workers. Rejected due to memory management and debugging requirements. |
| 307 | + |
| 308 | +**Custom Protocol**: Thrift-based connector protocol. Arrow Flight chosen instead for columnar-native transport, existing ecosystem support, and potential for future cross-language connector implementations. |
| 309 | + |
| 310 | +## Adoption Plan |
| 311 | + |
| 312 | +* New configuration properties: |
| 313 | + - Per-connector: `arrow-flight-federation.enabled` |
| 314 | + - Per C++ worker: |
| 315 | + - `arrow-flight.colocated`: Whether to spawn local Flight server subprocess |
| 316 | + - `arrow-flight.colocated.port`: Port for colocated Flight server (used when colocated=true) |
| 317 | + - `arrow-flight.server` and `arrow-flight.server.port`: Should be set when colocated=false for federation to work |
| 318 | + - `arrow-flight.federation.etc.dir`: Configuration directory for colocated Flight server |
| 319 | + |
| 320 | +* Single-connection connectors can be migrated individually |
| 321 | +* Documentation: Update each connector's documentation as it gains Flight support |
| 322 | + |
| 323 | +## Test Plan |
| 324 | + |
| 325 | +* Unit tests for split serialization |
| 326 | +* Integration tests for end-to-end query execution through Flight server |
| 327 | +* Tests for both colocated and remote Flight server configurations |
0 commit comments