【发布时间】:2020-12-28 10:45:54
【问题描述】:
我对来自Airflow 的KubernetesPodOperator 感到困惑,我想知道如何传递load_users_into_table() 函数,它有一个 conn_id 参数存储在connection 的Airflow 中吊舱?
在官方文档中建议将conn_id 放在Secret 中,但我不明白之后如何在我的函数load_users_into_table() 中传递它。
https://airflow.apache.org/docs/stable/kubernetes.html
要在 pod 中执行的函数(任务):
def load_users_into_table(postgres_hook, schema, path):
gdf = read_csv(path)
gdf.to_sql('users', con=postgres_hook.get_sqlalchemy_engine(), schema=schema)
dag:
_pg_hook = PostgresHook(postgres_conn_id = _conn_id)
with dag:
test = KubernetesPodOperator(
namespace=namespace,
image=image_name,
cmds=["python", "-c"],
arguments=[load_users_into_table],
labels={"dag-id": dag.dag_id},
name="airflow-test-pod",
task_id="task-1",
is_delete_operator_pod=True,
in_cluster=in_cluster,
get_logs=True,
config_file=config_file,
executor_config={
"KubernetesExecutor": {"request_memory": "512Mi",
"limit_memory": "1024Mi",
"request_cpu": "1",
"limit_cpu": "2"}
}
)
【问题讨论】:
-
KubernetesPodOperator 将运行内部镜像。我认为你有两个选择。 1. 实现
load_users_into_table并构建 docker 镜像,然后使用 KubernetesPodOperator 运行它 2. 使用 PythonOperator 运行代码
标签: kubernetes kubernetes-pod airflow