【发布时间】:2021-10-13 03:13:22
【问题描述】:
我在气流 dag 中有两个任务。第一个任务点击 POST URL,它的结果成为第二个任务的参数。我可以点击第一个 URL 并将其发送到另一个任务。问题是,我无法从第二个创建正确的 json。
def create_dag(dag_id,
schedule,
default_args):
def task1(**kwargs):
res = requests.post('some url')
return res
def task2(**kwargs):
ti = kwargs['ti']
v1 = ti.xcom_pull(key=None, task_ids='taks1')
for rec in v1:
decode = rec.decode("utf-8")
logger.error(decode)
第一个任务返回json:
{
"data": [
{
"id": 36,
"idPercent": 12.67605633802817,
"idPerson": [
"Washburn"
]
},
{
"id": 37,
"idPercent": 13.028169014084508,
"idPerson": [
"Nicole"
]
}
]
}
rec.decode("utf-8") 打印出来的也是一样的。如果我在 rec.decode("utf-8") 上执行 json.loads,我就会开始出错。我想创建正确的 json,然后为数据的每个数组值发送一个 POST 请求。
我想要类似的东西:
for rec in v1['data']:
requests.post(url, rec)
但我无法创建 json 并从中提取数据。
【问题讨论】:
-
您是否尝试过从第一个操作员返回 JSON 转储并从第二个任务返回 JSON 加载?基本上尝试通过 XCOM 发送序列化对象。
标签: python-3.x airflow