您可以使用 Airflow 中的 SimpleHttpOperator 和 HttpSensor 来实现您所描述的内容(无需安装任何额外的软件包)。
考虑这个使用 http_default 连接到http bin 的示例。
执行 POST 请求的任务:
task_post_op = SimpleHttpOperator(
task_id='post_op',
# http_conn_id='your_conn_id',
endpoint='post',
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
response_check=lambda response: response.json()['json']['priority'] == 5,
response_filter=lambda response: 'get', # e.g lambda response: json.loads(response.text)
dag=dag,
)
通过提供response_filter,您可以操纵响应结果,这将是推送到XCom 的值。在您的情况下,您应该在下一个任务中返回要轮询的端点。
response_filter:允许您操作响应的函数
文本。例如 response_filter=lambda 响应:json.loads(response.text)。
可调用对象将响应对象作为第一个位置参数
以及可选的上下文字典中可用的任意数量的关键字参数。
:type response_filter: 一个 lambda 或定义的函数。
请注意,response_check 参数是可选的。
执行 GET 请求的任务:
使用HttpSensor 戳直到response_check 可调用的计算结果为真。
task_http_sensor_check = HttpSensor(
task_id='http_sensor_check',
# http_conn_id='your_conn_id',
endpoint=task_post_op.output,
request_params={},
response_check=lambda response: "httpbin" in response.text,
poke_interval=5,
dag=dag,
)
作为endpoint 参数,我们使用XComArg 传递从先前任务中提取的XCom 值。
使用poke_interval 定义作业应在每次尝试之间等待的时间(以秒为单位)。
记得创建一个您自己的Connection,定义基本 URL、端口等。
如果这对你有用,请告诉我!