【问题标题】:Kombu, RabbitMQ: Ack message more than once in a consumer mixinKombu,RabbitMQ:在消费者混合中多次确认消息
【发布时间】:2016-08-21 20:56:53
【问题描述】:

我在为新的 SO 文档项目记录 Kombu 时偶然发现了这个问题。

考虑以下 Consumer Mixin 的 Kombu 代码:

from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin
from kombu.exceptions import MessageStateError
import datetime

# Send a message to the 'test_queue' queue
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    with conn.SimpleQueue(name='test_queue') as queue:
        queue.put('String message sent to the queue')


# Callback functions
def print_upper(body, message):
    print body.upper()
    message.ack()    

def print_lower(body, message):
    print body.lower()
    message.ack()


# Attach the callback function to a queue consumer 
class Worker(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues=Queue('test_queue'), callbacks=[print_even_characters, print_odd_characters]),
        ]

# Start the worker
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    worker = Worker(conn)
    worker.run()

代码失败:

kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK

因为消息在print_even_characters()print_odd_characters() 上被确认了两次。

一个可行的简单解决方案是只确认最后一个回调函数,但如果我想在其他队列或连接上使用相同的函数,它会破坏模块化。

如何确认发送到多个回调函数的排队 Kombu 消息?

【问题讨论】:

    标签: python rabbitmq kombu message-ack


    【解决方案1】:

    解决方案

    1 - 检查message.acknowledged

    message.acknowledged 标志检查消息是否已经被确认:

    def print_upper(body, message):
        print body.upper()
        if not message.acknowledged: 
            message.ack()
    
    
    def print_lower(body, message):
        print body.lower()
        if not message.acknowledged: 
            message.ack()
    

    优点:易读、简短。

    缺点:中断Python EAFP idiom

    2 - 捕获异常

    def print_upper(body, message):
        print body.upper()
        try:
            message.ack()
        except MessageStateError:
            pass
    
    
    def print_lower(body, message):
        print body.lower()
        try:
            message.ack()
        except MessageStateError:
            pass
    

    优点:可读,Pythonic。

    缺点:有点长 - 每个回调有 4 行样板代码。

    3 - 确认最后一个回调

    文档保证callbacks are called in order.因此,我们可以简单地.ack() 只使用最后一个回调:

    def print_upper(body, message):
        print body.upper()
    
    
    def print_lower(body, message):
        print body.lower()
        message.ack()
    

    优点:简短、易读、没有样板代码。

    缺点:不是模块化的:回调不能被另一个队列使用,除非最后一个回调总是最后一个。这种隐含的假设可能会破坏调用者代码。

    这可以通过将回调函数移动到Worker 类中来解决。我们放弃了一些模块化——这些函数不会从外部调用——但获得了安全性和可读性。

    总结

    1 和 2 之间的区别只是风格问题。

    如果执行顺序很重要,以及消息是否在成功通过所有回调之前不应被 ACK-ed,则应选择解决方案 3。

    如果应该始终确认消息,则应该选择 1 或 2,即使一个或多个回调失败。

    请注意,还有其他可能的设计;这个答案是指驻留在工作人员之外的回调函数。

    【讨论】: