【问题标题】:Airflow won't write logs to s3Airflow 不会将日志写入 s3
【发布时间】:2018-05-07 21:41:17
【问题描述】:

我尝试了不同的方法来配置 Airflow 1.9 以将日志写入 s3,但它只是忽略了它。我发现很多人这样做后在阅读日志时遇到问题,但是我的问题是日志仍然是本地的。我可以毫无问题地阅读它们,但它们不在指定的 s3 存储桶中。

我尝试的是首先写入airflow.cfg文件

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False

然后我尝试设置环境变量

AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

但是它被忽略并且日志文件保持在本地。

我从一个容器运行气流,我根据我的情况调整了 https://github.com/puckel/docker-airflow,但它不会将日志写入 s3。我使用 aws 连接写入 dags 中的存储桶,这可以工作,但无论我是在 EC2 上还是在我的机器上本地运行,日志都保持在本地。

【问题讨论】:

    标签: amazon-s3 airflow


    【解决方案1】:

    我终于找到了答案 https://stackoverflow.com/a/48969421/3808066 这是大部分工作,然后我不得不再增加一步。我在这里复制了这个答案,并按照我的方式对其进行了一些调整:

    需要检查的一些事项:

    1. 确保您拥有log_config.py 文件,并且它位于正确的目录中:./config/log_config.py
    2. 确保您没有忘记该目录中的 __init__.py 文件。
    3. 确保您定义了s3.task 处理程序并将其格式化程序设置为airflow.task
    4. 确保将气流.task 和气流.task_runner 处理程序设置为 s3.task
    5. airflow.cfg 中设置task_log_reader = s3.task
    6. S3_LOG_FOLDER 传递给log_config。我使用一个变量来做到这一点,并在下面的log_config.py 中检索它。

    这是一个有效的 log_config.py:

    import os
    
    from airflow import configuration as conf
    
    
    LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
    LOG_FORMAT = conf.get('core', 'log_format')
    
    BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
    PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
    
    FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
    PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
    
    S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')
    
    LOGGING_CONFIG = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'airflow.task': {
                'format': LOG_FORMAT,
            },
            'airflow.processor': {
                'format': LOG_FORMAT,
            },
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'airflow.task',
                'stream': 'ext://sys.stdout'
            },
            'file.task': {
                'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
                'formatter': 'airflow.task',
                'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
                'filename_template': FILENAME_TEMPLATE,
            },
            'file.processor': {
                'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
                'formatter': 'airflow.processor',
                'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
                'filename_template': PROCESSOR_FILENAME_TEMPLATE,
            },
           's3.task': {
                'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
                'formatter': 'airflow.task',
                'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
                's3_log_folder': S3_LOG_FOLDER,
                'filename_template': FILENAME_TEMPLATE,
            },
        },
        'loggers': {
            '': {
                'handlers': ['console'],
                'level': LOG_LEVEL
            },
            'airflow': {
                'handlers': ['console'],
                'level': LOG_LEVEL,
                'propagate': False,
            },
            'airflow.processor': {
                'handlers': ['file.processor'],
                'level': LOG_LEVEL,
                'propagate': True,
            },
            'airflow.task': {
                'handlers': ['s3.task'],
                'level': LOG_LEVEL,
                'propagate': False,
            },
            'airflow.task_runner': {
                'handlers': ['s3.task'],
                'level': LOG_LEVEL,
                'propagate': True,
            },
        }
    }
    

    请注意,这种方式 S3_LOG_FOLDER 可以在 airflow.cfg 中指定或作为环境变量 AIRFLOW__CORE__S3_LOG_FOLDER 指定。

    【讨论】:

      【解决方案2】:

      导致这种行为的另一件事(Airflow 1.10):

      如果您查看airflow.utils.log.s3_task_handler.S3TaskHandler,您会注意到在某些情况下,日志静默不会写入 S3:

      1) 记录器实例已经是 close()d(不确定实际情况如何)
      2) 本地磁盘上不存在日志文件(这就是我到这一步的原因)

      您还会注意到记录器在多处理/多线程环境中运行,并且 Airflow S3TaskHandlerFileTaskHandler 对文件系统做了一些非常禁止的事情。 如果满足有关磁盘上日志文件的假设,则不会写入 S3 日志文件,并且不会记录或抛出有关此事件的任何内容。 如果您在日志记录中有明确的明确需求,这可能是一个不错的选择实现你自己的loggingHandlers(参见 python logging 文档)并禁用所有 Airflow 日志处理程序(参见 Airflow UPDATING.md)的想法。

      【讨论】:

        【解决方案3】:

        可能导致此行为的另一件事 - 可能未安装 botocore。 确保在安装气流时包含 s3 包pip install apache-airflow[s3]

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2020-05-28
          • 1970-01-01
          • 2020-01-04
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2018-02-03
          • 2015-11-06
          相关资源
          最近更新 更多