【发布时间】:2017-08-14 10:44:40
【问题描述】:
我正在 RPi 上构建一个客户端服务器应用程序。它有一个主线程,它创建一个通信线程来与 iOS 设备通信。 主线程创建一个 asyncio 事件循环和一个 sendQ 和一个 recvQ,并将它们作为 args 传递给 comms 线程中的 commsDelegate 主方法。
我遇到的问题是当 iOS 设备连接时,它需要在数据可用时立即从这个 Python 应用程序接收未经请求的数据,并且它需要能够将数据发送到 Python 应用程序。所以发送和接收需要是非阻塞的。
那里有很棒的回声服务器教程。但是在服务器对数据做一些有用的事情方面几乎没有。
谁能帮助我让 asyncio 在主线程排队后立即读取我的发送队列并转发数据?我收到了很好的工作。
主线程创建一个循环并启动通讯线程:
commsLoop = asyncio.new_event_loop()
commsMainThread = threading.Thread(target=CommsDelegate.commsDelegate, args=(commsInQ,commsOutQ,commsLoop,commsPort,), daemon=True)
commsMainThread.start()
然后模块 CommsDelegate 中的 asyncio 应该将循环作为 loop.run_forever() 服务器任务从套接字流读取和写入,并使用队列将接收消息发送回主线程。
到目前为止,这是我的代码。我发现如果我为协议生成器创建一个工厂,我可以将队列名称传递给它,这样现在消息的接收就很好了。当它们从客户端到达时,它们会排队 _nowait 并且主线程会很好地接收它们。
我只需要 asyncio 来处理来自主线程的出站消息队列,因为它们到达 sendQ,因此它可以将它们发送到连接的客户端。
#!/usr/bin/env python3.6
import asyncio
class ServerProtocol(asyncio.Protocol):
def __init__(self, loop, recvQ, sendQ):
self.loop = loop
self.recvQ = recvQ
self.sendQ = sendQ
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
self.recvQ.put_nowait(message.rstrip())
# Needs work... I think the queue.get_nowait should be a co-ro maybe?
def unknownAtTheMo():
dataToSend = sendQ.get_nowait()
print('Send: {!r}'.format(message))
self.transport.write(dataToSend)
# Needs work to close on request from client or server or exc...
def handleCloseSocket(self):
print('Close the client socket')
self.transport.close()
# async co-routine to consume the send message Q from Main Thread
async def consume(sendQ):
print("In consume coro")
while True:
outboundData = await self.sendQ.get()
print("Consumed", outboundData)
self.transport.write(outboundData.encode('ascii'))
def commsDelegate(recvQ, sendQ, loop, port):
asyncio.set_event_loop(loop)
# Connection coroutine - Create a factory to assist the protocol in receipt of the queues as args
factory = lambda: ProveItServerProtocol(loop, recvQ, sendQ)
# Each client connection will create a new protocol instance
connection = loop.run_until_complete(loop.create_server(factory, host='192.168.1.199', port=port))
# Outgoing message queue handler
consumer = asyncio.ensure_future(consume(sendQ))
# Set up connection
loop.run_until_complete(connection)
# Wait until the connection is closed
loop.run_forever()
# Wait until the queue is empty
loop.run_until_complete(queue.join())
# Cancel the consumer
consumer.cancel()
# Let the consumer terminate
loop.run_until_complete(consumer)
# Close the connection
connection.close()
# Close the loop
loop.close()
我以 json 格式发送所有数据消息,CommsDelegate 执行编码和解码,然后按原样中继它们。
更新:异步线程似乎对传入流量运行良好。服务器接收 json 并通过队列中继它 - 非阻塞。
一旦发送工作,我将在线程上拥有一个可重用的黑盒服务器。
【问题讨论】:
-
FWIW。尽管使用 CFStreams,但我的 iOS 应用程序正在发挥作用。我想我应该转移到 URLSession-Streamtask ,但现在记录得很糟糕。 Apple 文档都是关于命令 -> 响应的,而不是关于处理不请自来的消息。
-
我应该说 Python 不喜欢 get_event_loop(loop)。
-
上面的代码有效吗?如果没有,请添加回溯
-
是的,对于来自通信客户端的消息。它们被放置在 recvQ 上,我的主线程执行 recvQ.get_nowait() 导致 json 出来供主线程处理。
-
我想我需要一个: async def 'q-consumer' 方法作为@coroutine,但我还不太确定它在哪里插入。
标签: multithreading sockets queue nonblocking python-asyncio