
Avro de-serialization failure - TopicNameStrategy #30

Open

bluedog13 opened this issue Apr 3, 2022 · 2 comments

@bluedog13
The record value in the Kafka topic is Avro-serialized. I am using the Redis sink connector settings below to copy the data from the Kafka topic to Redis:

{
  "name": "redis-sink-connector",
  "config": {
    "connector.class"                                     : "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "redis.hosts"                                         : "redis:6379",
    "tasks.max"                                           : 1,
    "topics"                                              : "[redacted]",
    "key.converter.schemas.enable"                        : "false",
    "key.converter"                                       : "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schema.registry.url"                   : "https://[redacted]azure.confluent.cloud",
    "key.converter.schema.registry.basic.auth.user.info"  : "[redacted]:[redacted]",
    "key.converter.basic.auth.credentials.source"         : "USER_INFO",
    "value.converter.schemas.enable"                      : "true",
    "value.converter"                                     : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url"                 : "https://[redacted].azure.confluent.cloud",
    "value.converter.schema.registry.basic.auth.user.info": "[redacted]:[redacted]",
    "value.converter.basic.auth.credentials.source"       : "USER_INFO",
    "value.converter.value.subject.name.strategy"         : "io.confluent.kafka.serializers.subject.TopicNameStrategy"
  }
}

I see the error below:

Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: The value for the record must be String or Bytes. Consider using the ByteArrayConverter or StringConverter if the data is stored in Kafka in the format needed in Redis. Another option is to use a single message transformation to transform the data before it is written to Redis. (org.apache.kafka.connect.runtime.WorkerSinkTask)

In the samples for this repo, I see that "TopicRecordNameStrategy" is used for Avro de-serialization, so I wanted to know whether the connector works only with the "TopicRecordNameStrategy" strategy or with other strategies as well, such as "TopicNameStrategy" in my case.

Thanks.

bluedog13 reopened this Apr 3, 2022
@jaredpetersen (Owner) commented Apr 11, 2022

I haven't played with any other naming strategies, so at this time it's not supported. I can't tell what the problem is just from the error message, so I'd have to play with it to figure out what's going on.

@bluedog13 (Author) commented Apr 11, 2022

Thank you for the reply. I was able to find an alternative solution that works for TopicNameStrategy.

I used the "ByteArrayConverter" to move the data from Kafka into Redis as-is. Then, on the reader side, I took the raw bytes from Redis, connected to the Schema Registry, and de-serialized the message.

On the connector side:

"value.converter" : "org.apache.kafka.connect.converters.ByteArrayConverter",

On the Redis C# consumer side:

// 'obj' holds the raw Avro bytes for the record value, read back from Redis
AvroDeserializer<T> deserializer = new AvroDeserializer<T>(schemaRegistryClient);
T result = await deserializer.DeserializeAsync(obj, false, SerializationContext.Empty);
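
To put the reader side together, here is a minimal self-contained sketch of what that flow might look like. It assumes the Confluent.SchemaRegistry, Confluent.SchemaRegistry.Serdes.Avro, and StackExchange.Redis NuGet packages; the registry URL, credentials, and Redis key are hypothetical placeholders:

using System;
using System.Threading.Tasks;
using Avro.Generic;
using Confluent.Kafka;                  // SerializationContext
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using StackExchange.Redis;

class RedisAvroReader
{
    static async Task Main()
    {
        // Hypothetical Schema Registry endpoint and credentials.
        var schemaRegistryClient = new CachedSchemaRegistryClient(new SchemaRegistryConfig
        {
            Url = "https://<your-cluster>.azure.confluent.cloud",
            BasicAuthUserInfo = "<api-key>:<api-secret>"
        });

        var redis = await ConnectionMultiplexer.ConnectAsync("redis:6379");
        IDatabase db = redis.GetDatabase();

        // The sink connector stored the value bytes as-is, i.e. in the Confluent
        // wire format: magic byte + 4-byte schema ID + Avro payload.
        byte[] raw = await db.StringGetAsync("<record-key>");

        // The deserializer reads the schema ID from the payload and resolves
        // the writer schema against the registry.
        var deserializer = new AvroDeserializer<GenericRecord>(schemaRegistryClient);
        GenericRecord record = await deserializer.DeserializeAsync(raw, false, SerializationContext.Empty);

        Console.WriteLine(record);
    }
}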
