Add ScyllaDB connector #26055

Open
wants to merge 1 commit into
base: master
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
@@ -365,6 +365,7 @@ jobs:
!:trino-redis,
!:trino-redshift,
!:trino-resource-group-managers,
!:trino-scylladb,
!:trino-server-core,
!:trino-server,
!:trino-singlestore,
@@ -483,6 +484,7 @@ jobs:
- { modules: plugin/trino-redshift, profile: cloud-tests }
- { modules: plugin/trino-redshift, profile: fte-tests }
- { modules: plugin/trino-resource-group-managers }
- { modules: plugin/trino-scylladb }
- { modules: plugin/trino-singlestore }
- { modules: plugin/trino-snowflake }
- { modules: plugin/trino-snowflake, profile: cloud-tests }
@@ -896,6 +898,7 @@ jobs:
- suite-delta-lake-oss
- suite-kafka
- suite-cassandra
- suite-scylladb
- suite-clickhouse
- suite-mysql
- suite-iceberg
6 changes: 6 additions & 0 deletions core/trino-server/src/main/provisio/trino.xml
@@ -257,6 +257,12 @@
</artifact>
</artifactSet>

<artifactSet to="plugin/scylladb">
<artifact id="${project.groupId}:trino-scylladb:zip:${project.version}">
<unpack />
</artifact>
</artifactSet>

<artifactSet to="plugin/singlestore">
<artifact id="${project.groupId}:trino-singlestore:zip:${project.version}">
<unpack />
2 changes: 2 additions & 0 deletions docs/release-template.md
@@ -70,6 +70,8 @@

## Redshift connector

## ScyllaDB connector

## SingleStore connector

## Snowflake connector
1 change: 1 addition & 0 deletions docs/src/main/sphinx/connector.md
@@ -37,6 +37,7 @@ PostgreSQL <connector/postgresql>
Prometheus <connector/prometheus>
Redis <connector/redis>
Redshift <connector/redshift>
ScyllaDB <connector/scylladb>
SingleStore <connector/singlestore>
Snowflake <connector/snowflake>
SQL Server <connector/sqlserver>
37 changes: 37 additions & 0 deletions docs/src/main/sphinx/connector/scylladb.md
@@ -0,0 +1,37 @@
# ScyllaDB connector

```{raw} html
<img src="../_static/img/cassandra.png" class="connector-logo">
```

The ScyllaDB connector allows querying data stored in
[ScyllaDB](https://www.scylladb.com/).

## Requirements

To connect to ScyllaDB, you need:

- ScyllaDB version 6.2 or higher.
- Network access from the Trino coordinator and workers to ScyllaDB.
Port 9042 is the default port.

## Configuration

To configure the ScyllaDB connector, create a catalog properties file
`etc/catalog/example.properties` with the following contents,
replacing `host1,host2` with a comma-separated list of the ScyllaDB
nodes, used to discover the cluster topology:

```text
connector.name=scylladb
cassandra.contact-points=host1,host2
```

If your ScyllaDB nodes do not use the default port 9042, also set
`cassandra.native-protocol-port`.
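
As an illustration only, a catalog file for a cluster that listens on a
non-default port might look like the following sketch. The host names and
the port `9043` are hypothetical placeholders, not values taken from this
change:

```text
connector.name=scylladb
cassandra.contact-points=host1,host2
cassandra.native-protocol-port=9043
```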

## Compatibility with Cassandra connector

The ScyllaDB connector behaves like the Cassandra connector. The only
difference is the underlying driver.
See [Cassandra connector](cassandra) for more details.
Binary file added docs/src/main/sphinx/static/img/scylladb.png
@@ -114,7 +114,7 @@ public DistributedQueryRunner build()
public static void main(String[] args)
throws Exception
{
QueryRunner queryRunner = builder(new CassandraServer())
QueryRunner queryRunner = builder(new TestingCassandraServer())
.addCoordinatorProperty("http-server.http.port", "8080")
.setInitialTables(TpchTable.getTables())
.build();
@@ -13,165 +13,17 @@
*/
package io.trino.plugin.cassandra;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import org.testcontainers.cassandra.CassandraContainer;
import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
import org.testcontainers.utility.DockerImageName;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.PROTOCOL_VERSION;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_TIMEOUT;
import static com.google.common.io.Resources.getResource;
import static io.trino.plugin.cassandra.CassandraTestingUtils.CASSANDRA_TYPE_MANAGER;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.Files.createDirectory;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.Files.writeString;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testcontainers.utility.MountableFile.forHostPath;

public class CassandraServer
implements Closeable
public interface CassandraServer
extends Closeable
{
private static final Logger log = Logger.get(CassandraServer.class);
private static final Duration REFRESH_SIZE_ESTIMATES_TIMEOUT = new Duration(1, MINUTES);

private final CassandraContainer dockerContainer;
private final CassandraSession session;

public CassandraServer()
throws Exception
{
this("cassandra:3.0", "cu-cassandra.yaml");
}

public CassandraServer(String imageName, String configFileName)
throws Exception
{
this(DockerImageName.parse(imageName), ImmutableMap.of(), "/etc/cassandra/cassandra.yaml", configFileName);
}

@SuppressWarnings("deprecation")
public CassandraServer(DockerImageName imageName, Map<String, String> environmentVariables, String configPath, String configFileName)
throws Exception
{
log.debug("Starting cassandra...");

this.dockerContainer = new CassandraContainer(imageName)
.withCopyFileToContainer(forHostPath(prepareCassandraYaml(configFileName)), configPath)
.withEnv(environmentVariables)
.withStartupTimeout(java.time.Duration.ofMinutes(10))
// TODO: https://github.com/testcontainers/testcontainers-java/issues/9337
.waitingFor(new CassandraQueryWaitStrategy());
this.dockerContainer.start();

ProgrammaticDriverConfigLoaderBuilder driverConfigLoaderBuilder = DriverConfigLoader.programmaticBuilder();
driverConfigLoaderBuilder.withDuration(REQUEST_TIMEOUT, java.time.Duration.ofSeconds(30));
driverConfigLoaderBuilder.withString(PROTOCOL_VERSION, ProtocolVersion.V3.name());
driverConfigLoaderBuilder.withDuration(CONTROL_CONNECTION_AGREEMENT_TIMEOUT, java.time.Duration.ofSeconds(30));
// allow the retrieval of metadata for the system keyspaces
driverConfigLoaderBuilder.withStringList(METADATA_SCHEMA_REFRESHED_KEYSPACES, List.of());

CqlSessionBuilder cqlSessionBuilder = CqlSession.builder()
.addContactPoint(dockerContainer.getContactPoint())
.withLocalDatacenter(dockerContainer.getLocalDatacenter())
.withConfigLoader(driverConfigLoaderBuilder.build());

session = new CassandraSession(
CASSANDRA_TYPE_MANAGER,
JsonCodec.listJsonCodec(ExtraColumnMetadata.class),
cqlSessionBuilder::build,
new Duration(1, MINUTES));
}

private static String prepareCassandraYaml(String fileName)
throws IOException
{
String original = Resources.toString(getResource(fileName), UTF_8);

Path tmpDirPath = createTempDirectory(null);
Path dataDir = tmpDirPath.resolve("data");
createDirectory(dataDir);

String modified = original.replaceAll("\\$\\{data_directory\\}", dataDir.toAbsolutePath().toString());

File yamlFile = tmpDirPath.resolve(fileName).toFile();
yamlFile.deleteOnExit();
writeString(yamlFile.toPath(), modified, UTF_8);

return yamlFile.getAbsolutePath();
}

public CassandraSession getSession()
{
return session;
}

public String getHost()
{
return dockerContainer.getHost();
}

public int getPort()
{
return dockerContainer.getContactPoint().getPort();
}

public void refreshSizeEstimates(String keyspace, String table)
throws Exception
{
long deadline = System.nanoTime() + REFRESH_SIZE_ESTIMATES_TIMEOUT.roundTo(NANOSECONDS);
while (System.nanoTime() - deadline < 0) {
flushTable(keyspace, table);
refreshSizeEstimates();
List<SizeEstimate> sizeEstimates = getSession().getSizeEstimates(keyspace, table);
if (!sizeEstimates.isEmpty()) {
log.debug("Size estimates for the table %s.%s have been refreshed successfully: %s", keyspace, table, sizeEstimates);
return;
}
log.debug("Size estimates haven't been refreshed as expected. Retrying ...");
SECONDS.sleep(1);
}
throw new TimeoutException(format("Attempting to refresh size estimates for table %s.%s has timed out after %s", keyspace, table, REFRESH_SIZE_ESTIMATES_TIMEOUT));
}
CassandraSession getSession();

private void flushTable(String keyspace, String table)
throws Exception
{
dockerContainer.execInContainer("nodetool", "flush", keyspace, table);
}
String getHost();

private void refreshSizeEstimates()
throws Exception
{
dockerContainer.execInContainer("nodetool", "refreshsizeestimates");
}
int getPort();

@Override
public void close()
{
session.close();
dockerContainer.close();
}
void refreshSizeEstimates(String keyspace, String table)
throws Exception;
}
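
Note on the hunk above: `CassandraServer` changes from a concrete Testcontainers-backed class into an interface, so that test code can be written against `getSession()`, `getHost()`, `getPort()` and `refreshSizeEstimates(...)` regardless of which server backs it. The following is a minimal, self-contained sketch of that shape, not the code from this change. `TestServer`, `FakeServer`, `pollUntil` and `ServerInterfaceSketch` are hypothetical names, `getSession()` is left out to avoid driver dependencies, and the fake stands in for a real container. The polling helper mirrors the deadline loop of the removed `refreshSizeEstimates` implementation.

```java
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

final class ServerInterfaceSketch
{
    private ServerInterfaceSketch() {}

    // Hypothetical, simplified stand-in for the interface in the diff (no getSession()).
    interface TestServer
            extends Closeable
    {
        String getHost();

        int getPort();

        void refreshSizeEstimates(String keyspace, String table)
                throws Exception;
    }

    // Retry a check until it succeeds or the deadline passes.
    // The overflow-safe comparison follows the loop in the removed class.
    static void pollUntil(BooleanSupplier condition, long timeoutNanos, long sleepMillis)
            throws Exception
    {
        long deadline = System.nanoTime() + timeoutNanos;
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return;
            }
            Thread.sleep(sleepMillis);
        }
        throw new TimeoutException("condition not met before the deadline");
    }

    // In-memory fake: the "estimates" become available on the third attempt.
    static final class FakeServer
            implements TestServer
    {
        private int attempts;
        private boolean closed;

        @Override
        public String getHost()
        {
            return "localhost";
        }

        @Override
        public int getPort()
        {
            return 9042;
        }

        @Override
        public void refreshSizeEstimates(String keyspace, String table)
                throws Exception
        {
            pollUntil(() -> ++attempts >= 3, TimeUnit.SECONDS.toNanos(5), 10);
        }

        @Override
        public void close()
        {
            closed = true;
        }

        int attempts()
        {
            return attempts;
        }

        boolean isClosed()
        {
            return closed;
        }
    }

    public static void main(String[] args)
            throws Exception
    {
        // Callers depend only on the interface type.
        try (TestServer server = new FakeServer()) {
            server.refreshSizeEstimates("example_keyspace", "example_table");
            System.out.println(server.getHost() + ":" + server.getPort());
        }
    }
}
```

Because the interface extends `Closeable` without redeclaring `close()`, callers that hold the interface type see `close()` as throwing `IOException`. That is presumably why `tearDown()` gains `throws IOException` in a later hunk.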
@@ -54,6 +54,7 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Isolated;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
@@ -113,7 +114,7 @@ public class TestCassandraConnector
public void setup()
throws Exception
{
this.server = new CassandraServer();
this.server = new TestingCassandraServer();

String keyspace = "test_connector";
createTestTables(server.getSession(), keyspace, DATE);
@@ -146,6 +147,7 @@ public void setup()

@AfterAll
public void tearDown()
throws IOException
{
server.close();
}
@@ -77,9 +77,9 @@
public class TestCassandraConnectorTest
extends BaseConnectorTest
{
private static final ZonedDateTime TIMESTAMP_VALUE = ZonedDateTime.of(1970, 1, 1, 3, 4, 5, 0, ZoneId.of("UTC"));
protected static final ZonedDateTime TIMESTAMP_VALUE = ZonedDateTime.of(1970, 1, 1, 3, 4, 5, 0, ZoneId.of("UTC"));

private CassandraSession session;
protected CassandraSession session;

@Override
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
@@ -111,7 +111,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
protected QueryRunner createQueryRunner()
throws Exception
{
CassandraServer server = closeAfterClass(new CassandraServer());
CassandraServer server = closeAfterClass(new TestingCassandraServer());
session = server.getSession();
return CassandraQueryRunner.builder(server)
.setInitialTables(REQUIRED_TPCH_TABLES)
@@ -1245,7 +1245,7 @@ void testPartitioningKeys()
}

@Test
void testSelectClusteringMaterializedView()
protected void testSelectClusteringMaterializedView()
{
try (TestCassandraTable table = testTable(
"test_clustering_materialized_view_base",
@@ -26,7 +26,7 @@ public class TestCassandraLatestConnectorSmokeTest
protected QueryRunner createQueryRunner()
throws Exception
{
CassandraServer server = closeAfterClass(new CassandraServer("cassandra:5.0.2", "cu-cassandra-latest.yaml"));
CassandraServer server = closeAfterClass(new TestingCassandraServer("cassandra:5.0.2", "cu-cassandra-latest.yaml"));
CassandraSession session = server.getSession();
createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant()));
return CassandraQueryRunner.builder(server)
@@ -30,7 +30,7 @@ public class TestCassandraProtocolVersionV3ConnectorSmokeTest
protected QueryRunner createQueryRunner()
throws Exception
{
CassandraServer server = closeAfterClass(new CassandraServer());
CassandraServer server = closeAfterClass(new TestingCassandraServer());
CassandraSession session = server.getSession();
createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant()));
return CassandraQueryRunner.builder(server)
@@ -50,7 +50,7 @@ final class TestCassandraSplitManager
void setUp()
throws Exception
{
server = new CassandraServer();
server = new TestingCassandraServer();
session = server.getSession();
createKeyspace(session, KEYSPACE);
}
@@ -37,15 +37,15 @@ public class TestCassandraTokenSplitManager
private static final String KEYSPACE = "test_cassandra_token_split_manager_keyspace";
private static final int PARTITION_COUNT = 1000;

private CassandraServer server;
private TestingCassandraServer server;
private CassandraSession session;
private CassandraTokenSplitManager splitManager;

@BeforeAll
public void setUp()
throws Exception
{
server = new CassandraServer();
server = new TestingCassandraServer();
session = server.getSession();
createKeyspace(session, KEYSPACE);
splitManager = new CassandraTokenSplitManager(session, SPLIT_SIZE, Optional.empty());
@@ -138,7 +138,7 @@ private static void checkIsDoubled(ZoneId zone, LocalDateTime dateTime)
protected QueryRunner createQueryRunner()
throws Exception
{
server = closeAfterClass(new CassandraServer());
server = closeAfterClass(new TestingCassandraServer());
session = server.getSession();
return CassandraQueryRunner.builder(server)
.addConnectorProperties(ImmutableMap.<String, String>builder()