Skip to content

Commit

Permalink
Updating because some instances require specific table definition
Browse files Browse the repository at this point in the history
  • Loading branch information
Paultagoras committed Jan 15, 2024
1 parent 81e9b91 commit 91a9156
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,31 +1,16 @@
package com.clickhouse.kafka.connect.sink;

import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.config.ClickHouseProxyType;
import com.clickhouse.kafka.connect.ClickHouseSinkConnector;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.clickhouse.kafka.connect.sink.helper.ConfluentPlatform;
import com.clickhouse.kafka.connect.sink.helper.SchemalessTestData;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.Network;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;

import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.createTable;
import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.dropTable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class ClickHouseCloudTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private void setupConnector(String topicName, int taskCount) throws IOException,
LOGGER.info("Setting up connector...");
confluentPlatform.deleteConnectors(SINK_CONNECTOR_NAME);
dropTable(chcNoProxy, topicName);
createTable(chcNoProxy, topicName);
createMergeTreeTable(chcNoProxy, topicName);

String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/clickhouse_sink.json")));
String jsonString = String.format(payloadClickHouseSink, SINK_CONNECTOR_NAME, SINK_CONNECTOR_NAME, taskCount, topicName,
Expand All @@ -120,7 +120,7 @@ private void setupConnector(String topicName, int taskCount) throws IOException,
private void setupSchemalessConnector(String topicName, int taskCount) throws IOException, InterruptedException {
LOGGER.info("Setting up schemaless connector...");
dropTable(chcNoProxy, topicName);
createTable(chcNoProxy, topicName);
createMergeTreeTable(chcNoProxy, topicName);

String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/clickhouse_sink_schemaless.json")));
String jsonString = String.format(payloadClickHouseSink, SINK_CONNECTOR_NAME, SINK_CONNECTOR_NAME, taskCount, topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

import java.io.*;
Expand All @@ -18,10 +17,8 @@
import java.nio.file.Paths;
import java.util.*;

import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.createTable;
import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.dropTable;
import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.*;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;


public class ExactlyOnceTest {
Expand Down Expand Up @@ -73,7 +70,7 @@ private static void setupSchemalessConnector(String topicName, int taskCount) th
private static void setupConnector(String fileName, String topicName, int taskCount) throws IOException {
System.out.println("Setting up connector...");
dropTable(chcNoProxy, topicName);
createTable(chcNoProxy, topicName);
createReplicatedMergeTreeTable(chcNoProxy, topicName);

String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get(fileName)));
String jsonString = String.format(payloadClickHouseSink, SINK_CONNECTOR_NAME, SINK_CONNECTOR_NAME, taskCount, topicName,
Expand Down Expand Up @@ -107,7 +104,7 @@ public void beforeEach() throws IOException {


private boolean compareSchemalessCounts(String topicName, int partitions) throws InterruptedException, IOException {
ClickHouseAPI.createTable(chcNoProxy, topicName);
createReplicatedMergeTreeTable(chcNoProxy, topicName);
ClickHouseAPI.clearTable(chcNoProxy, topicName);
confluentPlatform.createTopic(topicName, partitions);
int count = generateSchemalessData(topicName, partitions, 250);
Expand Down Expand Up @@ -137,7 +134,7 @@ private void checkSpottyNetworkSchemaless(String topicName, int numberOfPartitio
do {
LOGGER.info("Run: {}", runCount);
confluentPlatform.createTopic(topicName, numberOfPartitions);
ClickHouseAPI.createTable(chcNoProxy, topicName);
createReplicatedMergeTreeTable(chcNoProxy, topicName);
ClickHouseAPI.clearTable(chcNoProxy, topicName);

int count = generateSchemalessData(topicName, numberOfPartitions, 1500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,22 @@ public static void dropTable(ClickHouseHelperClient chc, String tableName) {
}
}

public static void createTable(ClickHouseHelperClient chc, String tableName) {
public static void createMergeTreeTable(ClickHouseHelperClient chc, String tableName) {
String queryString = String.format("CREATE TABLE IF NOT EXISTS %s ( `side` String, `quantity` Int32, `symbol` String, `price` Int32, `account` String, `userid` String, `insertTime` DateTime DEFAULT now() ) " +
"Engine = MergeTree ORDER BY symbol", tableName);
try(ClickHouseResponse clickHouseResponse = chc.query(queryString)) {
LOGGER.info("Create: {}", clickHouseResponse.getSummary());
}
}

public static void createReplicatedMergeTreeTable(ClickHouseHelperClient chc, String tableName) {
String queryString = String.format("CREATE TABLE IF NOT EXISTS %s ( `side` String, `quantity` Int32, `symbol` String, `price` Int32, `account` String, `userid` String, `insertTime` DateTime DEFAULT now() ) " +
"Engine = ReplicatedMergeTree ORDER BY symbol", tableName);
try(ClickHouseResponse clickHouseResponse = chc.query(queryString)) {
LOGGER.info("Create: {}", clickHouseResponse.getSummary());
}
}

public static Iterable<ClickHouseRecord> selectDuplicates(ClickHouseHelperClient chc, String tableName) {
String queryString = String.format("SELECT `side`, `quantity`, `symbol`, `price`, `account`, `userid`, `insertTime`, COUNT(*) " +
"FROM %s " +
Expand Down

0 comments on commit 91a9156

Please sign in to comment.