Skip to content

Commit

Permalink
Merge pull request #288 from ClickHouse/handle-null-arrays
Browse files Browse the repository at this point in the history
Send an empty array when field is null
  • Loading branch information
Paultagoras authored Jan 11, 2024
2 parents ec796ae + 30fb894 commit 04d3443
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.gson.reflect.TypeToken;

import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.data.Field;
Expand All @@ -35,11 +36,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -354,8 +350,10 @@ private void doWriteCol(Record record, Column col, ClickHousePipedOutputStream s
BinaryStreamUtils.writeNonNull(stream);
}
if (!col.isNullable() && value.getObject() == null) {
// this the situation when the col is not isNullable, but the data is null here we need to drop the records
throw new RuntimeException(String.format("An attempt to write null into not nullable column '%s'", col.getName()));
if (colType == Type.ARRAY)
BinaryStreamUtils.writeNonNull(stream);
else
throw new RuntimeException(String.format("An attempt to write null into not nullable column '%s'", col.getName()));
}
switch (colType) {
case INT8:
Expand Down Expand Up @@ -403,18 +401,23 @@ private void doWriteCol(Record record, Column col, ClickHousePipedOutputStream s
break;
case ARRAY:
List<?> arrObject = (List<?>) value.getObject();
int sizeArrObject = arrObject.size();
BinaryStreamUtils.writeVarInt(stream, sizeArrObject);
arrObject.forEach(v -> {
try {
if (col.getSubType().isNullable() && v != null) {
BinaryStreamUtils.writeNonNull(stream);

if (arrObject == null) {
doWritePrimitive(colType, value.getFieldType(), stream, new ArrayList<>());
} else {
int sizeArrObject = arrObject.size();
BinaryStreamUtils.writeVarInt(stream, sizeArrObject);
arrObject.forEach(v -> {
try {
if (col.getSubType().isNullable() && v != null) {
BinaryStreamUtils.writeNonNull(stream);
}
doWritePrimitive(col.getSubType().getType(), value.getFieldType(), stream, v);
} catch (IOException e) {
throw new RuntimeException(e);
}
doWritePrimitive(col.getSubType().getType(), value.getFieldType(), stream, v);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
});
}
break;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void mapTypesTest() {
}

@Test
public void nullableArrayTypeTest() {
public void nullArrayTypeTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props);

Expand All @@ -131,8 +131,28 @@ public void nullableArrayTypeTest() {

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
assertThrowsExactly(RuntimeException.class, () -> chst.put(sr), "An attempt to write null into not nullable column 'arr'");
chst.put(sr);
chst.stop();

assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
}

@Test
public void nullableArrayTypeTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props);

String topic = "nullable_array_string_table_test";
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr` Array(Nullable(String)) ) Engine = MergeTree ORDER BY off16");
Collection<SinkRecord> sr = SchemaTestData.createNullableArrayType(topic, 1);

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(sr);
chst.stop();

assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
}

@Test
Expand Down

0 comments on commit 04d3443

Please sign in to comment.