【问题标题】:Python 3.4 Comms Stream Delegate - Non-blocking recv and send - data out from asyncioPython 3.4 Comms Stream Delegate - 非阻塞接收和发送 - 从异步输出数据
【发布时间】:2017-08-14 10:44:40
【问题描述】:

我正在 RPi 上构建一个客户端服务器应用程序。它有一个主线程,它创建一个通信线程来与 iOS 设备通信。 主线程创建一个 asyncio 事件循环和一个 sendQ 和一个 recvQ,并将它们作为 args 传递给 comms 线程中的 commsDelegate 主方法。

我遇到的问题是当 iOS 设备连接时,它需要在数据可用时立即从这个 Python 应用程序接收未经请求的数据,并且它需要能够将数据发送到 Python 应用程序。所以发送和接收需要是非阻塞的。

那里有很棒的回声服务器教程。但是在服务器对数据做一些有用的事情方面几乎没有。

谁能帮助我让 asyncio 在主线程排队后立即读取我的发送队列并转发数据?我收到了很好的工作。

主线程创建一个循环并启动通讯线程:

  commsLoop = asyncio.new_event_loop()
  commsMainThread = threading.Thread(target=CommsDelegate.commsDelegate, args=(commsInQ,commsOutQ,commsLoop,commsPort,), daemon=True)
  commsMainThread.start()

然后模块 CommsDelegate 中的 asyncio 应该将循环作为 loop.run_forever() 服务器任务从套接字流读取和写入,并使用队列将接收消息发送回主线程。

到目前为止,这是我的代码。我发现如果我为协议生成器创建一个工厂,我可以将队列名称传递给它,这样现在消息的接收就很好了。当它们从客户端到达时,它们会排队 _nowait 并且主线程会很好地接收它们。

我只需要 asyncio 来处理来自主线程的出站消息队列,因为它们到达 sendQ,因此它可以将它们发送到连接的客户端。

#!/usr/bin/env python3.6
import asyncio

class ServerProtocol(asyncio.Protocol):

  def __init__(self, loop, recvQ, sendQ):
    self.loop = loop
    self.recvQ = recvQ
    self.sendQ = sendQ

  def connection_made(self, transport):
    peername = transport.get_extra_info('peername')
    print('Connection from {}'.format(peername))
    self.transport = transport

  def data_received(self, data):
    message = data.decode()
    print('Data received: {!r}'.format(message))
    self.recvQ.put_nowait(message.rstrip())

  # Needs work... I think the queue.get_nowait should be a co-ro maybe? 
  def unknownAtTheMo():
    dataToSend = sendQ.get_nowait()
    print('Send: {!r}'.format(message))
    self.transport.write(dataToSend)

  # Needs work to close on request from client or server or exc...
  def handleCloseSocket(self):
    print('Close the client socket')
    self.transport.close()

# async co-routine to consume the send message Q from Main Thread
async def consume(sendQ):
  print("In consume coro")
  while True:
    outboundData = await self.sendQ.get()
    print("Consumed", outboundData)
    self.transport.write(outboundData.encode('ascii'))

def commsDelegate(recvQ, sendQ, loop, port):
  asyncio.set_event_loop(loop)

  # Connection coroutine - Create a factory to assist the protocol in receipt of the queues as args
  factory = lambda: ProveItServerProtocol(loop, recvQ, sendQ)
  # Each client connection will create a new protocol instance
  connection = loop.run_until_complete(loop.create_server(factory, host='192.168.1.199', port=port))

  # Outgoing message queue handler
  consumer = asyncio.ensure_future(consume(sendQ))

  # Set up connection
  loop.run_until_complete(connection)

  # Wait until the connection is closed
  loop.run_forever()

  # Wait until the queue is empty
  loop.run_until_complete(queue.join())

  # Cancel the consumer
  consumer.cancel()

  # Let the consumer terminate
  loop.run_until_complete(consumer)

  # Close the connection
  connection.close()

  # Close the loop
  loop.close()

我以 json 格式发送所有数据消息,CommsDelegate 执行编码和解码,然后按原样中继它们。

更新:异步线程似乎对传入流量运行良好。服务器接收 json 并通过队列中继它 - 非阻塞。

一旦发送工作,我将在线程上拥有一个可重用的黑盒服务器。

【问题讨论】:

  • FWIW。尽管使用 CFStreams,但我的 iOS 应用程序正在发挥作用。我想我应该转移到 URLSession-Streamtask ,但现在记录得很糟糕。 Apple 文档都是关于命令 -> 响应的,而不是关于处理不请自来的消息。
  • 我应该说 Python 不喜欢 get_event_loop(loop)。
  • 上面的代码有效吗?如果没有,请添加回溯
  • 是的,对于来自通信客户端的消息。它们被放置在 recvQ 上,我的主线程执行 recvQ.get_nowait() 导致 json 出来供主线程处理。
  • 我想我需要一个: async def 'q-consumer' 方法作为@coroutine,但我还不太确定它在哪里插入。

标签: multithreading sockets queue nonblocking python-asyncio


【解决方案1】:

我可以看到您的方法存在两个问题。首先,您所有的客户端都使用相同的recvsend 队列,所以consume 协程无法知道该回复谁。

第二个问题与您使用队列作为同步和异步世界之间的桥梁有关。请参阅这部分代码:

await self.sendQ.get()

如果sendQ 是常规队列(来自queue 模块),则此行将失败,因为sendQ 不是协程。另一方面,如果sendQasyncio.Queue,则主线程将无法使用sendQ.put,因为它是协程。可以使用put_nowait,但在 asyncio 中不能保证线程安全。相反,您必须使用 loop.call_soon_threadsafe:

loop.call_soon_threadsafe(sendQ.put_nowait, message)

一般来说,请记住 asyncio 旨在作为主应用程序运行。它应该在主线程中运行,并通过ThreadPoolExecutor 与同步代码进行通信(参见loop.run_in_executor)。

asyncio documentation 中有关多线程的更多信息。您可能还想看看asyncio stream API,它提供了一个更好的接口来使用 TCP。

【讨论】:

  • 感谢文森特。澄清了我的一些误解。
  • 非常感谢您的协助。我是 python 新手,这真的消除了我的误解。进入队列的数据是我最不担心的 - 我可以标记数据 - ;-)。我发现令人惊讶的是,有多少代码示例被过度简化以至于使它们变得毫无意义。例如。 Echo 服务器 - 真的吗?有趣的是,顺便说一句,如果我在 recvQ.put() 之后对其进行编码,sendQ.get() 确实可以工作,但这意味着它仅在数据到达入站时处理出站数据。我会尝试 Stream 方法——我必须在 iOS 端这样做,所以它应该可以正常工作。谢谢...继续
  • 我接受了 Vincent 的建议,并将 while shebang 转移到了 Asyncio 协程领域。不再需要队列或线程或线程安全考虑。我宁愿拥有一个可以轻松反复部署的黑盒服务器,但我确实有可以复制的样板代码。一些需要注意的粗糙边缘,但很容易分类。干杯
猜你喜欢
  • 2012-08-24
  • 1970-01-01
  • 2021-12-30
  • 1970-01-01
  • 2020-06-20
  • 1970-01-01
  • 2013-03-07
  • 1970-01-01
  • 2013-07-19
相关资源
最近更新 更多