【问题标题】:A good heartbeat interval for pika-rabbitmq in Amazon ec2Amazon ec2 中 pika-rabbitmq 的良好心跳间隔
【发布时间】:2013-02-22 02:10:39
【问题描述】:

我正在为 rabbitmq 使用最新的 pika 库(0.9.9+)。我对 rabbitmq 和 pika 的使用如下:

  1. 作为工人,我有长时间运行的任务(大约 5 分钟)。这些任务从 rabbitmq 获取请求。请求很少出现,即请求之间有很长的空闲时间。
  2. 我之前遇到的问题与空闲连接(由于空闲连接导致的连接关闭)有关。所以,我在 pika 中启用了心跳。
  3. 现在心跳的选择是个问题。 Pika 似乎是一个单线程库,其中心跳接收和确认恰好在请求时间范围之间完成。
  4. 因此,如果设置的心跳间隔小于回调函数用于执行其长时间运行计算的时间,则服务器不会收到任何心跳确认并关闭连接。
  5. 因此,我假设最小心跳间隔应该是阻塞连接中回调函数的最大计算时间。

对于亚马逊 ec2 来说,什么是好的心跳值可以防止它关闭空闲连接?

另外,有些人建议使用 rabbitmq keepalive(或 libkeepalive)来维护 tcp 连接。我认为在 tcp 层管理心跳要好得多,因为应用程序不需要管理它们。这是真的吗?与 RMQ 心跳相比,keepalive 是一种好方法吗?

我看到有些人建议使用多线程和队列来处理长时间运行的任务。但这是长时间运行任务的唯一选择吗?在这种情况下必须使用另一个队列,这非常令人失望。

提前谢谢你。我想我已经详细说明了这个问题。如果我可以提供更多详细信息,请告诉我。

【问题讨论】:

    标签: python rabbitmq heartbeat pika


    【解决方案1】:

    如果你不喜欢使用 pika,这个 thread 帮助我实现了你想要使用 kombu 做的事情:

    #!/usr/bin/env python
    import time, logging, weakref, eventlet
    from kombu import Connection, Exchange, Queue
    from kombu.utils.debug import setup_logging
    from kombu.common import eventloop
    from eventlet import spawn_after
    
    eventlet.monkey_patch()
    
    log_format = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
                  '-35s %(lineno) -5d: %(message)s')
    logging.basicConfig(level=logging.INFO, format=log_format)
    logger = logging.getLogger('job_worker')
    logger.setLevel(logging.INFO)
    
    
    def long_running_function(body):
        time.sleep(300)
    
    def job_worker(body, message):
        long_running_function(body)
        message.ack()
    
    def monitor_heartbeats(connection, rate=2):
        """Function to send heartbeat checks to RabbitMQ. This keeps the
           connection alive over long-running processes."""
        if not connection.heartbeat:
            logger.info("No heartbeat set for connection: %s" % connection.heartbeat)
            return
        interval = connection.heartbeat
        cref = weakref.ref(connection)
        logger.info("Starting heartbeat monitor.")
    
        def heartbeat_check():
            conn = cref()
            if conn is not None and conn.connected:
                conn.heartbeat_check(rate=rate)
                logger.info("Ran heartbeat check.")
                spawn_after(interval, heartbeat_check)
        return spawn_after(interval, heartbeat_check)
    
    def main():
        setup_logging(loglevel='INFO')
    
        # process for heartbeat monitor
        p = None
    
        try:
            with Connection('amqp://guest:guest@localhost:5672//', heartbeat=300) as conn:
                conn.ensure_connection()
                monitor_heartbeats(conn)
                queue = Queue('job_queue',
                              Exchange('job_queue', type='direct'),
                              routing_key='job_queue')
                logger.info("Starting worker.")
                with conn.Consumer(queue, callbacks=[job_worker]) as consumer:
                    consumer.qos(prefetch_count=1)
                    for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
                        pass
        except KeyboardInterrupt:
            logger.info("Worker was shut down.")
    
    if __name__ == "__main__":
        main()
    

    我删除了我的领域特定代码,但本质上这是我使用的框架。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-05-21
      • 2014-04-10
      • 1970-01-01
      • 2023-03-29
      • 2013-10-12
      • 1970-01-01
      相关资源
      最近更新 更多