Description
Discussed in #2668
Originally posted by linus-learns April 27, 2023
Hi,
I'm running @KafkaListener-annotated consumers created by a ConcurrentKafkaListenerContainerFactory that is configured with asyncAcks and AckMode.MANUAL. I have tried to implement a CommonErrorHandler for this setup with isAckAfterHandle set to true and seeksAfterHandling set to false, but the bookkeeping of the deferred acks does not work: the ack that is issued after the error handler has been invoked and the broken record handled does not go through the ackInOrder method. This causes the consumer to hang if a poison pill appears in the middle of a batch while commits for higher offsets are waiting in the ListenerConsumer.
It appears that the ack for the poison pill is never committed until the consumer gets the shutdown signal: a rebalance causes the broken record to be fetched again by the new partition owner, whereas a restart resolves the issue.
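To illustrate what I think is happening, here is a simplified model in plain Java. It is not the actual Spring Kafka code: the class InOrderAckModel and its methods are hypothetical, and only the name ackInOrder is borrowed from the container. The sketch assumes a single partition and that an offset is committed only once every lower offset has been acknowledged through the same bookkeeping.

```java
import java.util.TreeSet;

/**
 * Hypothetical, simplified model of in-order ack bookkeeping for one partition.
 * Acks that arrive out of order are deferred until the gap below them is filled.
 */
public class InOrderAckModel {

    // Lowest offset that has not been committed yet.
    private long nextToCommit;

    // Acknowledged offsets that are still waiting on a lower, unacknowledged offset.
    private final TreeSet<Long> deferred = new TreeSet<>();

    public InOrderAckModel(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /** Registers an ack and commits as far as the contiguous run of acks allows. */
    public void ackInOrder(long offset) {
        if (offset < nextToCommit) {
            return; // already committed, nothing to do
        }
        deferred.add(offset);
        // Advance while the next expected offset has been acknowledged.
        while (deferred.remove(nextToCommit)) {
            nextToCommit++;
        }
    }

    public long nextToCommit() {
        return nextToCommit;
    }

    public int pendingCount() {
        return deferred.size();
    }

    public static void main(String[] args) {
        InOrderAckModel model = new InOrderAckModel(0);

        model.ackInOrder(0);
        // Offset 1 is the poison pill. The error handler deals with it, but in this
        // model its ack never reaches ackInOrder, so the gap at offset 1 stays open.
        model.ackInOrder(2);
        model.ackInOrder(3);
        System.out.println("next=" + model.nextToCommit() + " pending=" + model.pendingCount());

        // If the poison pill's ack were routed through the same bookkeeping,
        // the deferred offsets would be released.
        model.ackInOrder(1);
        System.out.println("next=" + model.nextToCommit() + " pending=" + model.pendingCount());
    }
}
```

In this model the first line printed shows offsets 2 and 3 stuck behind the gap at offset 1, and the second shows them released once offset 1 is acknowledged through the same path, which matches the behaviour I would expect from the container.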
Is this a bug, or should I use a different way of handling my errors given my setup?