Skip to content

Commit c7a6cf5

Browse files
committed
integration_test: performance: retry produce on backpressure
Otherwise consumer_performance won't have enough messages to consume
1 parent 135f17b commit c7a6cf5

File tree

1 file changed

+14
-12
lines changed

1 file changed

+14
-12
lines changed

examples/integration_test.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -239,17 +239,19 @@ def verify_producer_performance(with_dr_cb=True):
239239
bar = None
240240

241241
for i in range(0, msgcnt):
242-
try:
243-
if with_dr_cb:
244-
p.produce(topic, value=msg_payload, callback=dr.delivery)
245-
else:
246-
p.produce(topic, value=msg_payload)
247-
except BufferError as e:
248-
# Local queue is full (slow broker connection?)
249-
msgs_backpressure += 1
250-
if bar is not None and (msgs_backpressure % 1000) == 0:
251-
bar.next(n=0)
252-
p.poll(0)
242+
while True:
243+
try:
244+
if with_dr_cb:
245+
p.produce(topic, value=msg_payload, callback=dr.delivery)
246+
else:
247+
p.produce(topic, value=msg_payload)
248+
break
249+
except BufferError as e:
250+
# Local queue is full (slow broker connection?)
251+
msgs_backpressure += 1
252+
if bar is not None and (msgs_backpressure % 1000) == 0:
253+
bar.next(n=0)
254+
p.poll(100)
253255
continue
254256

255257
if bar is not None and (msgs_produced % 5000) == 0:
@@ -268,7 +270,7 @@ def verify_producer_performance(with_dr_cb=True):
268270
(msgs_produced, bytecnt / (1024*1024), t_produce_spent,
269271
msgs_produced / t_produce_spent,
270272
(bytecnt/t_produce_spent) / (1024*1024)))
271-
print('# %d messages not produce()d due to backpressure (local queue full)' % msgs_backpressure)
273+
print('# %d temporary produce() failures due to backpressure (local queue full)' % msgs_backpressure)
272274

273275
print('waiting for %d/%d deliveries' % (len(p), msgs_produced))
274276
# Wait for deliveries

0 commit comments

Comments
 (0)