【Question title】: Airflow 1.9 - Cannot get logs to write to s3
【Posted】: 2018-02-23 19:10:31
【Question】:

I'm running Airflow 1.9 in Kubernetes on AWS. I would like the logs to go to S3, because the Airflow containers themselves are not long-lived.

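I've read the various threads and docs describing the process, but I still can't get it to work. First, here is a test that demonstrates to me that the S3 configuration and permissions are valid. It was run on one of our worker instances.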

Writing an S3 file with Airflow:

airflow@airflow-worker-847c66d478-lbcn2:~$ id
uid=1000(airflow) gid=1000(airflow) groups=1000(airflow)
airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
airflow@airflow-worker-847c66d478-lbcn2:~$ python
Python 3.6.4 (default, Dec 21 2017, 01:37:56)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import airflow
>>> s3 = airflow.hooks.S3Hook('s3_logs')
/usr/local/lib/python3.6/site-packages/airflow/utils/helpers.py:351: DeprecationWarning: Importing S3Hook directly from <module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'> has been deprecated. Please import from '<module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
>>> s3.load_string('put this in s3 file', airflow.conf.get('core', 'remote_base_log_folder') + "/airflow-test")
[2018-02-23 18:43:58,437] {{base_hook.py:80}} INFO - Using connection to: vevo-dev-us-east-1-services-airflow

Now let's retrieve the file from S3 and look at its contents. Everything looks fine here.

root@4f8171d4fe47:/# aws s3 cp s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test .
download: s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test to ./airflow-test
root@4f8171d4fe47:/# cat airflow-test
put this in s3 fileroot@4f8171d4fe47:/stringer#
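Note the double slash in the key above (`logs//airflow-test`): the test concatenated `remote_base_log_folder`, which already ends in `/`, with `"/airflow-test"`. S3 keys are literal strings, so `logs//airflow-test` and `logs/airflow-test` are different objects. A minimal stdlib sketch of joining the parts with exactly one separator; `join_s3_path` is a hypothetical helper, not an Airflow API:

```python
def join_s3_path(base: str, *parts: str) -> str:
    """Join an s3:// base folder and relative parts with exactly one '/' between them.

    Hypothetical helper for illustration only; not part of Airflow.
    """
    pieces = [base.rstrip("/")]
    # Strip leading/trailing slashes from each relative part so that
    # "logs/" + "/airflow-test" does not turn into "logs//airflow-test".
    pieces.extend(p.strip("/") for p in parts if p.strip("/"))
    return "/".join(pieces)


base = "s3://vevo-dev-us-east-1-services-airflow/logs/"
print(base + "/airflow-test")                # naive concatenation, as in the test above
print(join_s3_path(base, "airflow-test"))    # single slash between prefix and name
```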

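So the Airflow S3 connection seems to be fine; it's just that Airflow jobs don't use S3 for logging. Below are the settings I suspect are wrong or incomplete.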

The environment variables on the running worker/scheduler/master instances are:

airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep -i s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
S3_BUCKET=vevo-dev-us-east-1-services-airflow
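Airflow reads configuration overrides from environment variables named `AIRFLOW__{SECTION}__{KEY}` (double underscores), while connections defined in the environment use `AIRFLOW_CONN_{CONN_ID}` (single underscores). Under those conventions, `AIRFLOW__CONN__S3_LOGS` above would be read as an option `s3_logs` in a `[conn]` config section rather than as a connection; the `s3_logs` connection listed below presumably comes from the metadata database instead. The sketch classifies variable names by the two patterns; `classify_airflow_env` is a simplified illustration, not Airflow's own parsing code:

```python
from typing import Dict, Tuple


def classify_airflow_env(
    environ: Dict[str, str],
) -> Tuple[Dict[Tuple[str, str], str], Dict[str, str]]:
    """Split env vars into config overrides and connection URIs.

    Sketch of the naming conventions only:
      AIRFLOW__{SECTION}__{KEY} -> config option (section, key)
      AIRFLOW_CONN_{CONN_ID}    -> connection conn_id
    """
    config: Dict[Tuple[str, str], str] = {}
    connections: Dict[str, str] = {}
    for name, value in environ.items():
        if name.startswith("AIRFLOW__"):
            # "AIRFLOW__CORE__REMOTE_LOG_CONN_ID" -> ("core", "remote_log_conn_id")
            section, sep, key = name[len("AIRFLOW__"):].partition("__")
            if sep and key:
                config[(section.lower(), key.lower())] = value
        elif name.startswith("AIRFLOW_CONN_"):
            connections[name[len("AIRFLOW_CONN_"):].lower()] = value
    return config, connections


env = {
    "AIRFLOW__CONN__S3_LOGS": "s3://vevo-dev-us-east-1-services-airflow/logs/",
    "AIRFLOW__CORE__REMOTE_LOG_CONN_ID": "s3_logs",
    "S3_BUCKET": "vevo-dev-us-east-1-services-airflow",
}
config, connections = classify_airflow_env(env)
print(config)       # contains ('conn', 's3_logs') and ('core', 'remote_log_conn_id')
print(connections)  # {}: no AIRFLOW_CONN_* variable is set
```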

This shows that the s3_logs connection exists in Airflow:

airflow@airflow-worker-847c66d478-lbcn2:~$ airflow connections -l|grep s3
│ 's3_logs'              │ 's3'                    │ 'vevo-dev-us-...vices-airflow' │ None   │ False          │ False                │ None                           │
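When a connection is given as a URI such as `s3://vevo-dev-us-east-1-services-airflow/logs/`, the bucket name is the host part of the URI, which matches the `Using connection to: vevo-dev-us-east-1-services-airflow` line in the test above; the `logs/` prefix lives in the path. A stdlib sketch of that split; `split_s3_url` is a hypothetical helper that only illustrates URI structure, not Airflow's connection parser:

```python
from urllib.parse import urlparse


def split_s3_url(url: str):
    """Return (bucket, key_prefix) for an s3:// URL. Illustrative helper only."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3:// URL: {url!r}")
    # netloc is the bucket; the path without its leading '/' is the key prefix.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, prefix = split_s3_url("s3://vevo-dev-us-east-1-services-airflow/logs/")
print(bucket)  # vevo-dev-us-east-1-services-airflow
print(prefix)  # logs/
```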

I put this file https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py in my Docker image. Here it is on one of our workers:

airflow@airflow-worker-847c66d478-lbcn2:~$ ls -al /usr/local/airflow/config/
total 32
drwxr-xr-x. 2 root    root    4096 Feb 23 00:39 .
drwxr-xr-x. 1 airflow airflow 4096 Feb 23 00:53 ..
-rw-r--r--. 1 root    root    4471 Feb 23 00:25 airflow_local_settings.py
-rw-r--r--. 1 root    root       0 Feb 16 21:35 __init__.py

We edited the file to define the REMOTE_BASE_LOG_FOLDER variable. Here is the diff between our version and the upstream version:

index 899e815..897d2fd 100644
--- a/var/tmp/file
+++ b/config/airflow_local_settings.py
@@ -35,7 +35,8 @@ PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
 # Storage bucket url for remote logging
 # s3 buckets should start with "s3://"
 # gcs buckets should start with "gs://"
-REMOTE_BASE_LOG_FOLDER = ''
+REMOTE_BASE_LOG_FOLDER = conf.get('core', 'remote_base_log_folder')
+

 DEFAULT_LOGGING_CONFIG = {
     'version': 1,

Here you can see that the setting is correct on one of our workers:

>>> import airflow
>>> airflow.conf.get('core', 'remote_base_log_folder')
's3://vevo-dev-us-east-1-services-airflow/logs/'

Given that REMOTE_BASE_LOG_FOLDER starts with 's3' and REMOTE_LOGGING is True,

>>> airflow.conf.get('core', 'remote_logging')
'True'

I would expect this block https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py#L122-L123 to evaluate to true and send the logs to S3.
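One detail to keep in mind when reading such a check: `conf.get` returns the string `'True'` shown above, not a boolean, and in Python `bool('False')` is also `True`. Airflow's config object offers `getboolean` for this. Below is a hypothetical re-creation of the kind of decision the linked lines make (use an S3 handler when remote logging is on and the folder starts with `s3://`); `to_bool` and `pick_task_handler` are made-up names, and the real upstream code may be structured differently:

```python
def to_bool(value: str) -> bool:
    """Parse a config-style boolean string (plain bool('False') would be True)."""
    normalized = value.strip().lower()
    if normalized in ("true", "t", "1", "yes", "on"):
        return True
    if normalized in ("false", "f", "0", "no", "off", ""):
        return False
    raise ValueError(f"not a boolean: {value!r}")


def pick_task_handler(remote_logging: str, remote_base_log_folder: str) -> str:
    """Hypothetical: choose a task handler name from the two config values."""
    if to_bool(remote_logging):
        if remote_base_log_folder.startswith("s3://"):
            return "s3.task"
        if remote_base_log_folder.startswith("gs://"):
            return "gcs.task"
    return "file.task"


folder = "s3://vevo-dev-us-east-1-services-airflow/logs/"
print(pick_task_handler("True", folder))   # s3.task
print(pick_task_handler("False", folder))  # file.task
```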

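Could anyone who has S3 logging working on 1.9 please point out what I'm missing? I'd like to submit a PR to the upstream project to update the docs, since this seems to be a fairly common problem and, as far as I can tell, the upstream docs are either wrong or frequently misunderstood.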

Thanks! G.

【Question comments】:

    Tags: python airflow


    【Solution 1】:

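    Yes, I also had a hard time setting this up from the docs alone; I had to read Airflow's code to figure it out. There are several things that can trip you up.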

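    Some things to check:
    1. Make sure you have a log_config.py file and that it is in the right directory: ./config/log_config.py. Also make sure you haven't forgotten the __init__.py file in that directory.
    2. Make sure you define the s3.task handler and set its formatter to airflow.task.
    3. Make sure the airflow.task and airflow.task_runner loggers use s3.task as their handler.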
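Item 1 matters because the custom dictionary is loaded by importing a dotted path. In Airflow 1.9 that path is, to my understanding, the `logging_config_class` option in the `[core]` section of `airflow.cfg`, and the `Unable to load custom logging from config.log_config.LOGGING_CONFIG ...` error quoted in the comments is what a failed import of it looks like. The stdlib sketch below shows the mechanics: the directory that contains the `config` package must be on `sys.path`. `import_string` is a simplified stand-in and the temporary directory layout is hypothetical:

```python
import importlib
import os
import sys
import tempfile


def import_string(dotted_path: str):
    """Import 'package.module.ATTR' and return ATTR (simplified stand-in)."""
    module_path, _, attr = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr)


# Hypothetical layout mirroring item 1: <home>/config/__init__.py and <home>/config/log_config.py
home = tempfile.mkdtemp()
config_dir = os.path.join(home, "config")
os.makedirs(config_dir)
open(os.path.join(config_dir, "__init__.py"), "w").close()
with open(os.path.join(config_dir, "log_config.py"), "w") as f:
    f.write("LOGGING_CONFIG = {'version': 1, 'disable_existing_loggers': False}\n")

# Without <home> on sys.path, importing 'config.log_config' fails with
# "No module named 'config'" (unless some other 'config' package is installed).
sys.path.insert(0, home)
importlib.invalidate_caches()
logging_config = import_string("config.log_config.LOGGING_CONFIG")
print(logging_config)
```

The configured string has to match how the module is reachable from `sys.path`: `config.log_config.LOGGING_CONFIG` if the parent of `config/` is on the path, or `log_config.LOGGING_CONFIG` if `config/` itself is.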

    Here is a log_config.py file that works for me:

    # -*- coding: utf-8 -*-
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import os
    
    from airflow import configuration as conf
    
    # TO DO: Logging format and level should be configured
    # in this file instead of from airflow.cfg. Currently
    # there are other log format and level configurations in
    # settings.py and cli.py. Please see AIRFLOW-1455.
    
    LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
    LOG_FORMAT = conf.get('core', 'log_format')
    
    BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
    PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
    
    FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
    PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
    
    S3_LOG_FOLDER = 's3://your_path_to_airflow_logs'  # placeholder: replace with your own bucket and prefix
    
    LOGGING_CONFIG = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'airflow.task': {
                'format': LOG_FORMAT,
            },
            'airflow.processor': {
                'format': LOG_FORMAT,
            },
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'airflow.task',
                'stream': 'ext://sys.stdout'
            },
            'file.task': {
                'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
                'formatter': 'airflow.task',
                'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
                'filename_template': FILENAME_TEMPLATE,
            },
            'file.processor': {
                'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
                'formatter': 'airflow.processor',
                'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
                'filename_template': PROCESSOR_FILENAME_TEMPLATE,
            },
            # When using s3 or gcs, provide a customized LOGGING_CONFIG
            # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
            # for details
            's3.task': {
                'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
                'formatter': 'airflow.task',
                'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
                's3_log_folder': S3_LOG_FOLDER,
                'filename_template': FILENAME_TEMPLATE,
            },
            # 'gcs.task': {
            #     'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            #     'formatter': 'airflow.task',
            #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            #     'gcs_log_folder': GCS_LOG_FOLDER,
            #     'filename_template': FILENAME_TEMPLATE,
            # },
        },
        'loggers': {
            '': {
                'handlers': ['console'],
                'level': LOG_LEVEL
            },
            'airflow': {
                'handlers': ['console'],
                'level': LOG_LEVEL,
                'propagate': False,
            },
            'airflow.processor': {
                'handlers': ['file.processor'],
                'level': LOG_LEVEL,
                'propagate': True,
            },
            'airflow.task': {
                'handlers': ['s3.task'],
                'level': LOG_LEVEL,
                'propagate': False,
            },
            'airflow.task_runner': {
                'handlers': ['s3.task'],
                'level': LOG_LEVEL,
                'propagate': True,
            },
        }
    }
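Items 2 and 3 of the checklist are name cross-references inside this dictionary: each handler's `formatter` must exist under `formatters`, and each logger's `handlers` must exist under `handlers`. A small check you could run against the dict before deploying; `check_logging_config` is a hypothetical helper that only compares names and does not import the handler classes. In practice you would pass the real `LOGGING_CONFIG`; the trimmed dict below contains a deliberate mistake:

```python
def check_logging_config(cfg: dict, required_logger_handlers: dict) -> list:
    """Return a list of problems in a dictConfig-style dict (empty list means OK)."""
    problems = []
    formatters = cfg.get("formatters", {})
    handlers = cfg.get("handlers", {})
    loggers = cfg.get("loggers", {})
    # Every handler's formatter must be defined.
    for name, handler in handlers.items():
        fmt = handler.get("formatter")
        if fmt is not None and fmt not in formatters:
            problems.append(f"handler {name!r} uses undefined formatter {fmt!r}")
    # Every handler a logger refers to must be defined.
    for name, logger in loggers.items():
        for h in logger.get("handlers", []):
            if h not in handlers:
                problems.append(f"logger {name!r} uses undefined handler {h!r}")
    # The loggers we care about must actually use the expected handler.
    for logger_name, handler_name in required_logger_handlers.items():
        if handler_name not in loggers.get(logger_name, {}).get("handlers", []):
            problems.append(f"logger {logger_name!r} does not use {handler_name!r}")
    return problems


cfg = {
    "version": 1,
    "formatters": {"airflow.task": {"format": "%(message)s"}},
    "handlers": {
        "s3.task": {
            "class": "airflow.utils.log.s3_task_handler.S3TaskHandler",
            "formatter": "airflow.task",
        },
    },
    "loggers": {
        "airflow.task": {"handlers": ["s3.task"]},
        "airflow.task_runner": {"handlers": ["file.task"]},  # deliberate mistake
    },
}
required = {"airflow.task": "s3.task", "airflow.task_runner": "s3.task"}
for problem in check_logging_config(cfg, required):
    print(problem)
```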
    

    【Comments】:

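    • Could you please include your definition of the s3.task handler? I'm not sure what it is or where it should go.
    • Hi, where should ./config/ go? I put it in $AIRFLOW_HOME and it still says ImportError: Unable to load custom logging from config.log_config.LOGGING_CONFIG due to No module named 'config'. I even tried adding $AIRFLOW_HOME to $PYTHONPATH.
    【Solution 2】: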

    When deploying to Kubernetes with the official Helm chart, I also had to add the remote logging configuration to the worker pods. So this alone is not enough:

      AIRFLOW__CORE__REMOTE_LOGGING: True
      AIRFLOW__CORE__REMOTE_LOG_CONN_ID: s3_logs
      AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: 's3://my-log-bucket/logs'
    

    I also had to pass these variables to the workers:

      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOGGING: True
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOG_CONN_ID: s3_logs
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: 's3://my-log-bucket/logs'
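The worker variables are the same settings with the `AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__` prefix added. That prefix targets the `[kubernetes_environment_variables]` config section, whose entries, as I understand the KubernetesExecutor, are passed as environment variables to the worker pods it launches. A small sketch that derives one set from the other so the two lists cannot drift apart; `worker_env` is a hypothetical helper, and the values are written as strings because environment variables are always strings:

```python
PREFIX = "AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__"


def worker_env(base_env: dict) -> dict:
    """Mirror each setting into the [kubernetes_environment_variables] section."""
    return {PREFIX + name: value for name, value in base_env.items()}


base = {
    "AIRFLOW__CORE__REMOTE_LOGGING": "True",
    "AIRFLOW__CORE__REMOTE_LOG_CONN_ID": "s3_logs",
    "AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER": "s3://my-log-bucket/logs",
}
merged = {**base, **worker_env(base)}
for name, value in sorted(merged.items()):
    print(f"{name}: {value!r}")
```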
    

    【Comments】:
