Skip to content

Fix error propagation rule for Python's C API #2019

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

v2.11.0 is a feature release with the following enhancements:

### Fixes
- Fix error propagation rule for Python's C API to prevent SystemError when callbacks raise exceptions (#865)

confluent-kafka-python v2.11.0 is based on librdkafka v2.11.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.
Expand Down
2 changes: 1 addition & 1 deletion src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -5082,7 +5082,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
PyObject *trace = NULL;

/* Fetch (and clear) currently raised exception */
PyErr_Fetch(&exctype, &error, &trace);
cfl_exception_fetch(&exctype, &error, &trace);
Py_XDECREF(trace);
}

Expand Down
2 changes: 2 additions & 0 deletions src/confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
if (result)
Py_DECREF(result);
else {

CallState_fetch_exception(cs);
CallState_crash(cs);
rd_kafka_yield(rk);
}
Expand Down
2 changes: 2 additions & 0 deletions src/confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
if (result)
Py_DECREF(result);
else {

CallState_fetch_exception(cs);
CallState_crash(cs);
rd_kafka_yield(rk);
}
Expand Down
19 changes: 18 additions & 1 deletion src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1755,6 +1755,8 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
if (result)
Py_DECREF(result);
else {

CallState_fetch_exception(cs);
crash:
CallState_crash(cs);
rd_kafka_yield(h->rk);
Expand Down Expand Up @@ -1808,6 +1810,8 @@ static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker
/* throttle_cb executed successfully */
Py_DECREF(result);
goto done;
} else {
CallState_fetch_exception(cs);
}

/**
Expand Down Expand Up @@ -1839,6 +1843,7 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
if (result)
Py_DECREF(result);
else {
CallState_fetch_exception(cs);
CallState_crash(cs);
rd_kafka_yield(h->rk);
}
Expand Down Expand Up @@ -1874,6 +1879,7 @@ static void log_cb (const rd_kafka_t *rk, int level,
if (result)
Py_DECREF(result);
else {
CallState_fetch_exception(cs);
CallState_crash(cs);
rd_kafka_yield(h->rk);
}
Expand Down Expand Up @@ -2572,6 +2578,9 @@ void CallState_begin (Handle *h, CallState *cs) {
cs->thread_state = PyEval_SaveThread();
assert(cs->thread_state != NULL);
cs->crashed = 0;
cs->exception_type = NULL;
cs->exception_value = NULL;
cs->exception_traceback = NULL;
#ifdef WITH_PY_TSS
PyThread_tss_set(&h->tlskey, cs);
#else
Expand All @@ -2592,8 +2601,16 @@ int CallState_end (Handle *h, CallState *cs) {

PyEval_RestoreThread(cs->thread_state);

if (PyErr_CheckSignals() == -1 || cs->crashed)
if (PyErr_CheckSignals() == -1)
return 0;

if (cs->crashed) {
/* Restore the saved exception if we have one */
if (cs->exception_type) {
CallState_restore_exception(cs);
}
return 0;
}

return 1;
}
Expand Down
58 changes: 58 additions & 0 deletions src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,66 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg);
/**
 * @brief Per-call bookkeeping shared between CallState_begin(),
 *        CallState_end() and the callbacks that run in between.
 */
typedef struct {
/* Thread state returned by PyEval_SaveThread() in CallState_begin()
 * and handed back to PyEval_RestoreThread() in CallState_end(). */
PyThreadState *thread_state;
/* Non-zero when a callback crashed; reset to 0 by CallState_begin()
 * and makes CallState_end() return 0. */
int crashed;
/* Exception captured by CallState_fetch_exception() and re-raised by
 * CallState_restore_exception(), which also resets these three
 * fields to NULL. All three are NULL when nothing is stored. */
PyObject *exception_type; /* Stored exception type */
PyObject *exception_value; /* Stored exception value */
PyObject *exception_traceback; /* Stored exception traceback */
} CallState;

/**
* @brief Compatibility layer for Python exception handling API changes.
* PyErr_Fetch/PyErr_Restore were deprecated in Python 3.12 in favor of
* PyErr_GetRaisedException/PyErr_SetRaisedException.
*/

/**
 * @brief Fetch (and clear) the currently raised Python exception.
 *
 * The three output pointers are set to NULL on Python 3.12+ when no
 * exception is raised; on older versions the outputs are whatever
 * PyErr_Fetch() produces.
 *
 * @param exc_type      Receives the exception type.
 * @param exc_value     Receives the exception instance.
 * @param exc_traceback Receives the traceback.
 */
static inline void
cfl_exception_fetch(PyObject **exc_type, PyObject **exc_value, PyObject **exc_traceback) {
#if PY_VERSION_HEX < 0x030c0000
        /* Python < 3.12 - the legacy triple-based API */
        PyErr_Fetch(exc_type, exc_value, exc_traceback);
#else
        /* Python 3.12+ - a single exception object is returned and the
         * legacy triple is derived from it. */
        PyObject *raised = PyErr_GetRaisedException();

        if (raised == NULL) {
                *exc_type      = NULL;
                *exc_value     = NULL;
                *exc_traceback = NULL;
                return;
        }

        *exc_type = (PyObject *)Py_TYPE(raised);
        Py_INCREF(*exc_type);
        *exc_value     = raised;
        *exc_traceback = PyException_GetTraceback(raised);
#endif
}

/**
 * @brief Re-raise an exception previously captured with
 *        cfl_exception_fetch().
 *
 * Ownership of all three references passes to this function on every
 * Python version, matching PyErr_Restore(): the caller must neither use
 * nor release them afterwards.
 *
 * @param exc_type      Exception type, may be NULL.
 * @param exc_value     Exception instance, may be NULL.
 * @param exc_traceback Traceback, may be NULL.
 *
 * @remark On Python 3.12+ no exception is raised when \p exc_value is
 *         NULL, but the other two references are still released.
 */
static inline void
cfl_exception_restore(PyObject *exc_type, PyObject *exc_value, PyObject *exc_traceback) {
#if PY_VERSION_HEX >= 0x030c0000
        /* Python 3.12+ - use new API.
         * PyErr_SetRaisedException() steals the reference to exc_value. */
        if (exc_value) {
                PyErr_SetRaisedException(exc_value);
        }

        /* The type and traceback are not consumed by the new API, so
         * release them here regardless of whether exc_value was set;
         * releasing them only in the non-NULL case would leak them. */
        Py_XDECREF(exc_type);
        Py_XDECREF(exc_traceback);
#else
        /* Python < 3.12 - use legacy API, which steals all three. */
        PyErr_Restore(exc_type, exc_value, exc_traceback);
#endif
}

/**
 * @brief Move the currently raised Python exception into \p cs so it can
 *        be re-raised later by CallState_restore_exception().
 *
 * Any exception already stored in \p cs is released, so the most
 * recently fetched exception is the one kept. Without that release, a
 * second failing callback within the same CallState would overwrite the
 * pointers and leak the earlier references.
 *
 * @param cs Call state that receives the exception references.
 */
static inline void
CallState_fetch_exception(CallState *cs) {
        PyObject *prev_type      = cs->exception_type;
        PyObject *prev_value     = cs->exception_value;
        PyObject *prev_traceback = cs->exception_traceback;

        cfl_exception_fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);

        /* Release the old references only after the fetch has cleared
         * the error indicator, so no deallocation code runs while an
         * exception is still pending. */
        Py_XDECREF(prev_type);
        Py_XDECREF(prev_value);
        Py_XDECREF(prev_traceback);
}

/**
 * @brief Re-raise the exception stored in \p cs, if any, and reset the
 *        stored fields to NULL.
 *
 * Does nothing when no exception type is stored.
 *
 * @param cs Call state holding the exception references.
 */
static inline void
CallState_restore_exception(CallState *cs) {
        PyObject *type;
        PyObject *value;
        PyObject *traceback;

        if (cs->exception_type == NULL) {
                return;
        }

        /* Detach the references from the call state, then hand them
         * over to the restore helper. */
        type      = cs->exception_type;
        value     = cs->exception_value;
        traceback = cs->exception_traceback;

        cs->exception_type      = NULL;
        cs->exception_value     = NULL;
        cs->exception_traceback = NULL;

        cfl_exception_restore(type, value, traceback);
}

/**
* @brief Initialise a CallState and unlock the GIL prior to a
* possibly blocking external call.
Expand Down
35 changes: 35 additions & 0 deletions tests/test_Consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
def dummy_commit_cb(err, partitions):
pass

kc = TestConsumer({'group.id': 'test', 'socket.timeout.ms': '100',

Check failure on line 23 in tests/test_Consumer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/test_Consumer.py#L23

Define a constant instead of duplicating this literal 'socket.timeout.ms' 8 times.

Check failure on line 23 in tests/test_Consumer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/test_Consumer.py#L23

Define a constant instead of duplicating this literal 'group.id' 8 times.
'session.timeout.ms': 1000, # Avoid close() blocking too long

Check failure on line 24 in tests/test_Consumer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/test_Consumer.py#L24

Define a constant instead of duplicating this literal 'session.timeout.ms' 8 times.
'on_commit': dummy_commit_cb})

kc.subscribe(["test"])
Expand Down Expand Up @@ -324,3 +324,38 @@
with pytest.raises(ValueError) as ex:
TestConsumer({'bootstrap.servers': "mybroker:9092"})
assert ex.match('group.id must be set')


def test_callback_exception_no_system_error():
    """An exception raised inside error_cb must propagate out of consume()
    as that same exception, without an additional SystemError (issue #865).
    """

    exception_raised = []

    def error_cb_that_raises(error):
        """Error callback that raises an exception"""
        exception_raised.append(error)
        raise RuntimeError("Test exception from error_cb")

    # Create consumer with error callback that raises exception
    consumer = TestConsumer({
        'group.id': 'test-callback-systemerror-fix',
        'bootstrap.servers': 'nonexistent-broker:9092',  # Will trigger error
        'socket.timeout.ms': 100,
        'session.timeout.ms': 1000,
        'error_cb': error_cb_that_raises
    })

    # Close the consumer even when an assertion below fails, so a failing
    # run does not leave an open consumer behind.
    try:
        consumer.subscribe(['test-topic'])

        # This should trigger the error callback due to connection failure
        # Before fix: Would get RuntimeError + SystemError (Issue #865)
        # After fix: Should only get RuntimeError (no SystemError)
        with pytest.raises(RuntimeError) as exc_info:
            consumer.consume(timeout=0.1)

        # Verify we got the expected exception message
        assert "Test exception from error_cb" in str(exc_info.value)

        # Verify the error callback was actually called
        assert len(exception_raised) > 0
    finally:
        consumer.close()
31 changes: 31 additions & 0 deletions tests/test_Producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
p = Producer()
assert ex.match('expected configuration dict')

p = Producer({'socket.timeout.ms': 10,

Check failure on line 24 in tests/test_Producer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/test_Producer.py#L24

Define a constant instead of duplicating this literal 'socket.timeout.ms' 5 times.
'error_cb': error_cb,
'message.timeout.ms': 10})

Check failure on line 26 in tests/test_Producer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/test_Producer.py#L26

Define a constant instead of duplicating this literal 'message.timeout.ms' 5 times.

p.produce('mytopic')
p.produce('mytopic', value='somedata', key='a key')
Expand Down Expand Up @@ -283,3 +283,34 @@

p = Producer({})
assert bool(p)


def test_callback_exception_no_system_error():
    """A delivery callback that raises must surface its own exception
    from flush(), not a SystemError (Issue #865).
    """
    seen_reports = []

    def raising_delivery_cb(err, msg):
        """Record the delivery report, then fail on purpose."""
        seen_reports.append((err, msg))
        raise RuntimeError("Test exception from delivery_cb")

    # Unreachable broker plus a very short message timeout, so the
    # delivery fails quickly and the callback fires.
    config = {
        'bootstrap.servers': 'nonexistent-broker:9092',
        'socket.timeout.ms': 100,
        'message.timeout.ms': 10,
        'on_delivery': raising_delivery_cb,
    }
    producer = Producer(config)

    # The failed delivery of this message is what invokes the callback.
    producer.produce('test-topic', value='test-message')

    # flush() serves delivery reports; the longer timeout gives the
    # callback time to fire.
    # Before fix: RuntimeError + SystemError. After fix: RuntimeError only.
    with pytest.raises(RuntimeError) as raised:
        producer.flush(timeout=2.0)

    # The exception must be the one raised by our callback ...
    message = str(raised.value)
    assert "Test exception from delivery_cb" in message

    # ... and the callback must actually have run.
    assert len(seen_reports) > 0