【问题标题】:Apache Airflow giving broken DAG error cannot import __builtin__ for speedtest.pyApache Airflow 给出损坏的 DAG 错误无法为 speedtest.py 导入 __builtin__
【发布时间】:2021-06-06 14:14:18
【问题描述】:

这是我遇到的一个奇怪的错误。在我的 Python 3.7 环境中,我使用 pip 安装了 Airflow 2、speedtest-cli 和其他一些东西,我一直在 Airflow UI 中看到这个错误弹出窗口:

Broken DAG: [/env/app/airflow/dags/my_dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 156, in <module>
    import __builtin__
ModuleNotFoundError: No module named '__builtin__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 179, in <module>
    _py3_utf8_stdout = _Py3Utf8Output(sys.stdout)
  File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 166, in __init__
    buf = FileIO(f.fileno(), 'w')
AttributeError: 'StreamLogWriter' object has no attribute 'fileno'

对于健全性检查,我确实运行了以下内容,但没有发现任何问题:

~# python airflow/dags/my_dag.py 
/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py:94 DeprecationWarning: provide_context is deprecated as of 2.0 and is no longer required

~# airflow dags list
dag_id     | filepath      | owner   | paused
===========+===============+=========+=======
my_dag | my_dag.py | rafay   | False  


~# airflow tasks list my_dag 
[2021-03-08 16:46:26,950] {dagbag.py:448} INFO - Filling up the DagBag from /env/app/airflow/dags
/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py:94 DeprecationWarning: provide_context is deprecated as of 2.0 and is no longer required
Start_backup
get_configs
get_targets
push_targets

所以没有什么不寻常的,测试每个任务也不会引起问题。在 Airflow 之外进一步独立运行 speedtest-cli 脚本也不会引发任何错误。脚本是这样的:

import speedtest

def get_upload_speed():
    """
    Calculates the upload speed of the internet in using speedtest api

    Returns:
        Returns upload speed in Mbps
    """

    try:
        s = speedtest.Speedtest()
        upload = s.upload()
    except speedtest.SpeedtestException as e:
        raise AirflowException("Failed to check network bandwidth make sure internet is available.\nException: {}".format(e))

    return round(upload / (1024**2), 2)

我什至去了speedtest.py 的确切行,正如提到的 Broken DAG 错误,第 156 行,当我放入 python 解释器时,它看起来很好并且运行良好。

try:
    import __builtin__
except ImportError:
    import builtins
    from io import TextIOWrapper, FileIO

那么,我该如何诊断呢?似乎是某种包导入问题

编辑:如果有帮助,这里是我的 my_dag.py 目录和导入结构

- airflow
  - dags
    - tasks
      - get_configs.py
      - get_taargets.py
      - push_targets.py  (speedtest is imported here)
    - my_dag.py

dag文件中任务的导入顺序如下:

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from tasks.get_configs import get_configs
from tasks.get_targets import get_targets
from tasks.push_targets import push_targets

...

【问题讨论】:

  • 这是整个错误信息吗?我对此表示怀疑——应该还有更多。看来你早早把它剪掉了。
  • @user2357112supportsMonica 添加了完整的错误,但最初声明的错误是主要错误,其他错误源自它
  • “但最初声明的那个是主要的” - 看起来可能是这样,但事实并非如此。这实际上是完全预期的行为。错误消息的那部分只是 Python 2 代码路径失败并触发了 Python 3 代码路径。之后的部分是实际上出了问题。
  • @user2357112supportsMonica 你确定这是错误:AttributeError: 'StreamLogWriter' object has no attribute 'fileno'。另外,正如我所说,python speedtest 库在同一环境中独立运行。

标签: python airflow speed-test


【解决方案1】:

Airflow StreamLogWriter(和其他与日志相关的工具)没有实现“标准”Python (I/O) 日志工具客户端所期望的 fileno 方法(由 todo comment 确认)。在 Airflow 任务中启用 faulthandler standard library 时也会出现此问题。

那么此时该怎么办呢?除了打开问题或向 Airflow 发送 PR 之外,这确实是逐案处理的。在speedtest-cli 的情况下,可能需要隔离调用fileno 的函数,并尝试“替换”它(例如分叉库,如果可以隔离和注入则更改函数,也许选择一个配置不要使用那部分代码)。

在我的特殊情况下,无法绕过代码,而 fork 是最直接的方法。

【讨论】:

  • 我最终只是放弃了 python speedtest 客户端并使用了 ookla 的 speedtest-cli,它基本上只是一个我可以从任何地方调用的二进制文件。 speedtest.net/apps/cli
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-10-16
  • 2021-07-03
  • 2022-12-11
  • 1970-01-01
相关资源
最近更新 更多