【问题标题】:Create json from xcom airflow task instance data从 xcom 气流任务实例数据创建 json
【发布时间】:2021-10-13 03:13:22
【问题描述】:

我在气流 dag 中有两个任务。第一个任务点击 POST URL,它的结果成为第二个任务的参数。我可以点击第一个 URL 并将其发送到另一个任务。问题是,我无法从第二个创建正确的 json。


def create_dag(dag_id,
               schedule,
               default_args):
    def task1(**kwargs):
        res = requests.post('some url')
        return res

    def task2(**kwargs):
        ti = kwargs['ti']
        v1 = ti.xcom_pull(key=None, task_ids='taks1')
        for rec in v1:
            decode = rec.decode("utf-8")
            logger.error(decode)

第一个任务返回json:

{
    "data": [
        {
            "id": 36,
            "idPercent": 12.67605633802817,
            "idPerson": [
                "Washburn"
            ]
        },
        {
            "id": 37,
            "idPercent": 13.028169014084508,
            "idPerson": [
                "Nicole"
            ]
        }
    ]
}

rec.decode("utf-8") 打印出来的也是一样的。如果我在 rec.decode("utf-8") 上执行 json.loads,我就会开始出错。我想创建正确的 json,然后为数据的每个数组值发送一个 POST 请求。

我想要类似的东西:

for rec in v1['data']:
     requests.post(url, rec)

但我无法创建 json 并从中提取数据。

【问题讨论】:

  • 您是否尝试过从第一个操作员返回 JSON 转储并从第二个任务返回 JSON 加载?基本上尝试通过 XCOM 发送序列化对象。

标签: python-3.x airflow


【解决方案1】:

在 Airflow 中,xcom 作为字符串传递,在您的情况下,v1 将是一个字符串,即使它是从 xcom 对象加载的。解决此问题的一种方法是使用 ast 模块,该模块将字符串值转换为其“正确”类型。请注意,有时由于标点符号或类似的原因,它很难解释正确的类型。

import ast

v1 = ast.literal_eval(ti.xcom_pull(key=None, task_ids='taks1'))

【讨论】:

  • 请添加一些上下文来解释您的答案,以便我们学习;否则它将因缺少它而被否决并删除。
  • 它不适用于我的 json 字符串。即使 json 是正确的,它也会显示格式错误的字符串。
  • @Nikita 正如我在回答ast.literal_eval() 中提到的那样,由于某些字符,有时很难将字符串正确转换为 JSON。我的建议是检查您的字符串中是否有 /、\、:、' 或 # 等字符,并在调用 ast.literal_eval() 之前查看您是否可以 replace() 这些字符。
  • import json 然后json.loads(ti.xcom_pull(key=None, task_ids='taks1'))
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-16
  • 1970-01-01
  • 1970-01-01
  • 2019-08-16
  • 1970-01-01
  • 2021-06-12
相关资源
最近更新 更多