Skip to content

Added routing func for elastic sink in kafka connect #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,23 +173,76 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
payload = maybeAddTimestamp(payload, record.timestamp());

// index
String routing = null;
if (config.getRoutingFieldNameConfig() != null) {
routing = getRoutingValueFromRecord(record);
}
switch (config.writeMethod()) {
case UPSERT:
// if (routing == null || !routing.isEmpty()) {
// return new UpdateRequest(index, id)
// .doc(payload, XContentType.JSON)
// .upsert(payload, XContentType.JSON)
// .retryOnConflict(Math.min(config.maxInFlightRequests(), 5));
// }
return new UpdateRequest(index, id)
.doc(payload, XContentType.JSON)
.upsert(payload, XContentType.JSON)
.routing(routing)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5));
case INSERT:
OpType opType = config.isDataStream() ? OpType.CREATE : OpType.INDEX;
// if (routing == null || !routing.isEmpty()) {
// return maybeAddExternalVersioning(
// new IndexRequest(index).id(id).source(payload, XContentType.JSON)
// .opType(opType), record
// );
// }
return maybeAddExternalVersioning(
new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
new IndexRequest(index).id(id).routing(routing)
.source(payload, XContentType.JSON).opType(opType),
record
);
default:
return null; // shouldn't happen
}
}

private String getRoutingValueFromRecord(SinkRecord record) {
StringBuilder finalRoutingValueBuilder = new StringBuilder();
for (String fieldName : config.getRoutingFieldNameConfig().split(",")) {
finalRoutingValueBuilder.append(getFieldValueFromRecord(record, fieldName)).append("_");
}
if (finalRoutingValueBuilder.length() > 0) {
finalRoutingValueBuilder.deleteCharAt(finalRoutingValueBuilder.length() - 1);
}
return finalRoutingValueBuilder.toString();
}

private static Object getFieldValueFromRecord(Object structOrMap, String fieldName) {
Object field;
if (structOrMap instanceof Struct) {
field = ((Struct) structOrMap).get(fieldName);
} else if (structOrMap instanceof Map) {
field = ((Map<?, ?>) structOrMap).get(fieldName);
if (field == null) {
throw new DataException(String.format("Unable to find field '%s'", fieldName));
}
return field;
} else {
throw new DataException(String.format(
"Argument not a Struct or Map. Cannot get field '%s' from %s.",
fieldName,
structOrMap
));
}
if (field == null) {
throw new DataException(
String.format("The field '%s' does not exist in %s.", fieldName, structOrMap));
}
return field;
}

private String getPayload(SinkRecord record) {
if (record.value() == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String SSL_GROUP = "Security";
private static final String KERBEROS_GROUP = "Kerberos";
private static final String DATA_STREAM_GROUP = "Data Stream";
private static final String ROUTING_FIELD_NAMES_CONFIG = "routing.field.names.config";
private static final String ROUTING_FIELD_NAME_DOC = "Which fields should be used for routing";

public enum BehaviorOnMalformedDoc {
IGNORE,
Expand Down Expand Up @@ -719,6 +721,11 @@ private static void addConversionConfigs(ConfigDef configDef) {
Width.SHORT,
WRITE_METHOD_DISPLAY,
new EnumRecommender<>(WriteMethod.class)
).define(
ROUTING_FIELD_NAMES_CONFIG,
Type.STRING,
Importance.LOW,
ROUTING_FIELD_NAME_DOC
);
}

Expand Down Expand Up @@ -1034,6 +1041,10 @@ public long retryBackoffMs() {
return getLong(RETRY_BACKOFF_MS_CONFIG);
}

public String getRoutingFieldNameConfig() {
return getString(ROUTING_FIELD_NAMES_CONFIG);
}

private SecurityProtocol securityProtocol() {
return SecurityProtocol.valueOf(getString(SECURITY_PROTOCOL_CONFIG).toUpperCase());
}
Expand Down