【发布时间】:2020-06-20 18:33:55
【问题描述】:
我想编写一个 python 程序,从 rabbitmq 服务器获取 ip 和 tcp 端口并扫描以检查端口是否打开,因为这些扫描有时会批量进行(可能 100 个端口,ip 对被添加到队列中一次)我需要异步进行扫描以及时获得所有结果,即使我将超时时间降低到 1 秒,30 个关闭的端口每次都会保持扫描 30 秒! 我尝试了 asyncio 和 aio_pika 来达到我的目标,但扫描仍在同步执行。
import asyncio
import aio_pika
import socket
async def tcp_check(host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
await asyncio.sleep(1)
result = sock.connect_ex((host,port))
print (str(result))
async def main(loop):
connection = await aio_pika.connect_robust("amqp://user:password@192.168.1.100/")
async with connection:
queue_name = "tcp_scans"
channel = await connection.channel()
queue = await channel.declare_queue(queue_name, auto_delete=False, durable=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
context = message.body.decode("utf-8").split(',')
await tcp_check(context[0], int(context[1]))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
更新:
我也使用了 asyncio.open_connection:
async def tcp_check(host, port):
con = asyncio.open_connection(host, port, loop=loop)
try:
await asyncio.wait_for(con, timeout=1)
print("{}:{} Connected".format(host, port))
except asyncio.TimeoutError:
print ("{}:{} Closeed".format(host, port))
它仍然会从每个项目中逐一测试...
【问题讨论】:
-
请注意,如果您有一个用
async def定义的函数,例如tcp_check,您几乎肯定想在其中添加await的东西。如果您没有等待任何内容,则意味着该函数首先不应该是异步的,或者您不小心调用了阻塞 API(例如connect_ex)。 -
你能给我一个提示如何解决我的问题
-
Alex 在答案和 cmets 中所说的适用:您需要一个真正异步的
tcp_check并且您需要将协程传递给gather之类的东西,而不是直接在循环中等待它们(它只是按顺序运行它们)。 Google 收集和 aiohttp,有很多如何并行化 http 获取的示例,同样的逻辑也适用于您的案例。
标签: python sockets rabbitmq python-asyncio