【问题标题】:Airflow DockerOperator cannot find some images but can find othersAirflow DockerOperator 找不到某些图像,但可以找到其他图像
【发布时间】:2020-08-11 17:48:55
【问题描述】:

尝试在 Airflow 中使用 Docker 运算符时出现以下错误。我看不到气流设置(它由另一个团队在我无法访问的机器上运行,并且负责的团队没有响应)。我从自己编写的 docker 文件创建了 docker 映像。名称 cmprod 指的是 docker 映像。

ImageNotFound: 404 Client Error: Not Found ("pull access denied for cmprod, repository does not exist or may require 'docker login': denied: requested access to the resource is denied")

我不熟悉 docker login 的使用,我不确定它是否适用于这种情况,因为我能够运行一些图像而不是其他图像。 起初我虽然输入了错误的 docker 图像的名称,但我检查并仔细检查了。下面是docker images 的输出。我能够通过气流成功运行图像 condatest。

REPOSITORY               TAG                 IMAGE ID            CREATED             SIZE
cm_prod                  latest              08f408557eb7        15 hours ago        2.12GB
cmprod                   latest              08f408557eb7        15 hours ago        2.12GB
<none>                   <none>              4af8c991ea19        15 hours ago        730MB
<none>                   <none>              9da4759a3316        15 hours ago        64.2MB
condatest                latest              e24563f9bb48        5 days ago          2.12GB

我认为我可能错误地使用了 docker 操作符,但我能够运行其他一些图像。我认为可能存在气流配置问题,其中不允许某些操作系统或不允许以某些权限运行,但我无法找到任何有关这是否可行的文档。

我的测试没有显示上述任何因素来确定 100% 是否可以使用 docker 操作符通过气流找到 docker 图像。这个问题似乎不适合反复试验。任何关于可能发生的事情的建议将不胜感激。

我能够在我的浏览器中看到气流 UI 并触发 dags,并且有一个共享目录,我可以在其中转储我的 dag 规范脚本。气流版本:1.10.3。

docker的版本信息如下docker version:

Client: Docker Engine - Community
 Version:           19.03.6
 API version:       1.40
 Go version:        go1.12.16
 Git commit:        369ce74a3c
 Built:             Thu Feb 13 01:29:29 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.6
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.12.16
  Git commit:       369ce74a3c
  Built:            Thu Feb 13 01:28:07 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.2.10
  GitCommit:        b34a5c8af56e510852c35414db4c1f4fa6172339
 runc:
  Version:          1.0.0-rc8+dev
  GitCommit:        3e425f80a8c931f88e6d94a8c831b9d5aa481657
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

编辑: 已请求气流 DAG 代码。我不愿发布整个内容,因为我从一个离开的团队成员那里继承了一些代码,我觉得 dag 中的一些代码最好作为单独的脚本实现。以下是最相关的代码块。让我知道是否缺少任何东西。为了清楚起见,我省略了这些块之间的一部分,但如果似乎没有任何效果,可以包括在内。

代码块 1:导入依赖项

from functools import reduce
import os, os.path
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.mssql_operator import MsSqlOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.helpers import chain

代码块 2:DAG 和 OPERATOR 实例化

# create SQL operators
def create_SQL_operator(taskfile, dag):
    """
    Creates a MsSQL operator for a given DAG.
    """
    op = MsSqlOperator(
        task_id=taskfile,
        sql=readSQL(os.path.join(ProjDir, taskfile)),
        mssql_conn_id='clarity',
        autocommit=True,
        database='clarity',
        dag=dag
        )
    return op

# Airflow arguments
default_args = {
    'owner': 'airflow',
    'description': 'Parallel SQL DAG',
    'depend_on_past': False,
    'start_date': datetime(2020, 1, 1),
    'email': ['*PERSONTOEMAIL*'],
    'email_on_failure': False,
    'email_on_retry': True
}

# DAG definition
DAG = DAG(ProjName + '_and_infer',
          description='Running parallel SQLs for project: {} and inference on the data'.format(ProjName),
          default_args=default_args,
          schedule_interval=CronTime,   # '0 */2 * * *',  #every 2 hours
          concurrency=50,               # setup to allow 50 concurrent parallel tasks
          catchup=False)
t_predict = DockerOperator(
        task_id='dockerPredict',
        image='cmprod',
        api_version='auto',
        auto_remove=True,
        volumes=['*ABSOLUTEPATHTOMOUNT*:/ds-cm'],
        command='bash inference.sh ',
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge',
        dag=DAG)

# Create SQL task operators in Airflow global space
ops = []
ops = [(order, create_SQL_operator(taskfile, DAG)) for order, taskfile in sql_rank]
ops.sort(key=lambda tup: tup[0])

# create cluster ops list
from itertools import groupby
from operator import itemgetter
opsList = []
opsList = [[j for i, j in grouper] for order, grouper in groupby(ops, key=itemgetter(0))]

# flatten list with only 1 element: Airflow chain() cannot accept list of lists!!
chainList = []
chainList = [reduce(plus, list) if len(list) == 1 else list for list in opsList]
chainList.append(t_predict)

# create final DAG graph
exec(r' >> '.join([r'chainList['+str(i)+r']' for i in range(len(chainList))]))

更新 由于我最初发布了这个问题,所以我将 condatest 图像替换为上面的代码并设法以不同的方式出错:挂载目录中缺少 shell 脚本。

当我复制丢失的文件并再次运行时,airflow 无法再找到 condatest 图像。查了一下,新复制的脚本没有执行权限,添加了权限。 Airflow 仍然找不到以前工作的 docker 容器。

我删除了 shell 脚本,airflow 可以再次找到容器。这是否意味着问题与 Linux 权限有关?我不清楚安装驱动器的内容如何影响气流检测容器的能力。此外,我知道过去我能够使用由气流中的 dockerobject 启动的 docker 容器运行相同的脚本。

【问题讨论】:

  • 你能粘贴你的 DAG 代码吗?我需要检查代码以进一步调试。谢谢。
  • @hopeIsTheonlyWeapon 我添加了大部分代码。还有其他一些与管理 SQL 代码有关的工作,我遗漏了这些工作以尝试使 dag 代码更清晰。如果您希望我发布整个内容,请告诉我。
  • 你能从盒子里运行 docker 容器吗(气流运行的实例成为运行气流的用户?
  • 不幸的是,我没有该机器的登录凭据。我能做的最好的事情就是运行一个 dag,它被剥离为 Docker 操作员。我能够让更新中讨论的 condatest 图像以这种方式工作。你有什么建议我可以传递给拥有那台机器的团队吗?我将不得不等待几天让他们解决这个问题。
  • 进一步调查气流正在我正在开发的同一台机器上运行,我只是没有权限从命令行运行气流或访问存储文件的目录。所以我可以很好地运行容器。 Airflow 也以 root 身份执行。

标签: image docker dockerfile operators airflow


【解决方案1】:

将气流升级到气流2 后,日志提供了一些附加信息。 Airflow 已配置为在多台服务器上运行,并且已在每台服务器上设置了 docker,但未使用镜像注册表。似乎当作业调度程序尝试在我构建 docker 映像的服务器以外的服务器上执行 dag 时,该映像不可用。看来我之前找到的解决方法恰逢我的工作安排在哪个服务器上的幸运抽奖。

为解决此问题,我们已将调度程序配置为仅使用一台服务器。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-08-17
    • 1970-01-01
    • 2012-07-28
    • 1970-01-01
    • 2020-06-29
    • 2010-09-20
    • 1970-01-01
    • 2014-08-19
    相关资源
    最近更新 更多