【问题标题】:signal handling pika / python信号处理 pika / python
【发布时间】:2014-06-16 18:10:03
【问题描述】:

我在为每条消息执行一些任务的消费者中使用pika.BlockingConnection。我还添加了信号处理,以便消费者在完成所有任务后正常死亡。

在处理消息并收到信号时,我只是从函数中得到"signal received",但代码没有退出。所以,我决定也检查回调函数结束时收到的信号。问题是,我要检查多少次信号,因为这段代码中会有更多的功能。有没有更好的方法来处理信号而不做过多的事情?

import signal
import sys
import pika
from time import sleep

received_signal = False
all_over = False

def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

mq_connection = pika.BlockingConnection(pika.ConnectionParameters(my_mq_server, virtual_host='test'))
mq_channel = mq_connection.channel()

def callback(ch, method, properties, body):
    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)
    print body
    sleep(50)
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)
    print "Message consumption complete"

    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)

try:
    print ' [*] Waiting for messages. To exit press CTRL+C'
    mq_channel.basic_consume(callback, queue='test')
    mq_channel.start_consuming()
except Exception:
    mq_channel.close()
    exit()

这是我在这里的第一个问题,如果需要更多详细信息,请告诉我。

【问题讨论】:

  • 您当前的代码将吞下 SIGTERM 或 SIGINT 直到通过队列接收到下一条消息,此时它应该退出。这真的是你想要的吗?为什么不让signal_handler 方法直接调用sys.exit(0)
  • 我希望信号处理以两种方式完成:1)在等待消息时,它应该只是死掉 2)在消费消息时,它应该完成当前工作,然后死掉。我当前的代码包含第二个条件,但不是第一个条件。这就是问题。这可能吗?
  • 是的,这是可能的。我会添加一个答案。

标签: python signals signal-handling pika


【解决方案1】:

我认为这可以满足您的需求:

#!/usr/bin/python

import signal
import sys 
import pika
from contextlib import contextmanager

received_signal = False
processing_callback = False

def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True
    if not processing_callback:
         sys.exit()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

@contextmanager
def block_signals():
    global processing_callback
    processing_callback = True
    try:
        yield
    finally:
        processing_callback = False
        if received_signal:
            sys.exit()

def callback(ch, method, properties, body):
    with block_signals:
        print body
        sum(xrange(0, 200050000)) # sleep gets interrupted by signals, this doesn't.
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)
        print "Message consumption complete"

if __name__ == "__main__":    
    try:
        mq_connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        mq_channel = mq_connection.channel()
        print ' [*] Waiting for messages. To exit press CTRL+C'
        mq_channel.basic_consume(callback, queue='test')
        mq_channel.start_consuming()
    except Exception as e:
        mq_channel.close()
        sys.exit()

我使用上下文管理器来处理阻塞信号,以便所有逻辑都隐藏在回调本身之外。这也应该更容易重用代码。只是为了澄清它是如何工作的,它相当于:

def callback(ch, method, properties, body):
    global processing_callback
    processing_callback = True
    try:
        print body
        sum(xrange(0, 200050000))
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)
        print "Message consumption complete"
    finally:
        processing_callback = False
        if received_signal:
            sys.exit()

【讨论】:

  • 使用 xrange 而不是 range,否则您的内存会爆炸,并且在分页到磁盘时会开始抖动。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-02-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-06-07
  • 2013-04-03
相关资源
最近更新 更多