【发布时间】:2015-07-24 10:31:19
【问题描述】:
设置 - 生产者在主机 A 上运行。创建了数千个协程,每个协程都尝试向主机 B 上的消费者发送消息。
import asyncio
NUM_MSGS = 15000
CONSUMER_IP = "<Host B's IP>"
CONSUMER_PORT = <Port #>
@asyncio.coroutine
def send_msg(i, loop):
conn = asyncio.open_connection(host=CONSUMER_IP, port=CONSUMER_PORT, loop=loop)
reader, writer = yield from conn
writer.write(bytes(i, 'utf-8'))
response = yield from reader.read(100)
print('{0} - {1}'.format(i, response))
writer.close()
@asyncio.coroutine
def msg_controller(loop):
conns = []
for i in range(NUM_MSGS):
conns.append(asyncio.async(send_msg(str(i), loop)))
yield from asyncio.wait(conns)
loop = asyncio.get_event_loop()
loop.run_until_complete(msg_controller(loop))
loop.close()
主机 B 上的消费者只接受一个连接并对其作出响应。
import asyncio
L_PORT = <Port #>
L_HOST = "Host B's IP"
LOAD = 0 #seconds
def handle_connection(transport, data):
print('Processed {0}'.format('.'))
transport.write(pickle.dumps('Done'))
class ConsumerProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def connection_lost(self, e):
if e:
print(e)
def data_received(self, data):
loop.call_later(LOAD, handle_connection, self.transport, data)
loop = asyncio.get_event_loop()
coro = loop.create_server(ConsumerProtocol, L_HOST, L_PORT)
server = loop.run_until_complete(coro)
print('Server running at {0} on port {1}...'.format(L_HOST, L_PORT))
try:
loop.run_forever()
except KeyboardInterrupt:
print('Server stopped')
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
现在,当 NUM_MSGS
Traceback (most recent call last):
File "/usr/lib64/python3.4/asyncio/tasks.py", line 234, in _step
result = coro.throw(exc)
File "aggressive_producer.py", line 15, in send_msg
response = yield from reader.read(100)
File "/usr/lib64/python3.4/asyncio/streams.py", line 452, in read
yield from self._wait_for_data('read')
File "/usr/lib64/python3.4/asyncio/streams.py", line 393, in _wait_for_data
yield from self._waiter
File "/usr/lib64/python3.4/asyncio/futures.py", line 386, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/lib64/python3.4/asyncio/tasks.py", line 287, in _wakeup
value = future.result()
File "/usr/lib64/python3.4/asyncio/futures.py", line 275, in result
raise self._exception
File "/usr/lib64/python3.4/asyncio/selector_events.py", line 662, in _read_ready
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
Task exception was never retrieved
(请忽略我没有处理异常的事实)
令人惊讶的是,在消费者方面,没有看到任何错误(connection_lost() 没有报告任何异常)。实际上,消费者甚至看不到在生产者端出现上述错误的连接(没有收到connection_made() cb)。然而,上述异常似乎是在等待消费者响应时发生的。
我检查了日志 (/var/log/messages),但也没有发现任何错误。
这是异步限制吗? asyncio的最大并发有这样的限制吗?
操作系统:RHEL 6.5 | python版本:3.4.3
PS - 我增加了 max open fd limit、max tcp syn backlog、可用端口范围,并启用了 tcp_tw_reuse。
【问题讨论】:
标签: python python-3.4 python-asyncio