
Support scripted upsert using Elastic painless lang #484

@ndealmeida

Description


Elasticsearch supports scripts (written in the Painless language) in the Update API, and it would be really useful to expose that feature in kafka-connect-elasticsearch. My team forked this repo and successfully added the feature, and we would like to contribute it back to the open-source project.

Proposal

Ideally we should be able to add this feature without introducing any breaking changes.

  1. Add a new write method: SCRIPTED_UPSERT
public enum WriteMethod {
    INSERT,
    UPSERT,
    SCRIPTED_UPSERT
}
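case SCRIPTED_UPSERT:
    // DEFAULT_SCRIPT_TYPE / DEFAULT_SCRIPT_LANG come from org.elasticsearch.script.Script
    // (inline script, "painless"); the record value is passed to the script as params.
    return new UpdateRequest(index, id)
            .script(new Script(DEFAULT_SCRIPT_TYPE, DEFAULT_SCRIPT_LANG,
                    config.scriptedUpsertSource(),
                    getRecordMap(record)))
            // Run the script even if the document does not exist yet,
            // starting from an empty upsert document as ctx._source.
            .scriptedUpsert(true)
            .upsert(Collections.emptyMap())
            .retryOnConflict(Math.min(config.maxInFlightRequests(), 5));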
  2. Add a new config, scripted.upsert.source, containing the Painless script to execute for each request.
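A minimal, stdlib-only sketch of how the new setting could be wired up, assuming the connector properties are available as a plain Map<String, String>. The class and method names (ScriptedUpsertConfigSketch, writeMethod, scriptedUpsertSource) are hypothetical and not part of the connector, and defaulting to INSERT when write.method is unset is an assumption of the sketch. The point it illustrates is that the script is required only for SCRIPTED_UPSERT and ignored otherwise, so INSERT and UPSERT keep their current behaviour.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: models how write.method could be resolved and how
// scripted.upsert.source could be validated. Not the connector's actual code.
public class ScriptedUpsertConfigSketch {

    enum WriteMethod { INSERT, UPSERT, SCRIPTED_UPSERT }

    static final String WRITE_METHOD = "write.method";
    static final String SCRIPTED_UPSERT_SOURCE = "scripted.upsert.source";

    // Parses write.method case-insensitively; falls back to INSERT when unset
    // (assumption). Unknown values make valueOf throw IllegalArgumentException.
    static WriteMethod writeMethod(Map<String, String> props) {
        String raw = props.getOrDefault(WRITE_METHOD, "insert");
        return WriteMethod.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    // Returns the Painless source for SCRIPTED_UPSERT and fails fast if it is missing.
    // For INSERT and UPSERT the setting is ignored (null is returned).
    static String scriptedUpsertSource(Map<String, String> props) {
        if (writeMethod(props) != WriteMethod.SCRIPTED_UPSERT) {
            return null;
        }
        String source = props.get(SCRIPTED_UPSERT_SOURCE);
        if (source == null || source.isBlank()) {
            throw new IllegalArgumentException(
                    SCRIPTED_UPSERT_SOURCE + " must be set when " + WRITE_METHOD + "=scripted_upsert");
        }
        return source;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                WRITE_METHOD, "scripted_upsert",
                SCRIPTED_UPSERT_SOURCE, "if(ctx._source.isEmpty()) ctx._source = params");
        System.out.println(writeMethod(props));          // SCRIPTED_UPSERT
        System.out.println(scriptedUpsertSource(props)); // the configured Painless source
    }
}
```

Failing at configuration time rather than on the first record keeps a misconfigured connector from sending update requests with an empty script.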

Use cases

This feature would make the Elasticsearch connector much more flexible, enabling use cases such as the ones below (and many others) that cannot be achieved today:

  • Checking the @timestamp before updating the document.
  • Appending items to an array when the document already exists. This is useful when historical updates need to be stored in the same document.
  • Conditional updates, as in the following example:
    I want Kafka Connect to create a document in Elasticsearch if it doesn't exist (the source is empty), or to update it if a field called doc_num has an outdated value. Outdated here means the doc_num value stored in Elasticsearch (ctx._source) is less than the doc_num value in the record (params). The connector configuration would look like this:
key.ignore: false
write.method: scripted_upsert
scripted.upsert.source: "if(ctx._source.isEmpty() || ctx._source.doc_num < params.doc_num) ctx._source = params"
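To make the semantics of this example concrete, here is a plain-Java model of what the script does to a single document. It is a hypothetical simulation, not Elasticsearch code: the class name ScriptedUpsertSimulation is illustrative, it assumes doc_num is always present and numeric, and it models a missing document as an empty _source, which is what the script sees when scriptedUpsert(true) is combined with an empty upsert document.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of:
//   if(ctx._source.isEmpty() || ctx._source.doc_num < params.doc_num) ctx._source = params
public class ScriptedUpsertSimulation {

    // existing: the stored document, or empty if it does not exist yet.
    // params:   the record value passed to the script.
    // Returns the document content after the update.
    static Map<String, Object> apply(Optional<Map<String, Object>> existing, Map<String, Object> params) {
        // A missing document reaches the script as an empty _source.
        Map<String, Object> source = new HashMap<>(existing.orElse(Map.of()));
        if (source.isEmpty() || docNum(source) < docNum(params)) {
            return new HashMap<>(params); // ctx._source = params
        }
        return source; // condition false: content stays as it was
    }

    // Assumes doc_num is present and numeric in both documents.
    private static long docNum(Map<String, Object> doc) {
        return ((Number) doc.get("doc_num")).longValue();
    }

    public static void main(String[] args) {
        Map<String, Object> stored = Map.of("doc_num", 3, "status", "old");
        // Document missing: created from the record.
        System.out.println(apply(Optional.empty(), Map.of("doc_num", 1)));
        // Record is newer (5 > 3): document replaced.
        System.out.println(apply(Optional.of(stored), Map.of("doc_num", 5, "status", "new")));
        // Record is stale (2 < 3): stored content kept.
        System.out.println(apply(Optional.of(stored), Map.of("doc_num", 2, "status", "stale")));
    }
}
```

In the last case the stored content is left unchanged, so a record that arrives out of order with a lower doc_num cannot overwrite newer data.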

Alternatives considered

Add the script field to the record using an SMT instead of in the sink class:

This would avoid changes to the sink class, but ideally SMT logic should be platform-agnostic. Since all the Elasticsearch-specific logic already lives here, it is simpler and makes more sense to add the feature here; delegating this responsibility to an SMT would also be error-prone.

Use the existing UPSERT method

This would work as well, but we would need to add a condition that checks whether scripted.upsert.source is set and, if so, uses the script logic. An explicit SCRIPTED_UPSERT method is clearer, and it avoids the risk of introducing breaking changes to the existing UPSERT method.
