NIFI-15027 - adjust AvroWriter handling of invalid payloads; ConsumeKafka impact #10366
Conversation
try {
    dataFileWriter.append(rec);
} catch (final DataFileWriter.AppendWriteException e) {
    throw new IOException(e);
}
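For context, a minimal standalone sketch (assumed schema and setup; this is not NiFi code) of when Avro raises the unchecked DataFileWriter.AppendWriteException: appending a record whose payload violates the schema, such as a null value for a required string field, together with the wrapping pattern under review.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AppendWriteExceptionDemo {
    public static void main(final String[] args) throws IOException {
        final Schema schema = new Schema.Parser().parse("""
                {"name": "test", "type": "record",
                 "fields": [{"name": "text", "type": "string"}]}""");

        try (DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            writer.create(schema, new ByteArrayOutputStream());

            // Required field "text" is left null: an invalid payload
            final GenericRecord rec = new GenericData.Record(schema);
            try {
                writer.append(rec);
            } catch (final DataFileWriter.AppendWriteException e) {
                // The pattern under review: wrap Avro's unchecked exception
                // in a checked IOException
                throw new IOException("Failed to write Avro Record", e);
            }
        }
    }
}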
Thanks for working on this ticket.
This changed line breaks other writeRecord() callers that explicitly catch DataFileWriter.AppendWriteException, such as this example:
https://github.com/jrsteinebrey/nifi/blob/b0f29ef94e95be8160ec2cd5fbdfbef373451f90/nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java#L466
They would need to be changed to catch IOException instead of AppendWriteException.
Instead of this change here in WriteAvroResultWithSchema.java,
I suggest that you consider changing the Kafka code here
https://github.com/apache/nifi/blob/1457950040d0fe86ade53770def6c5a95b6f0252/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/AbstractRecordStreamKafkaMessageConverter.java#L112-L120
to catch (Exception) instead of specific exception classes. Then the ticket is resolved, and any future exception classes also route to failure.
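A minimal sketch of that suggestion (the class and method names below are placeholders, not the actual AbstractRecordStreamKafkaMessageConverter API): a broad catch routes every conversion failure, including exception types introduced later, to the parse-failure path.

import java.util.function.Consumer;

class BroadCatchSketch {
    // Placeholder for the converter's per-message conversion step
    static void convert(final Runnable conversion, final Consumer<Exception> onParseFailure) {
        try {
            conversion.run();
        } catch (final Exception e) {
            // Previously only specific exception classes were caught here;
            // catching Exception routes any failure to parse failure
            onParseFailure.accept(e);
        }
    }
}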
That's reasonable; thanks.
I'm not familiar with the reason for the "catch all" in AbstractRecordStreamKafkaMessageConverter.
To me, the problem seems to be that the Avro writer implementation throws a particular exception (class) that is not visible in the classpath of the Kafka implementation. So we can't act based on that particular exception.
Another variation would be for AvroWriter to throw MalformedRecordException instead of IOException, as that better conveys the particular problem (bad data).
There are potential side effects to either of these paths forward; hopefully others in the community will chime in.
I think I'd also go with the change only in the Kafka class, where we would catch all exceptions and route them to parse failure. Thoughts, @exceptionfactory @markap14?
Reviewing the call structure, I favor the proposed approach that catches the AppendWriteException and throws something more specific. Wrapping it and throwing an IOException seems appropriate based on the description of AppendWriteException, although I would add a message to the IOException.
For broader context, the JdbcCommon handling of dataWriter.append() is not directly related, and in that case, catching AppendWriteException only serves to allow for more specific exception messaging.
The contract of RecordReaderFactory.createRecordReader() defines the three checked exceptions, which the KafkaMessageConverter handles as parse failures. Any other exceptions propagate to ConsumeKafka.onTrigger(), where the transaction is rolled back. For this reason, catching a general Exception as a parse failure could mask other issues that indicate a programming bug, versus a problem with the record or schema.
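As a sketch of that contract (simplified; not the actual converter code, and the helper below is hypothetical): only the three checked exceptions declared by createRecordReader() are treated as parse failures, so anything else propagates and rolls back the transaction.

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;

class ParseFailureSketch {
    void read(final RecordReaderFactory readerFactory, final Map<String, String> attributes,
            final InputStream in, final long length, final ComponentLog logger) {
        try (RecordReader reader = readerFactory.createRecordReader(attributes, in, length, logger)) {
            // ... convert and write records ...
        } catch (final MalformedRecordException | SchemaNotFoundException | IOException e) {
            // The three checked exceptions from createRecordReader(): parse failure.
            // Any other exception propagates to ConsumeKafka.onTrigger(),
            // where the transaction is rolled back.
            handleParseFailure(e); // hypothetical helper
        }
    }

    private void handleParseFailure(final Exception e) {
        // Placeholder for routing to the parse-failure relationship
    }
}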
Thanks @jrsteinebrey @pvillard31 @exceptionfactory for your input!
I propose making this update to the changeset:
Another variation would be for AvroWriter to throw MalformedRecordException instead of IOException, as that better conveys the particular problem (bad data).
Does that work for everyone?
Although MalformedRecordException is the most precise, it does not align with the writeRecord method signature, since MalformedRecordException extends the base Exception class.
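To make the mismatch concrete, a minimal sketch (interface abbreviated; not the full NiFi RecordSetWriter API): a checked exception that extends Exception rather than IOException cannot be thrown from a method that declares only IOException.

import java.io.IOException;

class SignatureMismatchSketch {
    interface Writer {
        void write() throws IOException; // abbreviates: WriteResult write(Record) throws IOException
    }

    // Mirrors the NiFi hierarchy: MalformedRecordException extends Exception
    static class MalformedRecordException extends Exception {
        MalformedRecordException(final String message) {
            super(message);
        }
    }

    static final Writer WRITER = () -> {
        // throw new MalformedRecordException("bad data"); // would not compile:
        // not assignable to the IOException declared by write()
        throw new IOException("Failed to write Avro Record");
    };
}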
Sure, that makes sense. I will leave things as is.
Thanks, with that determined, the only other change I recommend is including a message for the IOException, such as "Failed to write Avro Record".
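Applied to the catch block under review, that recommendation amounts to (sketch):

} catch (final DataFileWriter.AppendWriteException e) {
    throw new IOException("Failed to write Avro Record", e);
}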
try {
    dataFileWriter.append(rec);
} catch (final DataFileWriter.AppendWriteException e) {
    throw new IOException(e);
}
Non-binding: I am good with IOException being thrown here like @exceptionfactory recommended.
private static final String RESOURCE_AVRO_SCHEMA_NULLABLE = "src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json";
private static final String RESOURCE_AVRO_SCHEMA_REQUIRED = "src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json";
It might be useful to make these JSON files multiline strings within the test class, but I will defer to the current implementation if you prefer to leave it as is.
Thanks; my opinion is that embedded multiline resources (especially JSON) can be harder to read when the needed escapes are present. So, I'd like to retain the current implementation of those.
Yes, I agree that escaped JSON is harder to read. Multiline strings do not need escaping, which is the reason for the suggestion, but I'm fine with leaving the current approach for now.
Maybe I misunderstood? Are you saying that lines 64, 65 would be better as four lines?
I meant that the JSON could be defined as follows with a multiline string:
private static final String SCHEMA_JSON = """
        {
          "name": "test",
          "type": "record",
          "fields": [
            {
              "name": "text",
              "type": "string"
            },
            {
              "name": "ordinal",
              "type": "long"
            }
          ]
        }
        """;
TIL! I'll make that change.
Thanks for the updates to the tests @greyp9. If you can just add a message to the IOException, that should address the remaining feedback.
Summary
NIFI-15027

Tracking
Please complete the following tracking steps prior to pull request creation.

Issue Tracking
- Apache NiFi Jira issue created

Pull Request Tracking
- Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
- Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting
- Pull Request based on the current revision of the main branch

Verification
Please indicate the verification steps performed prior to pull request creation.

Build
- Build completed using ./mvnw clean install -P contrib-check

Licensing
- New dependencies are documented in applicable LICENSE and NOTICE files

Documentation
- Documentation formatting appears as expected in rendered files