【问题标题】:Asyncio detecting disconnect hangsAsyncio 检测断开连接挂起
【发布时间】:2014-11-17 03:10:01
【问题描述】:

我在 Python 3.4 中使用 Asyncio,我将尝试解释我目前正在做什么以及我(认为)是什么导致了这个问题。

一方面,我有一个带有阻塞操作的 UDP 连接框架,我从这个流中获取数据并创建我以 SSE 格式传递给客户端的 json。这一切都很好。

我遇到的问题是,如果我不做任何事情并且客户端断开连接,我将无法让它正确处理客户端断开连接,我将开始收到此错误:

WARNING [selector_events:613] socket.send() raised exception.

由于循环仍在运行,我一直在寻找彻底打破循环并触发 .close() 的方法,但我在找到的示例中遇到了问题,并且在线资源并不多。

似乎实际可行的一个示例是尝试从客户端读取一行,如果它是一个空字符串,则意味着客户端已断开连接。

    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break

但是在大​​约 10 条消息之后,发送给客户端的所有消息都停止了,我认为这是因为它挂在“data = (yield from client_reader.readline())”上,如果我关闭客户端然后它会正确关闭并且“结束连接”确实被调用了。任何想法为什么它可能会挂起?我认为我在这一点上对 Asyncio 有一个很好的处理,但这个让我感到困惑。

注意:location() 和 status() 是我从 UDP 套接字获取信息的两个调用——我已经使用相同的代码成功运行它们多个小时而没有出现问题——减去客户端断开线路。

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_writer)


def client_done(task):
    del clients[task]
    client_writer.close()
    log.info("End Connection")

log.info("New Connection")
task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    yield from postmessage(data,client_writer)
    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break
        data = yield from asyncio.wait_for(location(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(status(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

更新:如果我在“来自 client_reader 的收益”上添加超时,当它达到通常会挂起的程度时,我会收到以下错误。

2014-11-17 03:13:56,214 INFO [try:23] End Connection
2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved
future: <Task finished coro=<handle_client() done, defined at try.py:29> exception=TimeoutError()>
Traceback (most recent call last):
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = next(coro)
  File "try.py", line 35, in handle_client
    timeout=1.0))
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

这是一个示例脚本,显示了正在运行的错误 - 只需在 python 3.4.2 中运行它,经过 9 次迭代后,它就会挂起从客户端读取。

(脚本已完成,您可以运行它自己查看)

import asyncio
import logging

import json
import time

log = logging.getLogger(__name__)

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_writer)

    def client_done(task):
        del clients[task]
        client_writer.close()
        log.info("End Connection")

    log.info("New Connection")
    task.add_done_callback(client_done)


@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    postmessage(data,client_writer)
    count = 0
    while True:
        data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0))
        if not data: #client disconnected
            break

        data = yield from asyncio.wait_for(test1(),timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(test2(),timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

@asyncio.coroutine
def test1():
        data = {'result':{
                        'test1':{ }
                    }
                }
        data = json.dumps(data)
        return data

@asyncio.coroutine
def test2():
        data = {'result':{ 
                    'test2':{ }
                    }
                }
        data = json.dumps(data)
        return data 

def main():
    loop = asyncio.get_event_loop()
    f = asyncio.start_server(accept_client, host=None, port=2991)
    loop.run_until_complete(f)
    loop.run_forever()

if __name__ == '__main__':
    log = logging.getLogger("")
    formatter = logging.Formatter("%(asctime)s %(levelname)s " +
                                  "[%(module)s:%(lineno)d] %(message)s")
    # log the things
    log.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    ch.setFormatter(formatter)
    log.addHandler(ch)
    main()

另一个更新:我发现它死了,因为它从客户端的标题中读取所有行,然后在用完行时超时。我认为,我正在寻找的真正答案是当您实际上不需要从客户端接收数据(除了初始连接)时,如何检测客户端断开连接。

【问题讨论】:

  • 客户端脚本是什么样的?或者,如果您实际上不使用脚本,您如何向服务器发送消息?
  • 我其实不需要从客户端读取..只听。
  • 也许我没有问正确的问题...我的意思是,您发布的代码正在侦听端口上的连接。您使用什么来实际连接到该端口并向其发送数据?
  • 目前是浏览器或移动应用程序。但是我实际上并不需要客户端(浏览器或移动应用程序)发送数据只使用它。我从客户端读取只是为了验证连接是否仍然有效。 (正如大多数示例所示是检测客户端断开连接的方法)我已经弄清楚了它在 9 次迭代后死掉的原因。它正在读取标题 - 8 行,然后它超时,因为第 9 行没有任何内容。如果有更好的方法(我知道)来检测客户端是否仍然处于活动状态,我肯定会使用它,这对我来说感觉非常糟糕。
  • 旁注:client_writer.drain 是一个协程,所以你应该调用yield from client_waiter.drain()

标签: python python-3.x python-asyncio


【解决方案1】:

好的,我想我明白你的问题了。您从客户端读取只是为了了解客户端是否已断开连接,但是一旦客户端发送了其标头,readline() 将在客户端仍处于连接状态时无限期阻塞,这会阻止您实际执行任何工作。使用超时来避免阻塞很好,你只需要处理TimeoutError,因为当它发生时你可以假设客户端没有断开:

from concurrent.futures import TimeoutError

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    postmessage(data,client_writer)
    count = 0
    while True:
        try:
            # See if client has disconnected.
            data = (yield from asyncio.wait_for(client_reader.readline(),timeout=0.01))
            if not data: # Client disconnected
                break
        except TimeoutError:
            pass  # Client hasn't disconnected.

        data = yield from asyncio.wait_for(test1(),timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(test2(),timeout=1.0)
        yield from postmessage(data,client_writer)

请注意,我在这里将超时设置得很短,因为我们根本不想阻塞,我们只想知道连接是否已关闭。

但更好的解决方案是不显式检查连接是否已关闭,而是处理连接关闭时尝试通过套接字发送数据时遇到的任何异常:

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    postmessage(data,client_writer)
    count = 0
    while True:
        try:
            data = yield from asyncio.wait_for(test1(),timeout=1.0)
            yield from postmessage(data,client_writer)

            data = yield from asyncio.wait_for(test2(),timeout=1.0)
            yield from postmessage(data,client_writer)
        except ConnectionResetError:  # And/or whatever other exceptions you see.
            break

【讨论】:

    【解决方案2】:

    我阅读了源代码并找到了一种简单的方法:

    if client_writer.transport._conn_lost:
        print('Connection is lost')
        # break some loop
    

    【讨论】:

      【解决方案3】:

      select() 遇到了同样的问题,它在等待事件时也无法检测到死套接字。

      我使用非阻塞 read() 解决了我的问题,并考虑到来自 select() 的读取事件产生,映射到 0 收到的字节数是死连接。

      在内核模式(系统调用)中执行代码时,套接字的死亡不会产生信号,也不会产生任何异常。考虑到从零字节的 read() 产生的收益,您似乎只能使用启发式方法,应该将其视为死链接/套接字...不是我认为的优雅,而是我还没有找到更好的东西(暂时)。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-07-21
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-12-10
        • 2020-07-07
        • 2013-02-10
        相关资源
        最近更新 更多