【问题标题】:Airflow xcom return a string in list format instead of just a string value?Airflow xcom 以列表格式返回一个字符串,而不仅仅是一个字符串值?
【发布时间】:2021-08-17 17:00:58
【问题描述】:

我有一个气流操作符,它返回一个字符串值,任务名为“task1”。所以执行后我进入 xcom 并检查 return_value 和它只是一个字符串(下面的屏幕截图)。

xcom key/value screenshot

然后我有一个名为 task2 的运算符从 task1 的 xcom 值中获取输入,如下所示:

"{{ ti.xcom_pull(task_ids=['task1'],key='return_value')}}"

问题是它得到的值是一个转换为字符串的列表。

xcom 中的值:这只是一个字符串

xcom pull(jinga 模板版本)返回的值:['this is just a string']

那么有没有办法可以更新上面显示的 xcom pull(jinga 版本)来拉取值?我无法在运算符内部访问它被传递的内容,或者我可以放置一些逻辑将字符串转换为列表,然后仅获取值(但这并不理想,也不是一个选项)。

另外,我认为它的工作提到我尝试做类似的事情,但使用 Python 运算符然后使用 python 代码中的内容执行 xcom pull 并且返回值很好。所以我不确定为什么 xcom pull 使用 Jinja 模板会这样做,我该如何解决这个问题?我希望我可以做一些我不知道的事情来轻松获得我想要的输出。工作的python操作符代码如下(仅供参考...)

def python_code_task3(**context): 
value = context['ti'].xcom_pull(task_ids='task1', key='return_value') 
logging.info("Value: " + value)

这段代码只是输出我想要的值这只是一个字符串

我真的只想使用jinga模板版本并让它检索并传入字符串。不是将字符串值作为列表中的一项的列表的字符串表示形式。

【问题讨论】:

    标签: airflow


    【解决方案1】:

    您在代码 sn-ps 中提取 XCom 的两种方式略有不同:一种是 task_ids=["task_1"](一个列表 arg),另一种是 task_ids="task_1"(一个 str arg)。

    task_ids 的参数类型在使用 xcom_pull() 时很重要。 Airflow 将推断,如果您传递一个任务 ID 列表,则应该有多个任务可以从中提取XComs,并将返回一个包含所有检索到的XComs 的列表。否则,如果类型只是一个字符串,也就是单个任务 ID,则返回单个 XCom 值。这是完成此操作的link to the code

    还值得注意的是,Jinja 模板值默认呈现为字符串。但是,使用 Airflow 2.1,您可以在 DAG 级别将名为 render_template_as_native_obj 的参数设置为 True。这将在适用时将 Jinja 模板化的值呈现为本机 Python 对象(列表、字典等)。有关此概念的更多信息here

    【讨论】:

    • 就是这样。我忽略了这一点!更新为“{{ ti.xcom_pull(task_ids='task1',key='return_value')}}”,现在可以正常工作了!
    猜你喜欢
    • 2021-03-01
    • 1970-01-01
    • 2023-03-21
    • 2013-08-12
    • 1970-01-01
    • 2016-08-23
    • 2019-11-29
    • 2015-11-29
    • 1970-01-01
    相关资源
    最近更新 更多