|
| 1 | +# **RFC-0013 for Java Connector Federation in C++ Workers** |
| 2 | + |
| 3 | +## Java Connector Federation via Arrow Flight |
| 4 | + |
| 5 | +Proposers |
| 6 | + |
| 7 | +* Tim Meehan |
| 8 | +* Bryan Cutler |
| 9 | +* Pratik Dabre |
| 10 | + |
| 11 | +## Related Issues |
| 12 | + |
| 13 | +* [PR #25242](https://github.com/prestodb/presto/pull/25242): Custom Serialization Framework (ConnectorCodecProvider) |
| 14 | +* Arrow Flight Connector in presto-native-execution |
| 15 | + |
| 16 | +## Summary |
| 17 | + |
| 18 | +Enable C++ workers to execute queries using existing single-node Java connector implementations through a dedicated Arrow Federation connector that forwards requests to an Arrow Flight server, providing a migration path for single-node connectors not yet ported to C++. |
| 19 | + |
| 20 | +## Background |
| 21 | + |
| 22 | +Presto's C++ workers provide performance improvements but require connectors to be rewritten in C++. Many single-node connectors (JDBC, MongoDB, Elasticsearch, Cassandra, Redis) are systems where the data transfer, not computation, is the bottleneck. Rewriting these connectors in C++ does not improve performance, yet they require a complete reimplementation. |
| 23 | + |
| 24 | +By creating a dedicated Arrow Federation connector in C++ that derives from the existing Arrow Flight connector, we can forward splits from single-node Java connectors to a Flight server that executes them using the original Java connector implementations. |
| 25 | + |
| 26 | +### Goals |
| 27 | + |
| 28 | +* Enable single-node Java connectors to work with C++ workers without code duplication |
| 29 | +* Provide a simple federation mechanism through a dedicated Arrow Federation connector |
| 30 | +* Support connectors like JDBC, MongoDB, Elasticsearch, and other single-node systems |
| 31 | + |
| 32 | +### Non-goals |
| 33 | + |
| 34 | +* Distributed connectors (Hive, Iceberg, Delta Lake) where parallelism and C++ execution improve performance |
| 35 | +* Modifying the coordinator-worker protocol |
| 36 | + |
| 37 | +## Proposed Implementation |
| 38 | + |
| 39 | +### 1. Architecture Overview |
| 40 | + |
| 41 | +```mermaid |
| 42 | +graph LR |
| 43 | + subgraph "Coordinator (Java)" |
| 44 | + A[Single-node Connector<br/>(e.g., JDBC, MongoDB)<br/>generates split] |
| 45 | + end |
| 46 | +
|
| 47 | + subgraph "C++ Worker" |
| 48 | + B[Arrow Federation<br/>Connector<br/>wraps & forwards] |
| 49 | + end |
| 50 | +
|
| 51 | + subgraph "Flight Server (Java)" |
| 52 | + C[Same Connector<br/>executes split] |
| 53 | + end |
| 54 | +
|
| 55 | + A -->|Connector Split| B |
| 56 | + B -->|Flight Request<br/>with split + columns| C |
| 57 | + C -->|Arrow<br/>RecordBatches| B |
| 58 | +``` |
| 59 | + |
| 60 | +### 2. Module Changes |
| 61 | + |
| 62 | +#### presto-native-execution: Arrow Federation Connector |
| 63 | + |
| 64 | +##### Protocol Definition |
| 65 | + |
| 66 | +The Arrow Federation connector uses a dedicated protocol to handle splits from single-node connectors: |
| 67 | + |
| 68 | +```cpp |
| 69 | +// ArrowFederationProtocol.h |
| 70 | +namespace facebook::presto::protocol::arrow_federation { |
| 71 | +using ArrowFederationProtocol = ConnectorProtocolTemplate< |
| 72 | + ArrowFederationTableHandle, |
| 73 | + ArrowFederationTableLayoutHandle, |
| 74 | + ArrowFederationColumnHandle, |
| 75 | + NotImplemented, |
| 76 | + NotImplemented, |
| 77 | + ArrowFederationSplit, |
| 78 | + NotImplemented, |
| 79 | + ArrowFederationTransactionHandle, |
| 80 | + NotImplemented, |
| 81 | + NotImplemented>; |
| 82 | +} // namespace facebook::presto::protocol::arrow_federation |
| 83 | +``` |
| 84 | +
|
| 85 | +```cpp |
| 86 | +// presto_protocol_arrow_federation.h |
| 87 | +namespace facebook::presto::protocol::arrow_federation { |
| 88 | +struct ArrowFederationSplit : public ConnectorSplit { |
| 89 | + String splitBytes = {}; |
| 90 | +
|
| 91 | + ArrowFederationSplit() noexcept; |
| 92 | +}; |
| 93 | +
|
| 94 | +void to_json(json& j, const ArrowFederationSplit& p) { |
| 95 | + auto split = folly::base64Decode(p.splitBytes); |
| 96 | + j = json::parse(split); |
| 97 | + j["@type"] = "arrow-flight"; |
| 98 | +} |
| 99 | +
|
| 100 | +void from_json(const json& j, ArrowFederationSplit& p) { |
| 101 | + p._type = j["@type"]; |
| 102 | + p.splitBytes = folly::base64Encode(j.dump()); |
| 103 | +} |
| 104 | +} // namespace facebook::presto::protocol::arrow_federation |
| 105 | +``` |
| 106 | + |
| 107 | +##### Federation Connector Implementation |
| 108 | + |
| 109 | +The Arrow Federation connector extends the Arrow Flight connector to wrap single-node connector splits and forward them: |
| 110 | + |
| 111 | +```cpp |
| 112 | +class ArrowFederationDataSource : public ArrowFlightDataSource { |
| 113 | + public: |
| 114 | + ArrowFederationDataSource( |
| 115 | + const velox::RowTypePtr& outputType, |
| 116 | + const velox::connector::ColumnHandleMap& columnHandles, |
| 117 | + std::shared_ptr<Authenticator> authenticator, |
| 118 | + const velox::connector::ConnectorQueryCtx* connectorQueryCtx, |
| 119 | + const std::shared_ptr<ArrowFlightConfig>& flightConfig, |
| 120 | + const std::shared_ptr<arrow::flight::FlightClientOptions>& clientOpts); |
| 121 | + |
| 122 | + void addSplit( |
| 123 | + std::shared_ptr<velox::connector::ConnectorSplit> split) override; |
| 124 | +}; |
| 125 | + |
| 126 | +struct ArrowFederationSplit : public velox::connector::ConnectorSplit { |
| 127 | + /// @param splitBytes Base64 Serialized Split for Arrow Federation Flight Server |
| 128 | + /// Can contain any single-node connector split (JDBC, MongoDB, Elasticsearch, etc.) |
| 129 | + ArrowFederationSplit(const std::string& splitBytes) |
| 130 | + : ConnectorSplit(connectorId), |
| 131 | + splitBytes_(splitBytes) {} |
| 132 | + |
| 133 | + const std::string splitBytes_; |
| 134 | +}; |
| 135 | +``` |
| 136 | +
|
| 137 | +##### Request Creation and Split Processing |
| 138 | +
|
| 139 | +The federation connector wraps the connector split with column information and forwards it as a Flight request: |
| 140 | +
|
| 141 | +```cpp |
| 142 | +void ArrowFederationDataSource::addSplit( |
| 143 | + std::shared_ptr<ConnectorSplit> split) { |
| 144 | + auto federationSplit = std::dynamic_pointer_cast<ArrowFederationSplit>(split); |
| 145 | + VELOX_CHECK( |
| 146 | + federationSplit, |
| 147 | + "ArrowFederationDataSource received wrong type of split"); |
| 148 | +
|
| 149 | + // Create request with split and column mapping |
| 150 | + nlohmann::json request = { |
| 151 | + {"split", federationSplit->splitBytes_}, |
| 152 | + {"columns", this->columnMapping_} |
| 153 | + }; |
| 154 | +
|
| 155 | + // Create FlightEndpoint with request as ticket |
| 156 | + arrow::flight::FlightEndpoint flightEndpoint; |
| 157 | + flightEndpoint.ticket = request.dump(); |
| 158 | +
|
| 159 | + std::string flightEndpointBytes; |
| 160 | + AFC_ASSIGN_OR_RAISE( |
| 161 | + flightEndpointBytes, |
| 162 | + flightEndpoint.SerializeToString()); |
| 163 | +
|
| 164 | + // Create ArrowFlightSplit and forward to parent class |
| 165 | + auto flightSplit = std::make_shared<ArrowFlightSplit>( |
| 166 | + connectorId, flightEndpointBytes); |
| 167 | +
|
| 168 | + // Let ArrowFlightDataSource handle the actual Flight communication |
| 169 | + ArrowFlightDataSource::addSplit(flightSplit); |
| 170 | +} |
| 171 | +``` |
| 172 | + |
| 173 | +The Arrow Federation connector leverages the existing Arrow Flight connector's infrastructure for communication with the Flight server, using the locally configured `arrow-flight.server` and `arrow-flight.server.port` properties. |
| 174 | + |
| 175 | +#### presto-flight-server (new) |
| 176 | +Flight server that loads connectors using Presto's plugin loading approach, adapted from the main Presto server implementation. |
| 177 | + |
| 178 | +The Flight server uses standard Presto configuration structure: |
| 179 | +- `jvm.config`: JVM heap size and flags (e.g., `-Xmx4G -XX:+UseG1GC`) |
| 180 | +- `config.properties`: Flight server configuration including: |
| 181 | + - `catalog.config-dir`: Path to catalog configurations (default: `etc/catalog`) |
| 182 | + - `plugin.dir`: Path to plugin directory |
| 183 | + - Other Flight server-specific properties |
| 184 | +- Catalog directory (as specified by `catalog.config-dir`): Connector configurations (jdbc.properties, mongodb.properties, etc.) |
| 185 | + |
| 186 | +```java |
| 187 | +public class FlightServerPluginManager { |
| 188 | + private final Map<String, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>(); |
| 189 | + private final Map<String, Connector> connectors = new ConcurrentHashMap<>(); |
| 190 | + |
| 191 | + public void loadPlugins() throws Exception { |
| 192 | + // Use same approach as PluginManager to load from installed plugins dir |
| 193 | + for (File pluginDir : listFiles(installedPluginsDir)) { |
| 194 | + loadPlugin(pluginDir); |
| 195 | + } |
| 196 | + } |
| 197 | + |
| 198 | + private void loadPlugin(File pluginDir) { |
| 199 | + URLClassLoader pluginClassLoader = buildClassLoader(pluginDir); |
| 200 | + ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader); |
| 201 | + |
| 202 | + for (Plugin plugin : serviceLoader) { |
| 203 | + for (ConnectorFactory factory : plugin.getConnectorFactories()) { |
| 204 | + log.info("Registering connector %s", factory.getName()); |
| 205 | + connectorFactories.put(factory.getName(), factory); |
| 206 | + } |
| 207 | + } |
| 208 | + } |
| 209 | + |
| 210 | + public Connector getConnector(String connectorName) { |
| 211 | + // Create connector instances from factories as needed |
| 212 | + return connectors.computeIfAbsent(connectorName, name -> { |
| 213 | + ConnectorFactory factory = connectorFactories.get(name); |
| 214 | + return factory.create(name, config, connectorContext); |
| 215 | + }); |
| 216 | + } |
| 217 | +} |
| 218 | + |
| 219 | +public class ConnectorFlightProducer extends FlightProducer { |
| 220 | + private final FlightServerPluginManager pluginManager; |
| 221 | + |
| 222 | + @Override |
| 223 | + public void getStream(CallContext context, Ticket ticket, |
| 224 | + ServerStreamListener listener) { |
| 225 | + // Parse enhanced ticket with all execution context |
| 226 | + // Format: [version][connectorId_length][connectorId][params_length][params_serialized] |
| 227 | + String connectorId = extractConnectorId(ticket); |
| 228 | + PageSourceParameters params = extractParameters(ticket); |
| 229 | + |
| 230 | + // Get connector and deserialize all execution context |
| 231 | + Connector connector = pluginManager.getConnector(connectorId); |
| 232 | + ConnectorCodecProvider codecProvider = connector.getConnectorCodecProvider(); |
| 233 | + |
| 234 | + // Deserialize transaction handle |
| 235 | + ConnectorTransactionHandle transactionHandle = |
| 236 | + codecProvider.getTransactionHandleCodec() |
| 237 | + .deserialize(params.getTransactionHandle().toByteArray()); |
| 238 | + |
| 239 | + // Deserialize split |
| 240 | + ConnectorSplit split = codecProvider.getConnectorSplitCodec() |
| 241 | + .deserialize(params.getSplit().toByteArray()); |
| 242 | + |
| 243 | + // Deserialize column handles |
| 244 | + List<ColumnHandle> columns = deserializeColumns(params, codecProvider); |
| 245 | + |
| 246 | + // Reconstruct session from overrides (user, timezone, locale, query_id, etc.) |
| 247 | + ConnectorSession session = reconstructSession(params.getSessionOverrides(), connectorId); |
| 248 | + |
| 249 | + // Optional layout handle |
| 250 | + ConnectorTableLayoutHandle layout = deserializeLayout(params, codecProvider); |
| 251 | + |
| 252 | + // Split execution context |
| 253 | + SplitContext splitContext = new SplitContext(params.getSplitContextNanos()); |
| 254 | + |
| 255 | + // Create page source with complete execution context |
| 256 | + ConnectorPageSource pageSource = connector.getPageSourceProvider() |
| 257 | + .createPageSource( |
| 258 | + transactionHandle, |
| 259 | + session, |
| 260 | + split, |
| 261 | + layout, |
| 262 | + columns, |
| 263 | + splitContext, |
| 264 | + new RuntimeStats()); |
| 265 | + |
| 266 | + streamPagesAsArrow(pageSource, listener); |
| 267 | + } |
| 268 | +} |
| 269 | +``` |
| 270 | + |
| 271 | +### 3. Configuration |
| 272 | + |
| 273 | +#### Arrow Federation Connector Configuration |
| 274 | + |
| 275 | +The Arrow Federation connector is configured through catalog properties like any other connector. The connector ID (catalog name) must be identical between the coordinator and C++ worker, even though they use different connector implementations. Both must use the same catalog configuration filename (e.g., `mongodb.properties`), which determines the connector ID/catalog name. This ensures splits are routed to the correct connector on the worker and allows the Flight server to load the appropriate Java connector for execution. |
| 276 | + |
| 277 | +Example configuration: |
| 278 | +```properties |
| 279 | +# Coordinator: etc/catalog/mongodb.properties |
| 280 | +connector.name=mongodb |
| 281 | +mongodb.connection-url=mongodb://localhost:27017 |
| 282 | + |
| 283 | +# C++ Worker: etc/catalog/mongodb.properties (same filename) |
| 284 | +connector.name=arrow-federation |
| 285 | +arrow-flight.server=flight-server.internal |
| 286 | +arrow-flight.server.port=8815 |
| 287 | +``` |
| 288 | + |
| 289 | +The Flight server is managed externally as a separate service. |
| 290 | + |
| 291 | +### 4. Protocol Flow |
| 292 | + |
| 293 | +#### Split Scheduling Phase |
| 294 | +1. Coordinator's single-node connector (e.g., JDBC, MongoDB) creates its connector-specific split |
| 295 | +2. Split is scheduled to C++ worker as `ArrowFederationSplit` containing serialized split bytes |
| 296 | + |
| 297 | +#### Split Processing Phase |
| 298 | +1. C++ worker's Arrow Federation connector receives `ArrowFederationSplit` |
| 299 | +2. Federation connector creates Flight request containing: |
| 300 | + - Original split bytes |
| 301 | + - Column mapping information |
| 302 | +3. Request wrapped in FlightEndpoint and forwarded to parent ArrowFlightDataSource |
| 303 | + |
| 304 | +#### Execution Phase |
| 305 | +1. ArrowFlightDataSource sends request to configured Flight server |
| 306 | +2. Flight server deserializes split and executes using Java connector |
| 307 | +3. Results stream back as Arrow RecordBatches |
| 308 | + |
| 309 | +### 5. Dependencies and Prerequisites |
| 310 | + |
| 311 | +#### Arrow Flight Connector |
| 312 | + |
| 313 | +The Arrow Federation connector depends on the existing Arrow Flight connector infrastructure in presto-native-execution. It extends `ArrowFlightDataSource` to reuse the Flight communication mechanisms. |
| 314 | + |
| 315 | +#### Flight Request Format |
| 316 | + |
| 317 | +The federation connector creates a simple JSON request containing: |
| 318 | +- `split`: Base64-encoded serialized connector split (from any single-node connector) |
| 319 | +- `columns`: Column mapping information needed for execution |
| 320 | + |
| 321 | +The Flight server deserializes this request and executes the split using the appropriate Java connector. |
| 322 | + |
| 323 | +## Metrics |
| 324 | + |
| 325 | +* Flight protocol overhead (latency comparison between C++ worker and Flight server metrics) |
| 326 | +* Data transfer throughput between C++ worker and Flight server |
| 327 | + |
| 328 | +## Other Approaches Considered |
| 329 | + |
| 330 | +**JNI Integration**: Direct JVM embedding in C++ workers. Rejected due to memory management and debugging requirements. |
| 331 | + |
| 332 | +**Custom Protocol**: Thrift-based connector protocol. Arrow Flight chosen instead for columnar-native transport, existing ecosystem support, and potential for future cross-language connector implementations. |
| 333 | + |
| 334 | +## Adoption Plan |
| 335 | + |
| 336 | +* Configuration: |
| 337 | + - Configure catalogs to use `connector.name=arrow-federation` for federated execution |
| 338 | + - Set `arrow-flight.server` and `arrow-flight.server.port` to point to Flight server |
| 339 | + |
| 340 | +* Flight server deployment: |
| 341 | + - Deploy `presto-flight-server` as a separate service |
| 342 | + - Configure with standard Presto server configuration structure |
| 343 | + - Can be colocated with C++ workers or deployed separately |
| 344 | + |
| 345 | +* Migration: |
| 346 | + - Connectors can be migrated individually by creating federation catalog configurations |
| 347 | + - Documentation: Update connector documentation as federation support is added |
| 348 | + |
| 349 | +## Test Plan |
| 350 | + |
| 351 | +* Unit tests for split serialization and request creation |
| 352 | +* Integration tests for end-to-end query execution through Flight server |
| 353 | +* Performance tests comparing direct Java execution vs. federation through Flight |
0 commit comments