Support for long running message consumer #18456
Replies: 12 comments
-
Please take a look at this document, which may help you: http://pulsar.apache.org/docs/en/concepts-messaging/#negative-acknowledgement
-
@codelipenghui I had a look at negative acknowledgement. It still won't work if my ackTimeout is set to 10 minutes and the message I am consuming takes, for example, 30 minutes: the broker will redeliver the message after 10 minutes, even though one of the consumers is still working on it.
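The arithmetic behind this objection can be sketched as a minimal Java model. It assumes each delivery starts a fresh ack timer that fires exactly `ackTimeout` after delivery (in practice the client checks timeouts periodically, so real redelivery is approximate) and that the message is acked only when the first consumer finishes. In the Java client the value in question is the one passed to `ConsumerBuilder.ackTimeout(long, TimeUnit)`; `AckTimeoutModel` and `redeliveryTimes` are hypothetical names, not part of the Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption, not Pulsar's implementation): every delivery
// starts a fresh ack timer, the timer fires exactly after ackTimeout, and the
// message is acked only when the first consumer finishes processing it.
final class AckTimeoutModel {

    // Minutes at which the message is redelivered while the first consumer
    // is still working on it.
    static List<Long> redeliveryTimes(long ackTimeoutMin, long processingMin) {
        if (ackTimeoutMin <= 0) {
            throw new IllegalArgumentException("ackTimeout must be positive");
        }
        List<Long> times = new ArrayList<>();
        for (long t = ackTimeoutMin; t < processingMin; t += ackTimeoutMin) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        // ackTimeout = 10 min, processing = 30 min: redelivered at minutes 10 and 20.
        System.out.println(redeliveryTimes(10, 30)); // [10, 20]
        // ackTimeout = 10 min, processing = 5 min: acked before any redelivery.
        System.out.println(redeliveryTimes(10, 5));  // []
    }
}
```

Under these assumptions the 30-minute job is handed out twice more before its first ack, and each extra copy is itself a 30-minute job that can trigger further redeliveries.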
-
@shubham-Shole4ever
-
@codelipenghui
-
@shubham-Shole4ever When a consumer crashes or its TCP connection breaks, the messages that were delivered to that consumer and not acked will be replayed to another available consumer (in the case of Shared subscriptions) or the next time the consumer reconnects. You don't need an ack timeout for that.
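The replay-on-disconnect behaviour described above can be illustrated with a toy in-memory model. It is not broker code: `SharedSubscriptionModel` and its methods are hypothetical names, and putting replayed messages back at the front of the backlog is a simplifying assumption. The point it shows is that redelivery is triggered by the disconnect rather than by a timer, so a consumer can hold a message for as long as its connection stays up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model (not broker code) of a Shared subscription: the broker tracks which
// messages each consumer holds unacked and replays them when that consumer disconnects.
final class SharedSubscriptionModel {
    private final Deque<String> backlog = new ArrayDeque<>();
    private final Map<String, Set<String>> unacked = new HashMap<>();

    void publish(String msg) {
        backlog.addLast(msg);
    }

    // Hands the next message to a consumer and records it as unacked.
    String dispatch(String consumer) {
        String msg = backlog.pollFirst();
        if (msg != null) {
            unacked.computeIfAbsent(consumer, c -> new LinkedHashSet<>()).add(msg);
        }
        return msg;
    }

    void ack(String consumer, String msg) {
        Set<String> pending = unacked.get(consumer);
        if (pending != null) {
            pending.remove(msg);
        }
    }

    // Crash or broken TCP connection: no timer is involved; the consumer's unacked
    // messages go back to the front of the backlog in their original order.
    void disconnect(String consumer) {
        Set<String> pending = unacked.remove(consumer);
        if (pending == null) {
            return;
        }
        List<String> replay = new ArrayList<>(pending);
        Collections.reverse(replay);
        for (String msg : replay) {
            backlog.addFirst(msg);
        }
    }

    public static void main(String[] args) {
        SharedSubscriptionModel sub = new SharedSubscriptionModel();
        sub.publish("job-1");
        sub.publish("job-2");
        sub.dispatch("A");                      // A starts the long-running job-1
        sub.dispatch("B");                      // B gets job-2
        sub.ack("B", "job-2");
        sub.disconnect("A");                    // A crashes before acking job-1
        System.out.println(sub.dispatch("B"));  // job-1
    }
}
```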
-
@shubham-Shole4ever does Matteo's comment make sense to you? |
-
@sijie I can make do with the workaround suggested by @codelipenghui and @merlimat for the time being. However, as mentioned, that solution will not work if I also need an ackTimeout. Thanks @codelipenghui and @merlimat for all the help. :)
-
Stumbled upon this while looking for another answer, but in case it helps anyone I'll leave a comment. For such cases I think it's possible to combine a DLQ (dead letter queue) with ackTimeout. The default redelivery limit I use is 3. Although I don't use auto-ack, I expect it works the same way: if a message times out 3 times (in this case), it automatically goes to the Dead Letter Queue. This prevents it from looping endlessly between services.

My case is a module I'm building for a framework. There I don't set an ackTimeout but let users set it if they want to; however, by default I do set 3 retries before the DLQ. The reason is personal experience: my test message looped endlessly between the shared consumers, I got error logs all the time, and I couldn't figure out why. Then I realised the message was simply getting a negativeAck from each consumer and being redelivered over and over. The funny thing is that it was a malformed JSON message, so the consumers were doomed to fail (validation failed for a field that was previously nullable in Kotlin and that I had changed to non-null).

When I set the DLQ limit to 3, some messages failed and were then re-read because of the ack timeout. But by combining a DLQ, ackTimeout, and shared consumers, I think you can set the timeout pretty low if processing takes little time and you ACK manually as soon as it's done.
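The difference between an endless loop and a bounded number of deliveries can be modelled in a few lines. In the Java client, the limit described above is configured on the consumer builder with `deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).build())`; the model below does not call Pulsar at all. Its rule (the message is dead-lettered once a failed delivery has already used up `maxRedeliverCount` redeliveries) is an assumption, so check the exact boundary against your client version. `DeadLetterModel` and `deliveriesOfPoisonMessage` are hypothetical names; a negative `maxRedeliverCount` stands for "no DLQ configured", and `cap` only exists to stop the otherwise endless loop.

```java
// Simplified model (an assumption, not Pulsar's client code) of a poison message:
// every delivery fails, so the message is never acked and comes back after a
// negative ack or an ack timeout.
final class DeadLetterModel {

    // Returns how many times the message reaches a consumer. A negative
    // maxRedeliverCount means "no DLQ configured"; cap only stops that endless loop.
    static int deliveriesOfPoisonMessage(int maxRedeliverCount, int cap) {
        boolean dlqEnabled = maxRedeliverCount >= 0;
        int deliveries = 0;
        int redeliveries = 0;
        while (deliveries < cap) {
            deliveries++;              // the message reaches a consumer...
            // ...and processing fails (e.g. malformed JSON), so there is no ack.
            if (dlqEnabled && redeliveries == maxRedeliverCount) {
                return deliveries;     // redelivery budget used up: moved to the DLQ
            }
            redeliveries++;            // nack or ack timeout: delivered again
        }
        return deliveries;             // without a DLQ the loop only ends at the cap
    }

    public static void main(String[] args) {
        System.out.println(deliveriesOfPoisonMessage(3, 1_000));  // 4
        System.out.println(deliveriesOfPoisonMessage(-1, 1_000)); // 1000
    }
}
```

With a limit of 3, the malformed message costs four failed attempts and then stops; without a DLQ, it keeps cycling through the shared consumers until something outside the model intervenes.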
-
There is still no good solution for retrying long-running jobs.
-
I think there are plenty of good-enough workarounds, but I agree there should be an optimal one for long-running consumers out of the box. Just to list a few:
I assume something like option 3 would be good to have out of the box. The best, of course, would be something like an LRQ ("long running queue", for lack of a better name): when the ackTimeout fires, the consumer could reply to the broker with something like "still processing", and the broker would move the message to this queue. Pulsar would then track the consumer's TCP connection: if it dies, push the messages back to the normal queue and retry them; if it stays alive, let the consumer say when the message should be removed. Using a DLQ for this is also possible, but it mixes up messages that were retried too many times with the ones the consumer knows take a long time.
-
I'm moving this discussion to the Discussions forum since it's an open-ended discussion instead of an actionable task :)
-
Is your feature request related to a problem? Please describe.
The ackTimeout is set at the consumer level and applies to all the messages that consumer handles. We have a case where consuming a message takes an unpredictable amount of time, ranging from 10 minutes to a couple of hours. We also don't want to set the ackTimeout to the maximum possible duration (which could be half a day or more).
Could we have a feature where the consumer sends a signal back to the broker, acknowledging that it has not failed but is still working on the received message, and the broker extends the ackTimeout for that message?
Describe the solution you'd like
Functionality that allows the consumer to notify the broker that it is still working on a received message. On receiving this signal, the broker could extend the ackTimeout for that particular message (probably by restarting its ackTimeout).
Describe alternatives you've considered
Currently, there is no way to modify the ackTimeout for a particular message. The ackTimeout is set at the consumer level and cannot be changed for individual messages.
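To make the requested semantics concrete, here is a minimal sketch of a per-message lease that a "still working on it" signal can extend. Nothing here exists in Pulsar: `LeaseTracker`, `delivered`, `touch`, `acked`, and `isExpired` are hypothetical names, and time is a plain `long` in minutes for readability.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the *requested* feature, not an existing Pulsar API:
// each message gets its own ack deadline, and a "still working" signal (touch)
// restarts that deadline. Time is a plain long in minutes for readability.
final class LeaseTracker {
    private final long ackTimeout;
    private final Map<String, Long> deadlines = new HashMap<>();

    LeaseTracker(long ackTimeout) {
        this.ackTimeout = ackTimeout;
    }

    // Delivery starts the ack timer for this message.
    void delivered(String msgId, long now) {
        deadlines.put(msgId, now + ackTimeout);
    }

    // The proposed heartbeat: restart the timer for this one message only.
    // Returns false if the message was already acked or its timer already ran out.
    boolean touch(String msgId, long now) {
        Long deadline = deadlines.get(msgId);
        if (deadline == null || now >= deadline) {
            return false;
        }
        deadlines.put(msgId, now + ackTimeout);
        return true;
    }

    // A normal ack ends tracking for the message.
    void acked(String msgId) {
        deadlines.remove(msgId);
    }

    // True once the deadline has passed; the broker would then redeliver the message.
    boolean isExpired(String msgId, long now) {
        Long deadline = deadlines.get(msgId);
        return deadline != null && now >= deadline;
    }

    public static void main(String[] args) {
        LeaseTracker tracker = new LeaseTracker(10);        // 10-minute ack timeout
        tracker.delivered("job-1", 0);                      // deadline: minute 10
        tracker.touch("job-1", 8);                          // deadline moves to minute 18
        System.out.println(tracker.isExpired("job-1", 12)); // false
    }
}
```

With a 10-minute timeout, a consumer working on a 30-minute job would call `touch` every few minutes; if that consumer dies, the touches stop and the message expires about 10 minutes later rather than after a half-day timeout. The key design choice is that the deadline lives per message, which is exactly what the consumer-level ackTimeout lacks.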