【问题标题】:Async rabbitmq consumer with socket connect_ex带有套接字 connect_ex 的异步 rabbitmq 消费者
【发布时间】:2020-06-20 18:33:55
【问题描述】:

我想编写一个 python 程序,从 rabbitmq 服务器获取 ip 和 tcp 端口并扫描以检查端口是否打开,因为这些扫描有时会批量进行(可能 100 个端口,ip 对被添加到队列中一次)我需要异步进行扫描以及时获得所有结果,即使我将超时时间降低到 1 秒,30 个关闭的端口每次都会保持扫描 30 秒! 我尝试了 asyncio 和 aio_pika 来达到我的目标,但扫描仍在同步执行。

import asyncio
import aio_pika
import socket


async def tcp_check(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    await asyncio.sleep(1)
    result = sock.connect_ex((host,port))
    print (str(result))

async def main(loop):
    connection = await aio_pika.connect_robust("amqp://user:password@192.168.1.100/")
    async with connection:
        queue_name = "tcp_scans"
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name, auto_delete=False, durable=True)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    context = message.body.decode("utf-8").split(',')
                    await tcp_check(context[0], int(context[1]))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

更新:

我也使用了 asyncio.open_connection:

async def tcp_check(host, port):
    con = asyncio.open_connection(host, port, loop=loop)
    try:
        await asyncio.wait_for(con, timeout=1)
        print("{}:{} Connected".format(host, port))
    except asyncio.TimeoutError:
        print ("{}:{} Closeed".format(host, port))

它仍然会从每个项目中逐一测试...

【问题讨论】:

  • 请注意,如果您有一个用async def 定义的函数,例如tcp_check,您几乎肯定想在其中添加await 的东西。如果您没有等待任何内容,则意味着该函数首先不应该是异步的,或者您不小心调用了阻塞 API(例如 connect_ex)。
  • 你能给我一个提示如何解决我的问题
  • Alex 在答案和 cmets 中所说的适用:您需要一个真正异步的 tcp_check 并且您需要将协程传递给 gather 之类的东西,而不是直接在循环中等待它们(它只是按顺序运行它们)。 Google 收集和 aiohttp,有很多如何并行化 http 获取的示例,同样的逻辑也适用于您的案例。

标签: python sockets rabbitmq python-asyncio


【解决方案1】:

应避免在异步协程中调用同步长时间运行的函数。我建议使用 asyncio 替代 connect_ex,例如:

    try:
        await asyncio.open_connection(host, port)
    except Exception as e:
        print(e)

为了“即时”同时执行一些协程,您可以使用create_task,即“将协程包装到任务中并安排其执行”,正如doc 中所写。在此之后,协程将很快执行,例如在下一次awaitasync for 迭代之后,当控制流返回事件循环时。 create_task 返回 Task 对象,您可以将其添加到列表中并等待它们全部使用 asyncio.gather 和标志 return_exceptions=True 完成。 但在你的情况下,我认为将 await tcp_check() 替换为 create_task(tcp_check()) 并在 main() 末尾使用 gather 就足够了,以保证所有 coro 都已完成。

...
asyncio.create_task(tcp_check(context[0], int(context[1])))
...

【讨论】:

  • 我不明白!异步的重点是不要等待长时间运行的任务,例如检查关闭的 tcp 端口。
  • 尝试使用create_task而不是tcp_check的await,然后使用asyncio.gather等待所有任务
  • 你能解释更多吗?据我了解, asyncio.gather 需要同时运行一些任务。如何即时添加这些任务...我的意思是从队列中获取并添加任务
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-09-20
  • 1970-01-01
  • 2011-04-15
  • 1970-01-01
  • 2018-05-20
  • 1970-01-01
相关资源
最近更新 更多