【发布时间】:2020-04-20 21:44:57
【问题描述】:
我在 Apache Airflow 中编写了一个 DockerOperator,我想给它一个卷。到目前为止,一切都很好。这是一个例子:
t = DockerOperator(
task_id='test',
image='testimage:latest',
command='python3 /code/test.py',
volumes=["/mnt/interim:/interim"],
xcom_push=True,
dag=dag,
)
我遇到的问题如下:
挂载目录的名称需要灵活。因此,我想挂载一个名称中带有 run_id 的目录。
volumes=["/mnt/interim/" + "{{ run_id }}" + ":/interim"]
然而,Airflow 似乎无法解析卷中的“{{ run_id }}”,而只能在 DockerOperator 的命令中解析。
简而言之,我想获取 run_id 以便挂载它。
请注意,使用气流变量(气流的环境变量)不会起作用,因为如果任务并行运行,此变量可能会被覆盖。
也许你们中的某些人已经知道可以做到这一点的高级 DockerOperator (CustomOperator)。
提前致谢:)
【问题讨论】:
-
你能粘贴完整的 dag 吗?此外 {{run_id}} 只会在任务中初始化。如果您在调用任何任务之前尝试使用 run_id,它将不会初始化
-
@JyotiArora 你到底需要 DAG 什么,它由几个任务组成,例如 DockerOperator,最后所有任务都连续执行:t1 >> t2 >> t3 >> t4 >> t5 >> t6 >> t7。 DAG 变量如下所示:dag = DAG('lai', default_args=default_args, description='LAI DAG', schedule_interval=None, on_failure_callback=failure, on_success_callback=success, )
-
你试过这个:volumes=["/mnt/interim/{{ run_id }}:/interim"] 来自气流文档:```also_run_this = BashOperator(task_id='also_run_this', bash_command ='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', dag=dag, ) ``
-
@JyotiArora 我尝试了第一个,它与上面解释的基本相同,第二个对我来说没有意义,因为据我所知,bash 运算符的输出并未全局保存但仅限于 xcom 值。同样,为了获得 xcom 值,您使用括号 {{}} 没有成功。
-
apache 气流通道松弛。你可以加入并提出这个问题。在那里你会得到快速响应
标签: python docker airflow mount