【问题标题】:Airflow KubernetesPodOperator times out on local MicroK8sAirflow KubernetesPodOperator 在本地 MicroK8 上超时
【发布时间】:2021-06-15 01:41:50
【问题描述】:

我正在尝试使用 KubernetesPodOperator 启动一个测试 Pod。作为镜像,我使用了 Docker 中的 hello-world 示例,我将其推送到了我的 MicroK8s 安装的本地注册表。

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.pod import Port
from airflow.utils.dates import days_ago
from datetime import timedelta

ports = [Port('http', 80)]

default_args = {
    'owner': 'user',
    'start_date': days_ago(5),
    'email': ['user@mail'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

workflow = DAG(
    'kubernetes_helloworld',
    default_args=default_args,
    description='Our first DAG',
    schedule_interval=None,
)

op = DummyOperator(task_id='dummy', dag=workflow)

t1 = KubernetesPodOperator(
    dag=workflow,
    namespace='default',
    image='localhost:32000/hello-world:registry',
    name='pod2',
    task_id='pod2',
    is_delete_operator_pod=True,
    hostnetwork=False,
    get_logs=True,
    do_xcom_push=False,
    in_cluster=False,
    ports=ports,
    )

op >> t1

当我触发 DAG 时,它会继续运行并无限期地重新尝试启动 pod。 这是我在 Airflow 中得到的日志输出:

Reading local file: /home/user/airflow/logs/kubernetes_helloworld/pod2/2021-03-17T16:25:11.142695+00:00/4.log
[2021-03-17 16:30:00,315] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [queued]>
[2021-03-17 16:30:00,319] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [queued]>
[2021-03-17 16:30:00,319] {taskinstance.py:1042} INFO - 
--------------------------------------------------------------------------------
[2021-03-17 16:30:00,320] {taskinstance.py:1043} INFO - Starting attempt 4 of 1
[2021-03-17 16:30:00,320] {taskinstance.py:1044} INFO - 
--------------------------------------------------------------------------------
[2021-03-17 16:30:00,330] {taskinstance.py:1063} INFO - Executing <Task(KubernetesPodOperator): pod2> on 2021-03-17T16:25:11.142695+00:00
[2021-03-17 16:30:00,332] {standard_task_runner.py:52} INFO - Started process 9021 to run task
[2021-03-17 16:30:00,335] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'kubernetes_helloworld', 'pod2', '2021-03-17T16:25:11.142695+00:00', '--job-id', '57', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/kubernetes_helloworld.py', '--cfg-path', '/tmp/tmp5ss4g6q4', '--error-file', '/tmp/tmp9t3l8emt']
[2021-03-17 16:30:00,336] {standard_task_runner.py:77} INFO - Job 57: Subtask pod2
[2021-03-17 16:30:00,357] {logging_mixin.py:104} INFO - Running <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [running]> on host 05nclorenzvm01.internal.cloudapp.net
[2021-03-17 16:30:00,369] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=user
AIRFLOW_CTX_DAG_OWNER=user
AIRFLOW_CTX_DAG_ID=kubernetes_helloworld
AIRFLOW_CTX_TASK_ID=pod2
AIRFLOW_CTX_EXECUTION_DATE=2021-03-17T16:25:11.142695+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-03-17T16:25:11.142695+00:00
[2021-03-17 16:32:09,805] {connectionpool.py:751} WARNING - Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f812fc23eb0>: Failed to establish a new connection: [Errno 110] Connection timed out')': /api/v1/namespaces/default/pods?labelSelector=dag_id%3Dkubernetes_helloworld%2Cexecution_date%3D2021-03-17T162511.1426950000-e549b02ea%2Ctask_id%3Dpod2

当我在没有 Airflow 的情况下在 kubernetes 中启动 pod 时,它运行良好。 我做错了什么?

我尝试了以下方法:

  • 使用睡眠命令防止容器退出
  • 尝试不同的图像,例如 pyspark
  • 重新安装 Airflow 和 MicroK8s

气流 v2.0.1 MicroK8s v1.3.7 蟒蛇 3.8 Ubuntu 18.04 LTS

【问题讨论】:

  • 你激活 microk8s dns 插件了吗?
  • 哪个气流版本?你用天文+气流吗?
  • 我没有启用 dns 插件...有必要吗?我也没有使用天文学家。所有版本都在我的帖子末尾。
  • dns 部署 CoreDNS 为 Kubernetes 提供地址解析服务。其他插件通常需要此服务,因此建议您启用它。 microk8s.io/docs/addon-dns
  • @Lorenz 你能把你用来启动本地 Airflow 的 docker-compose 文件分享给 KubernetesExecutor 吗?

标签: python docker kubernetes airflow microk8s


【解决方案1】:

很遗憾,我还没有弄清楚 microK8s 的问题。

但是我可以在 Airflow 中使用 KubernetesPodOperator 和 minikube。 以下代码能够毫无问题地运行:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import configuration as conf
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'user',
    'start_date': days_ago(5),
    'email': ['user@airflow.de'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

namespace = conf.get('kubernetes', 'NAMESPACE')

if namespace =='default':
    config_file = '/home/user/.kube/config'
    in_cluster=False
else:
    in_cluster=True
    config_file=None

dag = DAG('example_kubernetes_pod',
          schedule_interval='@once',
          default_args=default_args)

with dag:
    k = KubernetesPodOperator(
        namespace=namespace,
        image="hello-world",
        labels={"foo": "bar"},
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=in_cluster, # if set to true, will look in the cluster, if false, looks for file
        cluster_context='minikube', # is ignored when in_cluster is set to True
        config_file=config_file,
        is_delete_operator_pod=True,
        get_logs=True)

【讨论】:

    【解决方案2】:

    为了回答您的问题,我想您是在没有 VM 的本地 microk8s 集群上运行该任务。

    气流可能无法连接到 K8s 控制平面来触发吊舱。 添加cluster_context="microk8s"

    t1 = KubernetesPodOperator(
             dag=workflow,
             namespace='default',
             image='localhost:32000/hello-world:registry',
             name='pod2',
             task_id='pod2',
             is_delete_operator_pod=True,
             get_logs=True,
             do_xcom_push=False,
             in_cluster=False,
             cluster_context='microk8s',
             config_file='/path/to/config',
             ports=ports,
         )
    

    要查看使用的集群上下文,请键入以下命令并将输出重定向到配置文件(在 Airflow 项目中):

    microk8s.kubectl config view --flatten > config
    

    输出:

    apiVersion: v1
    clusters:
    - cluster:
        certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0.............
        server: https://127.0.0.1:16443
      name: microk8s-cluster
    contexts:
    - context:
        cluster: microk8s-cluster
        user: admin
      name: microk8s
    current-context: microk8s
    kind: Config
    preferences: {}
    users:
    - name: admin
      user:
         token: SldHNFQ3ek9yUGh4TVhWN......................................
    

    【讨论】:

    • 感谢您的回答阿迪尔。我只是尝试添加了 cluster_context 属性,但不幸的是它没有任何区别。我仍然收到“无法建立新连接:[Errno 110] 连接超时”异常。很明显,Airflow 无法联系到 microK8s。你有其他想法来解决这个问题吗?
    • 我遇到了与这篇文章相同的错误:stackoverflow.com/questions/55742540/… 不幸的是,我不知道如何在 Airflow 的上下文中应用此修复程序。你有什么想法吗?
    • 只要我不知道您的气流安装情况,我就不容易知道您的问题。我强烈建议您通过天文学家安装气流astronomer.io/docs/cloud/stable/develop/cli-quickstart
    • 我尝试了与 astonomer 在这篇文章中完全相同的示例:astronomer.io/docs/cloud/stable/develop/kubepodoperator-local 在您的答案中也引用了 config_file,但它没有任何区别。我可能会尝试使用 Astronomer 或在 Docker 中重新安装 Airflow。
    • 我不明白你的问题:(简而言之,天文学家你不需要安装气流,只需为你的项目创建一个目录并运行astro dev init命令生成项目结构然后在include/kube/config 中创建配置文件,最后执行命令astro dev start,其余的由astro 负责。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-02-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-14
    • 2023-03-17
    相关资源
    最近更新 更多