【问题标题】:Airflow - How to handle Asynchronous API calls?Airflow - 如何处理异步 API 调用?
【发布时间】:2021-08-28 23:26:25
【问题描述】:

我正在尝试找出解决以下问题的最佳方法。本质上,我有一个外部 API 服务,我正在向它发送请求并获取结果。

POST = 发送请求,您得到的响应是一个 URL,您可以将其用于 GET 请求以检索您的结果。

GET = 从 POST 请求中轮询返回的 URL,直到获得成功的结果。

在气流中解决此问题的最佳方法是什么?我的想法是基本上让 2 个任务并行运行。

  1. 发送 POST 请求,然后将响应 URL 保存到 XCOM。
  2. 另一个将在一个while循环中持续运行,从 XCOM 存储中读取新的 URL 响应并获得响应。一旦从该 URL 成功检索到结果,它就会从 XCOM 商店中删除。

你认为这是正确的做法吗?或者我应该在 python 中使用 asyncio 库吗?

非常感谢任何帮助

谢谢,

【问题讨论】:

    标签: airflow


    【解决方案1】:

    您可以使用 Airflow 中的 SimpleHttpOperatorHttpSensor 来实现您所描述的内容(无需安装任何额外的软件包)。

    考虑这个使用 http_default 连接到http bin 的示例。

    执行 POST 请求的任务:

    task_post_op = SimpleHttpOperator(
        task_id='post_op',
        # http_conn_id='your_conn_id',
        endpoint='post',
        data=json.dumps({"priority": 5}),
        headers={"Content-Type": "application/json"},
        response_check=lambda response: response.json()['json']['priority'] == 5,
        response_filter=lambda response: 'get', # e.g  lambda response: json.loads(response.text)
        dag=dag, 
    )
    
    

    通过提供response_filter,您可以操纵响应结果,这将是推送到XCom 的值。在您的情况下,您应该在下一个任务中返回要轮询的端点。

    response_filter:允许您操作响应的函数 文本。例如 response_filter=lambda 响应:json.loads(response.text)。 可调用对象将响应对象作为第一个位置参数 以及可选的上下文字典中可用的任意数量的关键字参数。 :type response_filter: 一个 lambda 或定义的函数。

    请注意,response_check 参数是可选的。

    执行 GET 请求的任务:

    使用HttpSensor 戳直到response_check 可调用的计算结果为真。

    task_http_sensor_check = HttpSensor(
        task_id='http_sensor_check',
        # http_conn_id='your_conn_id',
        endpoint=task_post_op.output, 
        request_params={},
        response_check=lambda response: "httpbin" in response.text,
        poke_interval=5,
        dag=dag,
    )
    

    作为endpoint 参数,我们使用XComArg 传递从先前任务中提取的XCom 值。 使用poke_interval 定义作业应在每次尝试之间等待的时间(以秒为单位)。

    记得创建一个您自己的Connection,定义基本 URL、端口等。

    如果这对你有用,请告诉我!

    【讨论】:

    • 嗨 NicoE。非常感谢您的回复。该解决方案能否同时处理多个 POST/GET 请求?即:如果我需要同时发送 10 个 HTTPBin 请求并不断“戳”每个 HttpSensor,我将如何处理这些运算符?此外,SimpleHttpOperator 是否可以与 HTTPS API 调用一起使用?再次感谢,
    • @adan11 为了并行处理多个此类请求周期,只需遍历列表并动态创建所需数量的任务。在TaskGroup 中应用该模式/方法,以保持 UI 整洁。查看this answer 了解如何实现它的详细信息。
    • @adan11 SimpleHttpOperator 在底层使用 Python requests 模块,因此 HTTPS 没有问题。试一试,如果它适合你,请记得将答案标记为accepted
    • 感谢 NicoE。最终目标是让 100,000 + API 调用异步运行。你认为如果我为每个 API 调用使用一个任务动态创建它,它会变得有点混乱吗?我正在考虑采用一种在一个任务中使用请求库的方法,循环遍历我的 100,000 个 API 调用并将端点保存到列表中。然后在同一个任务中,一旦 POST API 调用完成,遍历列表并从每个端点 GET 请求中获取响应,如果成功找到则从列表中删除。你认为这种方法也行得通吗? @NicoE
    • 虽然我不确定是否将保存的响应保存在同一任务的列表中,或者通过 Xcom 将它们发送到另一个任务。我将它放在同一个任务中的理由是保持 Tasks 的原子性。
    猜你喜欢
    • 2018-08-05
    • 1970-01-01
    • 2021-06-18
    • 1970-01-01
    • 1970-01-01
    • 2011-09-18
    • 1970-01-01
    • 1970-01-01
    • 2018-12-09
    相关资源
    最近更新 更多