
Conversation

@sauravkumarrr
Contributor

Problem

When Redis keys have very low TTL values, they may expire after the keyspace notification is sent but before the source connector can process it. In that case the connector receives an event indicating that something happened to the key, but the subsequent data lookup returns null because the key has already expired. This broke the processing flow: the type field came back null while the schema declared it as a required string, so schema conversion failed with a NullPointerException or other unexpected behavior.

Solution

To handle expired keys gracefully, I've changed the schema of the type field to OPTIONAL_STRING_SCHEMA. When the type is null (indicating an expired key), the code no longer throws an error and simply sets type to null. This allows the connector to continue processing these records as expired/non-existent keys rather than failing.
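A minimal sketch of the schema change, using Kafka Connect's SchemaBuilder. The class name and the reduced field set are illustrative (they mirror the example records below, not the connector's actual schema definition in ToStructFunction):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class ExpiredKeySketch {
    // Illustrative value schema: "type" is OPTIONAL_STRING_SCHEMA, so null is a valid value.
    static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .field("key", Schema.STRING_SCHEMA)
            .field("ttl", Schema.OPTIONAL_INT64_SCHEMA)
            .field("type", Schema.OPTIONAL_STRING_SCHEMA) // was STRING_SCHEMA (required)
            .build();

    public static void main(String[] args) {
        // Expired key: the TYPE lookup returned null, so we store null instead of failing.
        Struct expired = new Struct(VALUE_SCHEMA)
                .put("key", "redis_keys:User_5")
                .put("ttl", -2L)
                .put("type", null); // accepted because the field is optional
        expired.validate(); // no DataException is thrown
    }
}
```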

Examples

Normal ingested record (key exists and has data):

{
  "key": "redis_keys:User_2",
  "ttl": {
    "long": 1756401363994
  },
  "type": "ReJSON-RL",
  "hash": null,
  "string": null,
  "json": {
    "string": "{\"registertime\":1500765952457,\"userid\":\"User_2\",\"regionid\":\"Region_8\",\"gender\":\"FEMALE\"}"
  },
  "list": null,
  "set": null,
  "zset": null
}

Expired key ingested record (key expired, type set to null):

{
  "key": "redis_keys:User_5",
  "ttl": {
    "long": -2
  },
  "type": null,
  "hash": null,
  "string": null,
  "json": null,
  "list": null,
  "set": null,
  "zset": null
}
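With this change, downstream consumers can treat a null type (together with a TTL of -2, which is what Redis reports for a key that no longer exists) as marking an expired key. A hypothetical filter, assuming records are deserialized into a simple value object of your own (the record type and method names here are illustrative, not part of the connector):

```java
// Hypothetical value object mirroring the fields of the example records above.
record RedisKeyRecord(String key, Long ttl, String type) {}

public class ExpiredKeyFilter {
    // Redis reports TTL -2 for a key that does not exist (expired or deleted).
    static boolean isExpired(RedisKeyRecord r) {
        return r.type() == null || (r.ttl() != null && r.ttl() == -2L);
    }

    public static void main(String[] args) {
        RedisKeyRecord live = new RedisKeyRecord("redis_keys:User_2", 1756401363994L, "ReJSON-RL");
        RedisKeyRecord expired = new RedisKeyRecord("redis_keys:User_5", -2L, null);
        System.out.println(isExpired(live));    // false
        System.out.println(isExpired(expired)); // true
    }
}
```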

@glbrtrnh

Hello there! Is there a chance this fix could be released? I face a similar issue with keys that are either expired or evicted. I believe making the string optional, especially for the Redis type field, would prevent the connector from failing with this error:

org.apache.kafka.connect.errors.ConnectException: Could not read from Redis
at com.redis.kafka.connect.source.RedisKeysSourceTask.poll(RedisKeysSourceTask.java:129)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:466)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:79)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "type", schema type: STRING
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:224)
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:217)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
at com.redis.kafka.connect.source.ToStructFunction.apply(ToStructFunction.java:66)
at com.redis.kafka.connect.source.RedisKeysSourceTask.convert(RedisKeysSourceTask.java:117)
at com.redis.kafka.connect.source.RedisKeysSourceTask.poll(RedisKeysSourceTask.java:126)
... 11 more

thanks a lot!
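For context, the DataException in the trace above is Kafka Connect's standard validation failure when null is put into a required field; with an optional schema the same put succeeds. A minimal reproduction, independent of the connector's code:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

public class RequiredVsOptional {
    public static void main(String[] args) {
        Schema required = SchemaBuilder.struct().field("type", Schema.STRING_SCHEMA).build();
        Schema optional = SchemaBuilder.struct().field("type", Schema.OPTIONAL_STRING_SCHEMA).build();

        try {
            // ConnectSchema.validateValue rejects null for a required STRING field.
            new Struct(required).put("type", null);
        } catch (DataException e) {
            System.out.println("required field rejected null: " + e.getMessage());
        }

        // Accepted: the field is optional, so null is a valid value.
        Struct ok = new Struct(optional).put("type", null);
        System.out.println("optional field value: " + ok.get("type"));
    }
}
```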

@ErnandesFranco

Hello, do we know when this will be fixed? I am also having problems with this issue: my connector always goes offline, and I need to restart the whole Kafka service to recover, because restarting only the connector's task does not solve the problem.
