Skip to content

Commit 6f122e3

Browse files
authored
Fix Consume/ProduceError to handle KafkaError constants and instances (#873)
1 parent ae08d74 commit 6f122e3

File tree

4 files changed

+173
-51
lines changed

4 files changed

+173
-51
lines changed

confluent_kafka/deserializing_consumer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,23 +128,23 @@ def poll(self, timeout=-1):
128128
return None
129129

130130
if msg.error() is not None:
131-
raise ConsumeError(msg.error(), message=msg)
131+
raise ConsumeError(msg.error(), kafka_message=msg)
132132

133133
ctx = SerializationContext(msg.topic(), MessageField.VALUE)
134134
value = msg.value()
135135
if self._value_deserializer is not None:
136136
try:
137137
value = self._value_deserializer(value, ctx)
138138
except Exception as se:
139-
raise ValueDeserializationError(exception=se, message=msg)
139+
raise ValueDeserializationError(exception=se, kafka_message=msg)
140140

141141
key = msg.key()
142142
ctx.field = MessageField.KEY
143143
if self._key_deserializer is not None:
144144
try:
145145
key = self._key_deserializer(key, ctx)
146146
except Exception as se:
147-
raise KeyDeserializationError(exception=se, message=msg)
147+
raise KeyDeserializationError(exception=se, kafka_message=msg)
148148

149149
msg.set_key(key)
150150
msg.set_value(value)

confluent_kafka/error.py

Lines changed: 55 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,44 +16,56 @@
1616
# limitations under the License.
1717
#
1818
from confluent_kafka.cimpl import KafkaException, KafkaError
19-
2019
from confluent_kafka.serialization import SerializationError
2120

2221

23-
class ConsumeError(KafkaException):
22+
class _KafkaClientError(KafkaException):
2423
"""
25-
Wraps all errors encountered during the consumption of a message.
26-
27-
Note:
28-
In the event of a serialization error the original message contents
29-
may be retrieved from the ``message`` attribute.
24+
Wraps all errors encountered by a Kafka Client
3025
3126
Args:
32-
error_code (KafkaError): Error code indicating the type of error.
27+
kafka_error (KafkaError): KafkaError instance.
3328
3429
exception(Exception, optional): The original exception
3530
36-
message (Message, optional): The Kafka Message returned from the broker.
37-
31+
kafka_message (Message, optional): The Kafka Message returned
32+
by the broker.
3833
"""
39-
def __init__(self, error_code, exception=None, message=None):
40-
if exception is not None:
41-
kafka_error = KafkaError(error_code, repr(exception))
42-
self.exception = exception
43-
else:
44-
kafka_error = KafkaError(error_code)
45-
self.exception = None
4634

47-
super(ConsumeError, self).__init__(kafka_error)
48-
self.message = message
35+
def __init__(self, kafka_error, exception=None, kafka_message=None):
36+
super(_KafkaClientError, self).__init__(kafka_error)
37+
self.exception = exception
38+
self.kafka_message = kafka_message
4939

5040
@property
5141
def code(self):
52-
return self.code()
42+
return self.args[0].code()
5343

5444
@property
5545
def name(self):
56-
return self.name()
46+
return self.args[0].name()
47+
48+
49+
class ConsumeError(_KafkaClientError):
50+
"""
51+
Wraps all errors encountered during the consumption of a message.
52+
53+
Note:
54+
In the event of a serialization error the original message
55+
contents may be retrieved from the ``kafka_message`` attribute.
56+
57+
Args:
58+
kafka_error (KafkaError): KafkaError instance.
59+
60+
exception(Exception, optional): The original exception
61+
62+
kafka_message (Message, optional): The Kafka Message
63+
returned by the broker.
64+
65+
"""
66+
67+
def __init__(self, kafka_error, exception=None, kafka_message=None):
68+
super(ConsumeError, self).__init__(kafka_error, exception, kafka_message)
5769

5870

5971
class KeyDeserializationError(ConsumeError, SerializationError):
@@ -64,12 +76,15 @@ class KeyDeserializationError(ConsumeError, SerializationError):
6476
Args:
6577
exception(Exception, optional): The original exception
6678
67-
message (Message, optional): The Kafka Message returned from the broker.
79+
kafka_message (Message, optional): The Kafka Message returned
80+
by the broker.
6881
6982
"""
70-
def __init__(self, exception=None, message=None):
83+
84+
def __init__(self, exception=None, kafka_message=None):
7185
super(KeyDeserializationError, self).__init__(
72-
KafkaError._KEY_DESERIALIZATION, exception=exception, message=message)
86+
KafkaError(KafkaError._KEY_DESERIALIZATION, str(exception)),
87+
exception=exception, kafka_message=kafka_message)
7388

7489

7590
class ValueDeserializationError(ConsumeError, SerializationError):
@@ -80,41 +95,29 @@ class ValueDeserializationError(ConsumeError, SerializationError):
8095
Args:
8196
exception(Exception, optional): The original exception
8297
83-
message (Message, optional): The Kafka Message returned from the broker.
98+
kafka_message (Message, optional): The Kafka Message returned
99+
by the broker.
84100
85101
"""
86-
def __init__(self, exception=None, message=None):
102+
103+
def __init__(self, exception=None, kafka_message=None):
87104
super(ValueDeserializationError, self).__init__(
88-
KafkaError._VALUE_DESERIALIZATION, exception=exception, message=message)
105+
KafkaError(KafkaError._VALUE_DESERIALIZATION, str(exception)),
106+
exception=exception, kafka_message=kafka_message)
89107

90108

91-
class ProduceError(KafkaException):
109+
class ProduceError(_KafkaClientError):
92110
"""
93111
Wraps all errors encountered when Producing messages.
94112
95113
Args:
96-
error_code (KafkaError): Error code indicating the type of error.
114+
kafka_error (KafkaError): KafkaError instance.
97115
98116
exception(Exception, optional): The original exception.
99-
100117
"""
101-
def __init__(self, error_code, exception=None):
102-
if exception is not None:
103-
kafka_error = KafkaError(error_code, repr(exception))
104-
self.exception = exception
105-
else:
106-
kafka_error = KafkaError(error_code)
107-
self.exception = None
108118

109-
super(ProduceError, self).__init__(kafka_error)
110-
111-
@property
112-
def code(self):
113-
return self.code()
114-
115-
@property
116-
def name(self):
117-
return self.name()
119+
def __init__(self, kafka_error, exception=None):
120+
super(ProduceError, self).__init__(kafka_error, exception, None)
118121

119122

120123
class KeySerializationError(ProduceError, SerializationError):
@@ -124,9 +127,11 @@ class KeySerializationError(ProduceError, SerializationError):
124127
Args:
125128
exception (Exception): The exception that occurred during serialization.
126129
"""
130+
127131
def __init__(self, exception=None):
128132
super(KeySerializationError, self).__init__(
129-
KafkaError._KEY_SERIALIZATION, exception=exception)
133+
KafkaError(KafkaError._KEY_SERIALIZATION, str(exception)),
134+
exception=exception)
130135

131136

132137
class ValueSerializationError(ProduceError, SerializationError):
@@ -136,6 +141,8 @@ class ValueSerializationError(ProduceError, SerializationError):
136141
Args:
137142
exception (Exception): The exception that occurred during serialization.
138143
"""
144+
139145
def __init__(self, exception=None):
140146
super(ValueSerializationError, self).__init__(
141-
KafkaError._VALUE_SERIALIZATION, exception=exception)
147+
KafkaError(KafkaError._VALUE_SERIALIZATION, str(exception)),
148+
exception=exception)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limit
17+
#
18+
19+
import pytest
20+
from confluent_kafka.cimpl import TopicPartition, OFFSET_END
21+
22+
from confluent_kafka.error import ConsumeError
23+
from confluent_kafka.serialization import StringSerializer
24+
25+
26+
def test_consume_error(kafka_cluster):
27+
"""
28+
Tests to ensure librdkafka errors are propagated as
29+
an instance of ConsumeError.
30+
"""
31+
topic = kafka_cluster.create_topic("test_commit_transaction")
32+
consumer_conf = {'enable.partition.eof': True}
33+
34+
producer = kafka_cluster.producer()
35+
producer.produce(topic=topic, value="a")
36+
producer.flush()
37+
38+
consumer = kafka_cluster.consumer(consumer_conf,
39+
value_deserializer=StringSerializer())
40+
consumer.assign([TopicPartition(topic, 0, OFFSET_END)])
41+
42+
with pytest.raises(ConsumeError, match="No more messages"):
43+
# Trigger EOF error
44+
consumer.poll()

tests/test_error.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limit
17+
#
18+
19+
from confluent_kafka.cimpl import KafkaError
20+
from confluent_kafka.error import ConsumeError, \
21+
ProduceError
22+
23+
24+
def test_new_consume_error_constant():
25+
ce = ConsumeError(KafkaError(KafkaError._PARTITION_EOF))
26+
27+
assert ce.code == KafkaError._PARTITION_EOF
28+
assert ce.name == u'_PARTITION_EOF'
29+
30+
31+
def test_new_consume_error_caused_by():
32+
ce = ConsumeError(KafkaError(KafkaError.INVALID_CONFIG),
33+
exception=ValueError())
34+
35+
assert ce.code == KafkaError.INVALID_CONFIG
36+
assert ce.name == u'INVALID_CONFIG'
37+
assert isinstance(ce.exception, ValueError)
38+
39+
40+
def test_new_consume_error_custom_message():
41+
ce = ConsumeError(KafkaError(KafkaError._KEY_SERIALIZATION,
42+
"Unable to serialize key"))
43+
44+
assert ce.code == KafkaError._KEY_SERIALIZATION
45+
assert ce.name == u'_KEY_SERIALIZATION'
46+
assert ce.args[0].str() == "Unable to serialize key"
47+
48+
49+
def test_new_produce_error_constant():
50+
pe = ProduceError(KafkaError(KafkaError._PARTITION_EOF))
51+
52+
assert pe.code == KafkaError._PARTITION_EOF
53+
assert pe.name == u'_PARTITION_EOF'
54+
55+
56+
def test_new_produce_error_caused_by():
57+
pe = ProduceError(KafkaError(KafkaError.INVALID_CONFIG),
58+
exception=ValueError())
59+
60+
assert pe.code == KafkaError.INVALID_CONFIG
61+
assert pe.name == u'INVALID_CONFIG'
62+
assert isinstance(pe.exception, ValueError)
63+
64+
65+
def test_new_produce_error_custom_message():
66+
pe = ProduceError(KafkaError(KafkaError._KEY_SERIALIZATION,
67+
"Unable to serialize key"))
68+
69+
assert pe.code == KafkaError._KEY_SERIALIZATION
70+
assert pe.name == u'_KEY_SERIALIZATION'
71+
assert pe.args[0].str() == "Unable to serialize key"

0 commit comments

Comments
 (0)