diff --git a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java index fc91eb741..83521623b 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +++ b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java @@ -173,16 +173,34 @@ public DocWriteRequest convertRecord(SinkRecord record, String index) { payload = maybeAddTimestamp(payload, record.timestamp()); // index + String routing = null; + if (config.getRoutingFieldNameConfig() != null) { + routing = getRoutingValueFromRecord(record); + } switch (config.writeMethod()) { case UPSERT: + // if (routing == null || !routing.isEmpty()) { + // return new UpdateRequest(index, id) + // .doc(payload, XContentType.JSON) + // .upsert(payload, XContentType.JSON) + // .retryOnConflict(Math.min(config.maxInFlightRequests(), 5)); + // } return new UpdateRequest(index, id) .doc(payload, XContentType.JSON) .upsert(payload, XContentType.JSON) + .routing(routing) .retryOnConflict(Math.min(config.maxInFlightRequests(), 5)); case INSERT: OpType opType = config.isDataStream() ? OpType.CREATE : OpType.INDEX; + // if (routing == null || !routing.isEmpty()) { + // return maybeAddExternalVersioning( + // new IndexRequest(index).id(id).source(payload, XContentType.JSON) + // .opType(opType), record + // ); + // } return maybeAddExternalVersioning( - new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType), + new IndexRequest(index).id(id).routing(routing) + .source(payload, XContentType.JSON).opType(opType), record ); default: @@ -190,6 +208,41 @@ public DocWriteRequest convertRecord(SinkRecord record, String index) { } } + private String getRoutingValueFromRecord(SinkRecord record) { + StringBuilder finalRoutingValueBuilder = new StringBuilder(); + for (String fieldName : config.getRoutingFieldNameConfig().split(",")) { + finalRoutingValueBuilder.append(getFieldValueFromRecord(record, fieldName)).append("_"); + } + if (finalRoutingValueBuilder.length() > 0) { + finalRoutingValueBuilder.deleteCharAt(finalRoutingValueBuilder.length() - 1); + } + return finalRoutingValueBuilder.toString(); + } + + private static Object getFieldValueFromRecord(Object structOrMap, String fieldName) { + Object field; + if (structOrMap instanceof Struct) { + field = ((Struct) structOrMap).get(fieldName); + } else if (structOrMap instanceof Map) { + field = ((Map) structOrMap).get(fieldName); + if (field == null) { + throw new DataException(String.format("Unable to find field '%s'", fieldName)); + } + return field; + } else { + throw new DataException(String.format( + "Argument not a Struct or Map. Cannot get field '%s' from %s.", + fieldName, + structOrMap + )); + } + if (field == null) { + throw new DataException( + String.format("The field '%s' does not exist in %s.", fieldName, structOrMap)); + } + return field; + } + private String getPayload(SinkRecord record) { if (record.value() == null) { return null; diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java index 51c7d4c85..82a689d02 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java @@ -382,6 +382,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final String SSL_GROUP = "Security"; private static final String KERBEROS_GROUP = "Kerberos"; private static final String DATA_STREAM_GROUP = "Data Stream"; + private static final String ROUTING_FIELD_NAMES_CONFIG = "routing.field.names.config"; + private static final String ROUTING_FIELD_NAME_DOC = "Which fields should be used for routing"; public enum BehaviorOnMalformedDoc { IGNORE, @@ -719,6 +721,11 @@ private static void addConversionConfigs(ConfigDef configDef) { Width.SHORT, WRITE_METHOD_DISPLAY, new EnumRecommender<>(WriteMethod.class) + ).define( + ROUTING_FIELD_NAMES_CONFIG, + Type.STRING, + Importance.LOW, + ROUTING_FIELD_NAME_DOC ); } @@ -1034,6 +1041,10 @@ public long retryBackoffMs() { return getLong(RETRY_BACKOFF_MS_CONFIG); } + public String getRoutingFieldNameConfig() { + return getString(ROUTING_FIELD_NAMES_CONFIG); + } + private SecurityProtocol securityProtocol() { return SecurityProtocol.valueOf(getString(SECURITY_PROTOCOL_CONFIG).toUpperCase()); }