【问题标题】:Python asyncio: unreferenced tasks are destroyed by garbage collector?Python asyncio:未引用的任务被垃圾收集器销毁?
【发布时间】:2017-11-02 10:18:03
【问题描述】:

我正在编写一个程序,它通过 AMQP 接受 RPC 请求以执行网络请求 (CoAP)。在处理 RPC 请求时,aioamqp 回调会生成负责网络 IO 的任务。这些任务可以被认为是后台任务,它们将无限期地运行以通过 AMQP 流式传输网络响应(在这种情况下,一个 RPC 请求会触发一个 RPC 响应和数据流式传输)。

我注意到在我的原始代码中,网络任务会在看似随机的时间间隔后(在完成之前)被破坏,然后 asyncio 会打印以下警告“任务已被破坏,但它正在等待处理”。此问题类似于此处描述的问题:https://bugs.python.org/issue21163

现在我已经通过在模块级列表中存储硬引用来规避这个问题,这可以防止 GC 破坏任务对象。但是,我想知道是否有更好的解决方法?理想情况下,我想在 RPC 回调中调用 await 任务,但我注意到这会阻止任何进一步的 AMQP 操作完成 - >例如创建新的 amqp 通道会停止,通过 amqp 接收 rpc 请求也会停止。然而,我不确定是什么导致了这种停滞(因为回调本身就是一个协程,我希望等待不会让整个 aioamqp 库停滞)。

我在下面发布了 RPC 客户端和服务器的源代码,它们都基于 aioamqp/aiocoap 示例。在服务器中,on_rpc_request 是 amqp rpc 回调,send_coap_obs_request 是网络协程,当 'obs_tasks.append(task)' 被销毁时语句被删除。

client.py:

"""
    CoAP RPC client, based on aioamqp implementation of RPC examples from RabbitMQ tutorial
"""

import base64
import json
import uuid

import asyncio
import aioamqp


class CoAPRpcClient(object):
    def __init__(self):
        self.transport = None
        self.protocol = None
        self.channel = None
        self.callback_queue = None
        self.waiter = asyncio.Event()

    async def connect(self):
        """ an `__init__` method can't be a coroutine"""
        self.transport, self.protocol = await aioamqp.connect()
        self.channel = await self.protocol.channel()

        result = await self.channel.queue_declare(queue_name='', exclusive=True)
        self.callback_queue = result['queue']

        await self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue_name=self.callback_queue,
        )

    async def on_response(self, channel, body, envelope, properties):
        if self.corr_id == properties.correlation_id:
            self.response = body

        self.waiter.set()

    async def call(self, n):
        if not self.protocol:
            await self.connect()
        self.response = None
        self.corr_id = str(uuid.uuid4())
        await self.channel.basic_publish(
            payload=str(n),
            exchange_name='',
            routing_key='coap_request_rpc_queue',
            properties={
                'reply_to': self.callback_queue,
                'correlation_id': self.corr_id,
            },
        )
        await self.waiter.wait()

        await self.protocol.close()
        return json.loads(self.response)


async def rpc_client():
    coap_rpc = CoAPRpcClient()

    request_dict = {}
    request_dict_json = json.dumps(request_dict)

    print(" [x] Send RPC coap_request({})".format(request_dict_json))
    response_dict = await coap_rpc.call(request_dict_json)
    print(" [.] Got {}".format(response_dict))


asyncio.get_event_loop().run_until_complete(rpc_client())

server.py:

"""
CoAP RPC server, based on aioamqp implementation of RPC examples from RabbitMQ tutorial
"""

import base64
import json
import sys

import logging
import warnings

import asyncio
import aioamqp
import aiocoap

amqp_protocol = None
coap_client_context = None
obs_tasks = []

AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME = 'topic_coap'
AMQP_COAP_NOTIFICATIONS_TOPIC_NAME = 'topic'
AMQP_COAP_NOTIFICATIONS_ROUTING_KEY = 'coap.response'

def create_response_dict(coap_request, coap_response):
    response_dict = {'request_uri': "", 'code': 0}
    response_dict['request_uri'] = coap_request.get_request_uri()
    response_dict['code'] = coap_response.code

    if len(coap_response.payload) > 0:
        response_dict['payload'] = base64.b64encode(coap_response.payload).decode('utf-8')

    return response_dict


async def handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response):
    # create response dict:
    response_dict = create_response_dict(coap_request, coap_response)
    message = json.dumps(response_dict)

    # create new channel:
    global amqp_protocol
    amqp_channel = await amqp_protocol.channel()

    await amqp_channel.basic_publish(
        payload=message,
        exchange_name='',
        routing_key=amqp_properties.reply_to,
        properties={
            'correlation_id': amqp_properties.correlation_id,
        },
    )

    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag)

    print(" [.] handle_coap_response() published response: {}".format(response_dict))


def incoming_observation(coap_request, coap_response):
    asyncio.async(handle_coap_notification(coap_request, coap_response))


async def handle_coap_notification(coap_request, coap_response):
    # create response dict:
    response_dict = create_response_dict(coap_request, coap_response)
    message = json.dumps(response_dict)

    # create new channel:
    global amqp_protocol
    amqp_channel = await amqp_protocol.channel()

    await amqp_channel.exchange(AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, AMQP_COAP_NOTIFICATIONS_TOPIC_NAME)

    await amqp_channel.publish(message, exchange_name=AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, routing_key=AMQP_COAP_NOTIFICATIONS_ROUTING_KEY)

    print(" [.] handle_coap_notification() published response: {}".format(response_dict))


async def send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request):
    observation_is_over = asyncio.Future()
    try:
        global coap_client_context
        requester = coap_client_context.request(coap_request)
        requester.observation.register_errback(observation_is_over.set_result)
        requester.observation.register_callback(lambda data, coap_request=coap_request: incoming_observation(coap_request, data))

        try:
            print(" [..] Sending CoAP obs request: {}".format(request_dict))
            coap_response = await requester.response
        except socket.gaierror as  e:
            print("Name resolution error:", e, file=sys.stderr)
            return
        except OSError as e:
            print("Error:", e, file=sys.stderr)
            return

        if coap_response.code.is_successful():
            print(" [..] Received CoAP response: {}".format(coap_response))
            await handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response)
        else:
            print(coap_response.code, file=sys.stderr)
            if coap_response.payload:
                print(coap_response.payload.decode('utf-8'), file=sys.stderr)
            sys.exit(1)

        exit_reason = await observation_is_over
        print("Observation is over: %r"%(exit_reason,), file=sys.stderr)

    finally:
        if not requester.response.done():
            requester.response.cancel()
        if not requester.observation.cancelled:
            requester.observation.cancel()


async def on_rpc_request(amqp_channel, amqp_body, amqp_envelope, amqp_properties):
    print(" [.] on_rpc_request(): received RPC request: {}".format(amqp_body))

    request_dict = {} # hardcoded to vdna.be for SO example
    aiocoap_code = aiocoap.GET
    aiocoap_uri = "coap://vdna.be/obs"
    aiocoap_payload = ""

    # as we are ready to send the CoAP request, ack the client already indicating we have received the RPC request
    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag)

    coap_request = aiocoap.Message(code=aiocoap_code, uri=aiocoap_uri, payload=aiocoap_payload)
    coap_request.opt.observe = 0

    task = asyncio.ensure_future(send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request))
    # we have to keep a hard ref to this task, otherwise the python garbage collector destroyes the task before it is completed. See https://bugs.python.org/issue21163
    # this is apparent from the "Task was destroyed but it is pending" exception thrown after random (lengthy) time intervals, probably the time interval is related to when the gc is triggered
    # await task # this does not seem to work, as it prevents new amqp operations from executing (e.g. amqp channels do not get created)
    # we are actually not interested in waiting for the task anyway, so instead just keep a hard ref to the task in the obs_tasks list
    obs_tasks.append(task) # TODO: when do we remove the task from the list?


async def amqp_connect():
    try:
        (transport, protocol) = await aioamqp.connect('localhost', 5672)
        print(" [x] Connected to AMQP broker")
        return (transport, protocol)
    except aioamqp.AmqpClosedConnection as ex:
        print("closed connections: {}".format(ex))
        raise ex


async def main():
    """Open AMQP connection to broker, subscribe to coap_request_rpc_queue and setup aiocoap client context """

    try:
        global amqp_protocol
        (amqp_transport, amqp_protocol) = await amqp_connect()

        channel = await amqp_protocol.channel()

        await channel.queue_declare(queue_name='coap_request_rpc_queue')
        await channel.basic_qos(prefetch_count=10, prefetch_size=0, connection_global=False)
        await channel.basic_consume(on_rpc_request, queue_name='coap_request_rpc_queue')

        print(" [x] Awaiting CoAP request RPC requests")
    except aioamqp.AmqpClosedConnection as ex:
        print("amqp_connect: closed connections: {}".format(ex))
        exit()

    global coap_client_context
    coap_client_context = await aiocoap.Context.create_client_context()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    loop.set_debug(True)

    asyncio.async(main())
    loop.run_forever()

【问题讨论】:

  • 我在创建一个后台任务以监听 asyncio.Queue 的条目时遇到了类似的(确切的?)问题,经过数小时的调试终于找到了这个错误?这里提到:bugs.python.org/issue21163 最终我的解决方案是创建对 asyncio.Queue 对象的硬引用,正如你提到的,但我对这个问题的混乱程度感到惊讶。

标签: python asynchronous python-asyncio


【解决方案1】:

当一个任务被调度时,它的_step回调被调度在循环中。该回调通过self 维护对任务的引用。我没有检查过代码,但我非常相信循环会维护对其回调的引用。但是,当任务等待某个可等待的或将来的任务时,_step 回调不会被安排。在这种情况下,任务会添加一个 done 回调,该回调保留对任务的引用,但循环不会保留对等待未来的任务的引用。

只要某些东西保留了对任务正在等待的未来的引用,一切都很好。但是,如果没有任何东西保留对未来的硬引用,那么未来可以被垃圾收集,当这种情况发生时,任务可以被垃圾收集。

因此,我会查找您的任务调用的内容,该任务正在等待的未来可能不会被引用。 一般来说,future 需要被引用,以便最终有人可以设置它的结果,所以如果你有未引用的 futures,这很可能是一个错误。

【讨论】:

  • 当我想无限睡眠时,我简单地使用了await asyncio.Future()。对我来说,这看起来不像是一个错误。
猜你喜欢
  • 2012-04-20
  • 2017-12-16
  • 2013-10-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-02-11
  • 2010-12-27
相关资源
最近更新 更多