Skip to content

Commit 60744ec

Browse files
committed
Add ScyllaDB connector
1 parent e2157e8 commit 60744ec

File tree

33 files changed

+848
-227
lines changed

33 files changed

+848
-227
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ jobs:
365365
!:trino-redis,
366366
!:trino-redshift,
367367
!:trino-resource-group-managers,
368+
!:trino-scylladb,
368369
!:trino-server-core,
369370
!:trino-server,
370371
!:trino-singlestore,
@@ -483,6 +484,7 @@ jobs:
483484
- { modules: plugin/trino-redshift, profile: cloud-tests }
484485
- { modules: plugin/trino-redshift, profile: fte-tests }
485486
- { modules: plugin/trino-resource-group-managers }
487+
- { modules: plugin/trino-scylladb }
486488
- { modules: plugin/trino-singlestore }
487489
- { modules: plugin/trino-snowflake }
488490
- { modules: plugin/trino-snowflake, profile: cloud-tests }
@@ -896,6 +898,7 @@ jobs:
896898
- suite-delta-lake-oss
897899
- suite-kafka
898900
- suite-cassandra
901+
- suite-scylladb
899902
- suite-clickhouse
900903
- suite-mysql
901904
- suite-iceberg

core/trino-server/src/main/provisio/trino.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,12 @@
257257
</artifact>
258258
</artifactSet>
259259

260+
<artifactSet to="plugin/scylladb">
261+
<artifact id="${project.groupId}:trino-scylladb:zip:${project.version}">
262+
<unpack />
263+
</artifact>
264+
</artifactSet>
265+
260266
<artifactSet to="plugin/singlestore">
261267
<artifact id="${project.groupId}:trino-singlestore:zip:${project.version}">
262268
<unpack />

docs/release-template.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@
7070

7171
## Redshift connector
7272

73+
## ScyllaDB connector
74+
7375
## SingleStore connector
7476

7577
## Snowflake connector

docs/src/main/sphinx/connector.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ PostgreSQL <connector/postgresql>
3737
Prometheus <connector/prometheus>
3838
Redis <connector/redis>
3939
Redshift <connector/redshift>
40+
ScyllaDB <connector/scylladb>
4041
SingleStore <connector/singlestore>
4142
Snowflake <connector/snowflake>
4243
SQL Server <connector/sqlserver>
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# ScyllaDB connector
2+
3+
```{raw} html
4+
<img src="../_static/img/cassandra.png" class="connector-logo">
5+
```
6+
7+
The ScyllaDB connector allows querying data stored in
8+
[ScyllaDB](https://www.scylladb.com/).
9+
10+
## Requirements
11+
12+
To connect to ScyllaDB, you need:
13+
14+
- ScyllaDB version 6.2 or higher.
15+
- Network access from the Trino coordinator and workers to ScyllaDB.
16+
Port 9042 is the default port.
17+
18+
## Configuration
19+
20+
To configure the ScyllaDB connector, create a catalog properties file
21+
`etc/catalog/example.properties` with the following contents,
22+
replacing `host1,host2` with a comma-separated list of the ScyllaDB
23+
nodes, used to discover the cluster topology:
24+
25+
```text
26+
connector.name=scylladb
27+
cassandra.contact-points=host1,host2
28+
```
29+
30+
You also need to set `cassandra.native-protocol-port`, if your
31+
ScyllaDB nodes are not using the default port 9042.
32+
33+
## Compatibility with Cassandra connector
34+
35+
The ScyllaDB connector is very similar to the Cassandra connector with the
36+
only difference being the underlying driver.
37+
See [Cassandra connector](cassandra) for more details.
26 KB
Loading

plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraQueryRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public DistributedQueryRunner build()
114114
public static void main(String[] args)
115115
throws Exception
116116
{
117-
QueryRunner queryRunner = builder(new CassandraServer())
117+
QueryRunner queryRunner = builder(new TestingCassandraServer())
118118
.addCoordinatorProperty("http-server.http.port", "8080")
119119
.setInitialTables(TpchTable.getTables())
120120
.build();

plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java

Lines changed: 7 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -13,165 +13,17 @@
1313
*/
1414
package io.trino.plugin.cassandra;
1515

16-
import com.datastax.oss.driver.api.core.CqlSession;
17-
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
18-
import com.datastax.oss.driver.api.core.ProtocolVersion;
19-
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
20-
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
21-
import com.google.common.collect.ImmutableMap;
22-
import com.google.common.io.Resources;
23-
import io.airlift.json.JsonCodec;
24-
import io.airlift.log.Logger;
25-
import io.airlift.units.Duration;
26-
import org.testcontainers.cassandra.CassandraContainer;
27-
import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
28-
import org.testcontainers.utility.DockerImageName;
29-
3016
import java.io.Closeable;
31-
import java.io.File;
32-
import java.io.IOException;
33-
import java.nio.file.Path;
34-
import java.util.List;
35-
import java.util.Map;
36-
import java.util.concurrent.TimeoutException;
37-
38-
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT;
39-
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES;
40-
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.PROTOCOL_VERSION;
41-
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_TIMEOUT;
42-
import static com.google.common.io.Resources.getResource;
43-
import static io.trino.plugin.cassandra.CassandraTestingUtils.CASSANDRA_TYPE_MANAGER;
44-
import static java.lang.String.format;
45-
import static java.nio.charset.StandardCharsets.UTF_8;
46-
import static java.nio.file.Files.createDirectory;
47-
import static java.nio.file.Files.createTempDirectory;
48-
import static java.nio.file.Files.writeString;
49-
import static java.util.concurrent.TimeUnit.MINUTES;
50-
import static java.util.concurrent.TimeUnit.NANOSECONDS;
51-
import static java.util.concurrent.TimeUnit.SECONDS;
52-
import static org.testcontainers.utility.MountableFile.forHostPath;
5317

54-
public class CassandraServer
55-
implements Closeable
18+
public interface CassandraServer
19+
extends Closeable
5620
{
57-
private static final Logger log = Logger.get(CassandraServer.class);
58-
private static final Duration REFRESH_SIZE_ESTIMATES_TIMEOUT = new Duration(1, MINUTES);
59-
60-
private final CassandraContainer dockerContainer;
61-
private final CassandraSession session;
62-
63-
public CassandraServer()
64-
throws Exception
65-
{
66-
this("cassandra:3.0", "cu-cassandra.yaml");
67-
}
68-
69-
public CassandraServer(String imageName, String configFileName)
70-
throws Exception
71-
{
72-
this(DockerImageName.parse(imageName), ImmutableMap.of(), "/etc/cassandra/cassandra.yaml", configFileName);
73-
}
74-
75-
@SuppressWarnings("deprecation")
76-
public CassandraServer(DockerImageName imageName, Map<String, String> environmentVariables, String configPath, String configFileName)
77-
throws Exception
78-
{
79-
log.debug("Starting cassandra...");
80-
81-
this.dockerContainer = new CassandraContainer(imageName)
82-
.withCopyFileToContainer(forHostPath(prepareCassandraYaml(configFileName)), configPath)
83-
.withEnv(environmentVariables)
84-
.withStartupTimeout(java.time.Duration.ofMinutes(10))
85-
// TODO: https://github.com/testcontainers/testcontainers-java/issues/9337
86-
.waitingFor(new CassandraQueryWaitStrategy());
87-
this.dockerContainer.start();
88-
89-
ProgrammaticDriverConfigLoaderBuilder driverConfigLoaderBuilder = DriverConfigLoader.programmaticBuilder();
90-
driverConfigLoaderBuilder.withDuration(REQUEST_TIMEOUT, java.time.Duration.ofSeconds(30));
91-
driverConfigLoaderBuilder.withString(PROTOCOL_VERSION, ProtocolVersion.V3.name());
92-
driverConfigLoaderBuilder.withDuration(CONTROL_CONNECTION_AGREEMENT_TIMEOUT, java.time.Duration.ofSeconds(30));
93-
// allow the retrieval of metadata for the system keyspaces
94-
driverConfigLoaderBuilder.withStringList(METADATA_SCHEMA_REFRESHED_KEYSPACES, List.of());
95-
96-
CqlSessionBuilder cqlSessionBuilder = CqlSession.builder()
97-
.addContactPoint(dockerContainer.getContactPoint())
98-
.withLocalDatacenter(dockerContainer.getLocalDatacenter())
99-
.withConfigLoader(driverConfigLoaderBuilder.build());
100-
101-
session = new CassandraSession(
102-
CASSANDRA_TYPE_MANAGER,
103-
JsonCodec.listJsonCodec(ExtraColumnMetadata.class),
104-
cqlSessionBuilder::build,
105-
new Duration(1, MINUTES));
106-
}
107-
108-
private static String prepareCassandraYaml(String fileName)
109-
throws IOException
110-
{
111-
String original = Resources.toString(getResource(fileName), UTF_8);
112-
113-
Path tmpDirPath = createTempDirectory(null);
114-
Path dataDir = tmpDirPath.resolve("data");
115-
createDirectory(dataDir);
116-
117-
String modified = original.replaceAll("\\$\\{data_directory\\}", dataDir.toAbsolutePath().toString());
118-
119-
File yamlFile = tmpDirPath.resolve(fileName).toFile();
120-
yamlFile.deleteOnExit();
121-
writeString(yamlFile.toPath(), modified, UTF_8);
122-
123-
return yamlFile.getAbsolutePath();
124-
}
125-
126-
public CassandraSession getSession()
127-
{
128-
return session;
129-
}
130-
131-
public String getHost()
132-
{
133-
return dockerContainer.getHost();
134-
}
135-
136-
public int getPort()
137-
{
138-
return dockerContainer.getContactPoint().getPort();
139-
}
140-
141-
public void refreshSizeEstimates(String keyspace, String table)
142-
throws Exception
143-
{
144-
long deadline = System.nanoTime() + REFRESH_SIZE_ESTIMATES_TIMEOUT.roundTo(NANOSECONDS);
145-
while (System.nanoTime() - deadline < 0) {
146-
flushTable(keyspace, table);
147-
refreshSizeEstimates();
148-
List<SizeEstimate> sizeEstimates = getSession().getSizeEstimates(keyspace, table);
149-
if (!sizeEstimates.isEmpty()) {
150-
log.debug("Size estimates for the table %s.%s have been refreshed successfully: %s", keyspace, table, sizeEstimates);
151-
return;
152-
}
153-
log.debug("Size estimates haven't been refreshed as expected. Retrying ...");
154-
SECONDS.sleep(1);
155-
}
156-
throw new TimeoutException(format("Attempting to refresh size estimates for table %s.%s has timed out after %s", keyspace, table, REFRESH_SIZE_ESTIMATES_TIMEOUT));
157-
}
21+
CassandraSession getSession();
15822

159-
private void flushTable(String keyspace, String table)
160-
throws Exception
161-
{
162-
dockerContainer.execInContainer("nodetool", "flush", keyspace, table);
163-
}
23+
String getHost();
16424

165-
private void refreshSizeEstimates()
166-
throws Exception
167-
{
168-
dockerContainer.execInContainer("nodetool", "refreshsizeestimates");
169-
}
25+
int getPort();
17026

171-
@Override
172-
public void close()
173-
{
174-
session.close();
175-
dockerContainer.close();
176-
}
27+
void refreshSizeEstimates(String keyspace, String table)
28+
throws Exception;
17729
}

plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.junit.jupiter.api.TestInstance;
5555
import org.junit.jupiter.api.parallel.Isolated;
5656

57+
import java.io.IOException;
5758
import java.net.InetAddress;
5859
import java.net.UnknownHostException;
5960
import java.util.Date;
@@ -113,7 +114,7 @@ public class TestCassandraConnector
113114
public void setup()
114115
throws Exception
115116
{
116-
this.server = new CassandraServer();
117+
this.server = new TestingCassandraServer();
117118

118119
String keyspace = "test_connector";
119120
createTestTables(server.getSession(), keyspace, DATE);
@@ -146,6 +147,7 @@ public void setup()
146147

147148
@AfterAll
148149
public void tearDown()
150+
throws IOException
149151
{
150152
server.close();
151153
}

plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@
7777
public class TestCassandraConnectorTest
7878
extends BaseConnectorTest
7979
{
80-
private static final ZonedDateTime TIMESTAMP_VALUE = ZonedDateTime.of(1970, 1, 1, 3, 4, 5, 0, ZoneId.of("UTC"));
80+
protected static final ZonedDateTime TIMESTAMP_VALUE = ZonedDateTime.of(1970, 1, 1, 3, 4, 5, 0, ZoneId.of("UTC"));
8181

82-
private CassandraSession session;
82+
protected CassandraSession session;
8383

8484
@Override
8585
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
@@ -111,7 +111,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
111111
protected QueryRunner createQueryRunner()
112112
throws Exception
113113
{
114-
CassandraServer server = closeAfterClass(new CassandraServer());
114+
CassandraServer server = closeAfterClass(new TestingCassandraServer());
115115
session = server.getSession();
116116
return CassandraQueryRunner.builder(server)
117117
.setInitialTables(REQUIRED_TPCH_TABLES)
@@ -1245,7 +1245,7 @@ void testPartitioningKeys()
12451245
}
12461246

12471247
@Test
1248-
void testSelectClusteringMaterializedView()
1248+
protected void testSelectClusteringMaterializedView()
12491249
{
12501250
try (TestCassandraTable table = testTable(
12511251
"test_clustering_materialized_view_base",

plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraLatestConnectorSmokeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class TestCassandraLatestConnectorSmokeTest
2626
protected QueryRunner createQueryRunner()
2727
throws Exception
2828
{
29-
CassandraServer server = closeAfterClass(new CassandraServer("cassandra:5.0.2", "cu-cassandra-latest.yaml"));
29+
CassandraServer server = closeAfterClass(new TestingCassandraServer("cassandra:5.0.2", "cu-cassandra-latest.yaml"));
3030
CassandraSession session = server.getSession();
3131
createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant()));
3232
return CassandraQueryRunner.builder(server)

plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraProtocolVersionV3ConnectorSmokeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class TestCassandraProtocolVersionV3ConnectorSmokeTest
3030
protected QueryRunner createQueryRunner()
3131
throws Exception
3232
{
33-
CassandraServer server = closeAfterClass(new CassandraServer());
33+
CassandraServer server = closeAfterClass(new TestingCassandraServer());
3434
CassandraSession session = server.getSession();
3535
createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant()));
3636
return CassandraQueryRunner.builder(server)

plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSplitManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ final class TestCassandraSplitManager
5050
void setUp()
5151
throws Exception
5252
{
53-
server = new CassandraServer();
53+
server = new TestingCassandraServer();
5454
session = server.getSession();
5555
createKeyspace(session, KEYSPACE);
5656
}

plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTokenSplitManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ public class TestCassandraTokenSplitManager
3737
private static final String KEYSPACE = "test_cassandra_token_split_manager_keyspace";
3838
private static final int PARTITION_COUNT = 1000;
3939

40-
private CassandraServer server;
40+
private TestingCassandraServer server;
4141
private CassandraSession session;
4242
private CassandraTokenSplitManager splitManager;
4343

4444
@BeforeAll
4545
public void setUp()
4646
throws Exception
4747
{
48-
server = new CassandraServer();
48+
server = new TestingCassandraServer();
4949
session = server.getSession();
5050
createKeyspace(session, KEYSPACE);
5151
splitManager = new CassandraTokenSplitManager(session, SPLIT_SIZE, Optional.empty());

plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTypeMapping.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ private static void checkIsDoubled(ZoneId zone, LocalDateTime dateTime)
138138
protected QueryRunner createQueryRunner()
139139
throws Exception
140140
{
141-
server = closeAfterClass(new CassandraServer());
141+
server = closeAfterClass(new TestingCassandraServer());
142142
session = server.getSession();
143143
return CassandraQueryRunner.builder(server)
144144
.addConnectorProperties(ImmutableMap.<String, String>builder()

0 commit comments

Comments
 (0)