Skip to content

Commit eced569

Browse files
committed
Add RFC-0013 for Java connector federation in C++ workers
1 parent 9ec4989 commit eced569

File tree

1 file changed

+325
-0
lines changed

1 file changed

+325
-0
lines changed
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
# **RFC-0013 for Java Connector Federation in C++ Workers**
2+
3+
## Java Connector Federation via Arrow Flight
4+
5+
Proposers
6+
7+
* Tim Meehan
8+
* Bryan Cutler
9+
* Pratik Dabre
10+
11+
## Related Issues
12+
13+
* [PR #25242](https://github.com/prestodb/presto/pull/25242): Custom Serialization Framework (ConnectorCodecProvider)
14+
* Arrow Flight Connector in presto-native-execution
15+
16+
## Summary
17+
18+
Enable C++ workers to execute queries using existing single-connection Java connector implementations through Arrow Flight RPC, providing a migration path for connectors not yet ported to C++.
19+
20+
## Background
21+
22+
Presto's C++ workers provide performance improvements but require connectors to be rewritten in C++. Many connectors (JDBC, MongoDB, Elasticsearch) are single-connection systems where the data transfer, not computation, is the bottleneck. Rewriting these connectors in C++ does not improve performance, yet they require a complete reimplementation.
23+
24+
The recent addition of `ConnectorCodecProvider` ([PR #25242](https://github.com/prestodb/presto/pull/25242)) enables custom serialization of connector data structures. Combined with the existing Arrow Flight connector in C++, this provides a path to federation.
25+
26+
### Goals
27+
28+
* Enable single-connection Java connectors to work with C++ workers without code duplication
29+
* Optimize for connectors where all splits share a single database connection
30+
31+
### Non-goals
32+
33+
* Multi-split connectors (Hive, Iceberg) where parallelism and C++ execution improve performance
34+
* Modifying the coordinator-worker protocol
35+
36+
## Proposed Implementation
37+
38+
### 1. Architecture Overview
39+
40+
```mermaid
41+
graph LR
42+
subgraph "Coordinator (Java)"
43+
A[JDBC Connector<br/>generates split]
44+
end
45+
46+
subgraph "C++ Worker"
47+
B[Arrow Flight<br/>Connector<br/>forwards ticket]
48+
end
49+
50+
subgraph "Flight Server (Java)"
51+
C[JDBC Connector<br/>execute split]
52+
end
53+
54+
A -->|ArrowSplit<br/>with ticket| B
55+
B -->|gRPC/Flight<br/>protocol| C
56+
C -->|Arrow<br/>RecordBatches| B
57+
```
58+
59+
### 2. Module Changes
60+
61+
#### presto-connector-flight-common (new)
62+
Common library providing Arrow Flight wrapping (example pseudocode):
63+
64+
```java
65+
// One-line integration for any connector
66+
public class ArrowFlightCodecProvider implements ConnectorCodecProvider {
67+
public static ConnectorCodecProvider wrap(
68+
ConnectorCodecProvider original,
69+
ArrowFlightFederationConfig config) {
70+
if (!config.isEnabled()) {
71+
return original;
72+
}
73+
return new ArrowFlightCodecProvider(original, config);
74+
}
75+
76+
public Optional<ConnectorCodec<ConnectorSplit>> getConnectorSplitCodec() {
77+
return Optional.of(new ArrowFlightWrapperCodec(
78+
originalCodec,
79+
connectorId));
80+
}
81+
}
82+
83+
public class ArrowFlightWrapperCodec implements ConnectorCodec<ConnectorSplit> {
84+
private final String connectorId;
85+
86+
public byte[] serialize(ConnectorSplit split) {
87+
byte[] originalBytes = originalCodec.serialize(split);
88+
89+
// Create ticket with connector ID prefix to identify the connector
90+
ByteBuffer ticketBuffer = ByteBuffer.allocate(
91+
4 + connectorId.length() + originalBytes.length);
92+
ticketBuffer.putInt(connectorId.length());
93+
ticketBuffer.put(connectorId.getBytes(UTF_8));
94+
ticketBuffer.put(originalBytes);
95+
96+
// No locations added - C++ worker will use locally configured Flight server
97+
FlightEndpoint.Builder endpoint = FlightEndpoint.newBuilder()
98+
.setTicket(FlightTicket.newBuilder()
99+
.setTicket(ByteString.copyFrom(ticketBuffer.array())));
100+
101+
ArrowSplit arrowSplit = new ArrowSplit();
102+
arrowSplit.setFlightEndpointBytes(Base64.encode(endpoint.build().toByteArray()));
103+
return arrowFlightCodec.serialize(arrowSplit);
104+
}
105+
}
106+
107+
```
108+
109+
#### Integration for any single-connection Connector
110+
```java
111+
// JDBC Connector
112+
public class JdbcConnector implements Connector {
113+
@Inject ArrowFlightFederationConfig flightConfig;
114+
115+
@Override
116+
public ConnectorCodecProvider getConnectorCodecProvider() {
117+
// Enable federation
118+
return ArrowFlightCodecProvider.wrap(
119+
getDefaultJdbcCodecProvider(),
120+
flightConfig);
121+
}
122+
}
123+
124+
// MongoDB Connector - identical pattern
125+
public class MongoConnector implements Connector {
126+
@Inject ArrowFlightFederationConfig flightConfig;
127+
128+
@Override
129+
public ConnectorCodecProvider getConnectorCodecProvider() {
130+
return ArrowFlightCodecProvider.wrap(
131+
getDefaultMongoCodecProvider(),
132+
flightConfig);
133+
}
134+
}
135+
```
136+
137+
#### presto-native-execution changes
138+
139+
##### Custom Serialization Support
140+
141+
Override the `deserialize` method to parse the binary format that matches the Java ArrowSplit codec's `serialize` method:
142+
143+
```cpp
144+
class ArrowConnectorProtocol : public ConnectorProtocolTemplate<
145+
ArrowTableHandle,
146+
ArrowTableLayoutHandle,
147+
ArrowColumnHandle,
148+
NotImplemented,
149+
NotImplemented,
150+
ArrowSplit,
151+
NotImplemented,
152+
ArrowTransactionHandle,
153+
NotImplemented,
154+
NotImplemented> {
155+
public:
156+
void deserialize(
157+
const std::string& thrift,
158+
std::shared_ptr<ConnectorSplit>& proto) const override {
159+
// Parse binary format matching Java's ArrowSplit codec serialize method
160+
auto arrowSplit = std::make_shared<ArrowSplit>();
161+
// Binary format: [schemaName_len][schemaName][tableName_len][tableName][flightEndpointBytes]
162+
// Implementation details...
163+
proto = arrowSplit;
164+
}
165+
};
166+
```
167+
168+
The deserialization path in `ProtocolToThrift.cpp` already calls `getConnectorProtocol(connectorId).deserialize()`.
169+
170+
The existing Arrow Flight connector already uses the locally configured `arrow-flight.server` and `arrow-flight.server.port` properties. When no locations are provided in the FlightEndpoint, it automatically falls back to these configured values.
171+
172+
##### Colocated Flight Server Process Management
173+
174+
When `arrow-flight.colocated=true`, the C++ worker manages a Flight server subprocess:
175+
176+
1. **On startup**:
177+
- Automatically configures `arrow-flight.server=localhost` and uses `arrow-flight.colocated.port`
178+
- Spawns a Java subprocess running `presto-flight-server` with JVM options from `${etc.dir}/jvm.config`
179+
- Restarts the Flight server if it crashes
180+
- Terminates the subprocess on worker shutdown
181+
182+
2. **Flight server subprocess**:
183+
- Uses standard Presto configuration structure from the etc directory:
184+
- `jvm.config`: JVM heap size and flags (e.g., `-Xmx4G -XX:+UseG1GC`)
185+
- `config.properties`: Flight server configuration including:
186+
- `catalog.config-dir`: Path to catalog configurations (default: `etc/catalog`)
187+
- `plugin.dir`: Path to plugin directory
188+
- Other Flight server-specific properties
189+
- Catalog directory (as specified by `catalog.config-dir`): Connector configurations (jdbc.properties, mongodb.properties, etc.)
190+
- Binds to the port specified by `arrow-flight.colocated.port` on localhost
191+
192+
#### presto-flight-server (new)
193+
Flight server that loads connectors via standard Presto plugin mechanism:
194+
195+
```java
196+
public class FlightServerPluginManager {
197+
private final Map<String, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
198+
private final Map<String, Connector> connectors = new ConcurrentHashMap<>();
199+
200+
public void loadPlugins() throws Exception {
201+
// Use same mechanism as PluginManager to load from installed plugins dir
202+
for (File pluginDir : listFiles(installedPluginsDir)) {
203+
loadPlugin(pluginDir);
204+
}
205+
}
206+
207+
private void loadPlugin(File pluginDir) {
208+
URLClassLoader pluginClassLoader = buildClassLoader(pluginDir);
209+
ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
210+
211+
for (Plugin plugin : serviceLoader) {
212+
for (ConnectorFactory factory : plugin.getConnectorFactories()) {
213+
log.info("Registering connector %s", factory.getName());
214+
connectorFactories.put(factory.getName(), factory);
215+
}
216+
}
217+
}
218+
219+
public Connector getConnector(String connectorName) {
220+
// Create connector instances from factories as needed
221+
return connectors.computeIfAbsent(connectorName, name -> {
222+
ConnectorFactory factory = connectorFactories.get(name);
223+
return factory.create(name, config, connectorContext);
224+
});
225+
}
226+
}
227+
228+
public class ConnectorFlightServer extends FlightProducer {
229+
private final FlightServerPluginManager pluginManager;
230+
231+
@Override
232+
public void getStream(CallContext context, Ticket ticket,
233+
ServerStreamListener listener) {
234+
// Ticket format: [connectorId_length][connectorId][split_bytes]
235+
ByteBuffer buffer = ByteBuffer.wrap(ticket.getBytes());
236+
int connectorIdLength = buffer.getInt();
237+
byte[] connectorIdBytes = new byte[connectorIdLength];
238+
buffer.get(connectorIdBytes);
239+
String connectorId = new String(connectorIdBytes, UTF_8);
240+
241+
// Get connector from plugin manager
242+
Connector connector = pluginManager.getConnector(connectorId);
243+
ConnectorCodec<ConnectorSplit> codec = connector.getConnectorCodecProvider()
244+
.getConnectorSplitCodec()
245+
.orElse(defaultJsonCodec);
246+
247+
// Deserialize and execute
248+
byte[] splitBytes = new byte[buffer.remaining()];
249+
buffer.get(splitBytes);
250+
ConnectorSplit split = codec.deserialize(splitBytes);
251+
252+
ConnectorPageSource pageSource = connector.getPageSource(split);
253+
streamPagesAsArrow(pageSource, listener);
254+
}
255+
}
256+
```
257+
258+
### 3. Configuration
259+
260+
#### Colocated Deployment Mode
261+
262+
```properties
263+
# C++ worker configuration
264+
arrow-flight.colocated=true
265+
arrow-flight.colocated.port=8815 # Port for the colocated Flight server
266+
# arrow-flight.server is automatically set to localhost
267+
# Setting arrow-flight.server is an error when colocated=true
268+
269+
# Flight server configuration directory
270+
arrow-flight.federation.etc.dir=/path/to/flight-server/etc
271+
```
272+
273+
The Flight server etc directory contains standard Presto configuration files:
274+
- `jvm.config`: JVM options for the Flight server process
275+
- `config.properties`: Flight server configuration (catalog.config-dir, plugin.dir, etc.)
276+
- Catalog directory: Connector configurations (jdbc.properties, mongodb.properties, etc.)
277+
278+
#### Remote Deployment Mode
279+
280+
```properties
281+
# C++ worker configuration
282+
arrow-flight.colocated=false # Default
283+
arrow-flight.server=flight-server.internal # Should be set for federation
284+
arrow-flight.server.port=8815 # Should be set for federation
285+
```
286+
287+
When `arrow-flight.colocated=true`, setting `arrow-flight.server` throws a configuration error since the server is automatically set to localhost and the port is configured via `arrow-flight.colocated.port`. For this federation architecture to work when `colocated=false`, both `arrow-flight.server` and `arrow-flight.server.port` should be configured since the FlightEndpoints sent by the coordinator do not include locations.
288+
289+
### 4. Protocol Flow
290+
291+
1. Coordinator's JDBC connector creates `JdbcSplit`
292+
2. `ArrowFlightWrapperCodec` serializes as `ArrowSplit` with embedded ticket (no locations)
293+
3. C++ worker receives `ArrowSplit` via custom serialization protocol
294+
4. Arrow Flight connector extracts ticket, sends to locally configured Flight server
295+
5. Flight server deserializes ticket as `JdbcSplit`, executes query
296+
6. Results stream back as Arrow RecordBatches
297+
298+
## Metrics
299+
300+
* Flight protocol overhead (latency comparison between C++ worker and Flight server metrics)
301+
* Data transfer throughput between C++ worker and Flight server
302+
303+
## Other Approaches Considered
304+
305+
**JNI Integration**: Direct JVM embedding in C++ workers. Rejected due to memory management and debugging requirements.
306+
307+
**Custom Protocol**: Thrift-based connector protocol. Arrow Flight chosen instead for columnar-native transport, existing ecosystem support, and potential for future cross-language connector implementations.
308+
309+
## Adoption Plan
310+
311+
* New configuration properties:
312+
- Per-connector: `arrow-flight-federation.enabled`
313+
- Per C++ worker:
314+
- `arrow-flight.colocated`: Whether to spawn local Flight server subprocess
315+
- `arrow-flight.colocated.port`: Port for colocated Flight server (used when colocated=true)
316+
- `arrow-flight.server` and `arrow-flight.server.port`: Should be set when colocated=false for federation to work
317+
318+
* Single-connection connectors can be migrated individually
319+
* Documentation: Update each connector's documentation as it gains Flight support
320+
321+
## Test Plan
322+
323+
* Unit tests for split serialization
324+
* Integration tests for end-to-end query execution through Flight server
325+
* Tests for both colocated and remote Flight server configurations

0 commit comments

Comments
 (0)