[Title]: Removing Airflow task logs
[Posted]: 2017-04-21 17:29:12
[Question]:

I am running 5 DAGs which have generated a total of about 6 GB of log data in the base_log_folder over a month. I just added a remote_base_log_folder, but it doesn't seem to exclude logging to the base_log_folder.

Is there any way to automatically remove old log files, rotate them, or force Airflow to not log to disk (base_log_folder) and only log to remote storage?

[Comments]:

  • Two years later I'd be curious what your solution was... just ran into this problem

Tags: airflow


[Solution 1]:

Please refer to https://github.com/teamclairvoyant/airflow-maintenance-dags

This repository has DAGs that can kill halted tasks and do log cleanup. You can grab the concepts there and come up with a new DAG that does the cleanup according to your own requirements.
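If you only need the log-cleanup part, the core logic can be sketched as a plain Python callable and wrapped in a PythonOperator on a daily schedule. This is a hedged sketch, not code from the linked repo; the folder layout and the 30-day retention are assumptions:

```python
import os
import time


def cleanup_old_logs(base_log_folder, max_age_days=30):
    """Delete *.log files older than max_age_days and prune empty dirs.

    Illustrative helper: wrap it in a PythonOperator scheduled @daily.
    """
    cutoff = time.time() - max_age_days * 86400
    removed = []
    # bottom-up walk so directories can be pruned after their files are gone
    for root, dirs, files in os.walk(base_log_folder, topdown=False):
        for name in files:
            path = os.path.join(root, name)
            if name.endswith(".log") and os.path.getmtime(path) < cutoff:
                os.remove(path)
                removed.append(path)
        # drop directories that ended up empty (but keep the base folder)
        if root != base_log_folder and not os.listdir(root):
            os.rmdir(root)
    return removed
```

The bottom-up `os.walk` matters: it lets the per-task and per-DAG folders be removed in the same pass once their last log file is gone.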

[Discussion]:

[Solution 2]:

We remove the task logs by implementing our own FileTaskHandler and then pointing to it in airflow.cfg. That way we overwrite the default LogHandler and keep only N task logs, without scheduling any additional DAGs.

We are using Airflow==1.10.1.

[core]
logging_config_class = log_config.LOGGING_CONFIG

log_config.LOGGING_CONFIG

import os

from airflow.configuration import conf

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
FOLDER_TASK_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}'
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

LOGGING_CONFIG = {
    'formatters': {},
    'handlers': {
        '...': {},
        'task': {
            'class': 'file_task_handler.FileTaskRotationHandler',
            'formatter': 'airflow.job',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
            'folder_task_template': FOLDER_TASK_TEMPLATE,
            'retention': 20
        },
        '...': {}
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['task'],
            'level': JOB_LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        '...': {}
    }
}

file_task_handler.FileTaskRotationHandler

import os
import shutil

from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler


class FileTaskRotationHandler(FileTaskHandler):

    def __init__(self, base_log_folder, filename_template, folder_task_template, retention):
        """
        :param base_log_folder: Base log folder to place logs.
        :param filename_template: template filename string.
        :param folder_task_template: template folder task path.
        :param retention: Number of folder logs to keep
        """
        super(FileTaskRotationHandler, self).__init__(base_log_folder, filename_template)
        self.retention = retention
        self.folder_task_template, self.folder_task_template_jinja_template = \
            parse_template_string(folder_task_template)

    @staticmethod
    def _get_directories(path='.'):
        return next(os.walk(path))[1]

    def _render_folder_task_path(self, ti):
        if self.folder_task_template_jinja_template:
            jinja_context = ti.get_template_context()
            return self.folder_task_template_jinja_template.render(**jinja_context)

        return self.folder_task_template.format(dag_id=ti.dag_id, task_id=ti.task_id)

    def _init_file(self, ti):
        relative_path = self._render_folder_task_path(ti)
        folder_task_path = os.path.join(self.local_base, relative_path)
        # sort the run folders so the newest ones are the ones kept;
        # os.walk() does not guarantee any ordering of directory names
        subfolders = sorted(self._get_directories(folder_task_path))
        to_remove = set(subfolders) - set(subfolders[-self.retention:])

        for dir_to_remove in to_remove:
            full_dir_to_remove = os.path.join(folder_task_path, dir_to_remove)
            print('Removing', full_dir_to_remove)
            shutil.rmtree(full_dir_to_remove)

        return FileTaskHandler._init_file(self, ti)

[Discussion]:

  • This only helps with removing local log files; it does not do remote logging.
  • This is the most comprehensive solution to date. The only thing is that it cannot guarantee only the oldest folders are removed, since os.walk() does not ensure any ordering. stackoverflow.com/questions/18282370/…
  • Where does log_config live?
[Solution 4]:

The Airflow maintainers don't consider truncating logs to be part of Airflow's core logic, see this; and then in this issue, the maintainers suggest changing the LOG_LEVEL to avoid too much log data.

In this PR we can learn how to change the log level in airflow.cfg.

Good luck.

[Discussion]:

  • That's because the Airflow maintainers are lazy
[Solution 4]:

I know it sounds savage, but have you tried pointing base_log_folder to /dev/null? I use Airflow as part of a container, so I don't care about the files either, as long as the logger is piped to STDOUT.

Not sure how it would play out with S3 though.

[Discussion]:

[Solution 5]:

I have some suggestions for your specific problems. For those, you will always need a specialized logging config as described in this answer: https://stackoverflow.com/a/54195537/2668430

• Automatically remove old log files and rotate them

I don't have any practical experience with the TimedRotatingFileHandler from the Python standard library yet, but you might give it a try: https://docs.python.org/3/library/logging.handlers.html#timedrotatingfilehandler

It not only offers to rotate your files based on a time interval, it even deletes your old log files if you specify the backupCount parameter:

If backupCount is nonzero, at most backupCount files will be kept, and if more would be created when rollover occurs, the oldest one is deleted. The deletion logic uses the interval to determine which files to delete, so changing the interval may leave old files lying around.

Which sounds pretty much like the best solution for your first problem.
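A minimal sketch of that idea, standalone rather than wired into Airflow's logging config (the filename, format, and logger name are illustrative):

```python
import logging
from logging.handlers import TimedRotatingFileHandler

# rotate once per day at midnight; keep at most 7 rotated files,
# so anything older than a week is deleted automatically
handler = TimedRotatingFileHandler("my_task.log", when="midnight", backupCount=7)
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))

logger = logging.getLogger("example.task")  # illustrative logger name
logger.addHandler(handler)
logger.warning("this line goes to my_task.log, rotated nightly")
```

To use this inside Airflow you would plug the handler class and its `when`/`backupCount` arguments into the `'task'` handler entry of a custom logging config, as in the other answers here.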


• Force Airflow to not log to disk (base_log_folder), but only to remote storage?

In this case you should specify the logging config in such a way that you do not have any logging handlers that write to a file, i.e. remove all FileHandlers.

    相反,尝试找到将输出直接发送到远程地址的日志处理程序。 例如。 CMRESHandler 直接记录到 ElasticSearch 但在日志调用中需要一些额外的字段。 或者,编写您自己的处理程序类并让它从 Python 标准库的HTTPHandler 继承。


A final suggestion would be to combine the TimedRotatingFileHandler with setting up ElasticSearch together with FileBeat, so you would be able to store your logs inside ElasticSearch (i.e. remote), but you wouldn't store a huge amount of logs on your Airflow disk, since they will be removed by the backupCount retention policy of your TimedRotatingFileHandler.

[Discussion]:

[Solution 6]:

Usually Apache Airflow eats up disk space for 3 reasons:

1. Airflow scheduler log files
2. MySQL binary logs [the major one]
3. XCom table records

To clean these up regularly, I set up a daily DAG that purges the binary logs and truncates the XCom table to free disk space. You may also need to install mysql-connector-python (pip install mysql-connector-python). For the scheduler log files, I delete them manually twice a week, to reduce the risk of needing logs that were already deleted.

I clean up the log files with the command sudo rm -rd airflow/logs/.

Below is my Python code for reference:

      """Example DAG demonstrating the usage of the PythonOperator."""
      
      from airflow import DAG
      from airflow.operators.python import PythonOperator
      from datetime import timedelta
      from airflow.utils.dates import days_ago
      
      
      args = {
          'owner': 'airflow',
          'email_on_failure':True,
          'retries': 1,
          'email':['Your Email Id'],
          'retry_delay': timedelta(minutes=5)
      }
      
      
      dag = DAG(
          dag_id='airflow_logs_cleanup',
          default_args=args,
          schedule_interval='@daily',
          start_date=days_ago(0),
          catchup=False,
          max_active_runs=1,
          tags=['airflow_maintenance'],
      )
      
      def truncate_table():
          import mysql.connector
      
          connection = mysql.connector.connect(host='localhost',
                                               database='db_name',
                                               user='username',
                                               password='your password',
                                               auth_plugin='mysql_native_password')
          cursor = connection.cursor()
          sql_select_query = """TRUNCATE TABLE xcom"""
          cursor.execute(sql_select_query)
          connection.commit()
          connection.close()
          print("XCOM Table truncated successfully")
      
      
      def delete_binary_logs():
          import mysql.connector
          from datetime import datetime
          date = datetime.today().strftime('%Y-%m-%d')
          connection = mysql.connector.connect(host='localhost',
                                               database='db_name',
                                               user='username',
                                               password='your_password',
                                               auth_plugin='mysql_native_password')
          cursor = connection.cursor()
          query = 'PURGE BINARY LOGS BEFORE ' + "'" + str(date) + "'"
      
          sql_select_query = query
          cursor.execute(sql_select_query)
          connection.commit()
          connection.close()
          print("Binary logs deleted  successfully")
      
      t1 = PythonOperator(
      
          task_id='truncate_table',
          python_callable=truncate_table, dag=dag
      
      )
      
      t2 = PythonOperator(
      
          task_id='delete_binary_logs',
          python_callable=delete_binary_logs, dag=dag
      )
      t2 << t1
      

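As an alternative to purging binary logs from a DAG, MySQL can expire them by itself. A my.cnf fragment (option name as in MySQL 5.6/5.7; MySQL 8.0 replaces it with binlog_expire_logs_seconds):

```ini
[mysqld]
# automatically delete binary logs older than 7 days
expire_logs_days = 7
```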

[Discussion]:

[Solution 7]:

I was surprised, but it worked for me. Update your config as below:

        base_log_folder=""
        

Tested with minio and S3.

[Discussion]:

[Solution 8]:

Our solution looks a lot like Franzi's:

Running on Airflow 2.0.1 (py3.8)

Overriding the default logging configuration

Since we use a helm chart for the airflow deployment, it was easiest to push an env there, but it can also be done in airflow.cfg or with an ENV in the dockerfile.

          # Set custom logging configuration to enable log rotation for task logging
          AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS: "airflow_plugins.settings.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
          

Then we added the logging config together with the custom log handler to a python module that we build and install into the docker image. As described here: https://airflow.apache.org/docs/apache-airflow/stable/modules_management.html

Logging config snippet

This is just a copy of the default from the airflow codebase, but then the task logger gets a different handler.

DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {'format': LOG_FORMAT},
        'airflow_coloured': {
            'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            'class': COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter',
        },
    },
    'handlers': {
        'console': {
            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
            'formatter': 'airflow_coloured',
            'stream': 'sys.stdout',
        },
        'task': {
            'class': 'airflow_plugins.log.rotating_file_task_handler.RotatingFileTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
            'maxBytes': 10485760,  # 10MB
            'backupCount': 6,
        },
        ...
          

          RotatingFileTaskHandler

And finally the custom handler, which is just a merge of logging.handlers.RotatingFileHandler and the FileTaskHandler.

          #
          # Licensed to the Apache Software Foundation (ASF) under one
          # or more contributor license agreements.  See the NOTICE file
          # distributed with this work for additional information
          # regarding copyright ownership.  The ASF licenses this file
          # to you under the Apache License, Version 2.0 (the
          # "License"); you may not use this file except in compliance
          # with the License.  You may obtain a copy of the License at
          #
          #   http://www.apache.org/licenses/LICENSE-2.0
          #
          # Unless required by applicable law or agreed to in writing,
          # software distributed under the License is distributed on an
          # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
          # KIND, either express or implied.  See the License for the
          # specific language governing permissions and limitations
          # under the License.
          """File logging handler for tasks."""
          import logging
          import logging.handlers
          import os
          from pathlib import Path
          from typing import TYPE_CHECKING, Optional
          
          import requests
          
          from airflow.configuration import AirflowConfigException, conf
          from airflow.utils.helpers import parse_template_string
          
          if TYPE_CHECKING:
              from airflow.models import TaskInstance
          
          
          class RotatingFileTaskHandler(logging.Handler):
              """
              FileTaskHandler is a python log handler that handles and reads
              task instance logs. It creates and delegates log handling
              to `logging.FileHandler` after receiving task instance context.
              It reads logs from task instance's host machine.
          
              :param base_log_folder: Base log folder to place logs.
              :param filename_template: template filename string
              """
          
              def __init__(self, base_log_folder: str, filename_template: str, maxBytes=0, backupCount=0):
                  self.max_bytes = maxBytes
                  self.backup_count = backupCount
                  super().__init__()
                  self.handler = None  # type: Optional[logging.FileHandler]
                  self.local_base = base_log_folder
                  self.filename_template, self.filename_jinja_template = parse_template_string(filename_template)
          
              def set_context(self, ti: "TaskInstance"):
                  """
                  Provide task_instance context to airflow task handler.
          
                  :param ti: task instance object
                  """
                  local_loc = self._init_file(ti)
                  self.handler = logging.handlers.RotatingFileHandler(
                      filename=local_loc,
                      mode='a',
                      maxBytes=self.max_bytes,
                      backupCount=self.backup_count,
                      encoding='utf-8',
                      delay=False,
                  )
                  if self.formatter:
                      self.handler.setFormatter(self.formatter)
                  self.handler.setLevel(self.level)
          
              def emit(self, record):
                  if self.handler:
                      self.handler.emit(record)
          
              def flush(self):
                  if self.handler:
                      self.handler.flush()
          
              def close(self):
                  if self.handler:
                      self.handler.close()
          
              def _render_filename(self, ti, try_number):
                  if self.filename_jinja_template:
                      if hasattr(ti, 'task'):
                          jinja_context = ti.get_template_context()
                          jinja_context['try_number'] = try_number
                      else:
                          jinja_context = {
                              'ti': ti,
                              'ts': ti.execution_date.isoformat(),
                              'try_number': try_number,
                          }
                      return self.filename_jinja_template.render(**jinja_context)
          
                  return self.filename_template.format(
                      dag_id=ti.dag_id,
                      task_id=ti.task_id,
                      execution_date=ti.execution_date.isoformat(),
                      try_number=try_number,
                  )
          
              def _read_grouped_logs(self):
                  return False
          
              def _read(self, ti, try_number, metadata=None):  # pylint: disable=unused-argument
                  """
                  Template method that contains custom logic of reading
                  logs given the try_number.
          
                  :param ti: task instance record
                  :param try_number: current try_number to read log from
                  :param metadata: log metadata,
                                   can be used for streaming log reading and auto-tailing.
                  :return: log message as a string and metadata.
                  """
                  # Task instance here might be different from task instance when
                  # initializing the handler. Thus explicitly getting log location
                  # is needed to get correct log path.
                  log_relative_path = self._render_filename(ti, try_number)
                  location = os.path.join(self.local_base, log_relative_path)
          
                  log = ""
          
                  if os.path.exists(location):
                      try:
                          with open(location) as file:
                              log += f"*** Reading local file: {location}\n"
                              log += "".join(file.readlines())
                      except Exception as e:  # pylint: disable=broad-except
                          log = f"*** Failed to load local log file: {location}\n"
                          log += "*** {}\n".format(str(e))
                  elif conf.get('core', 'executor') == 'KubernetesExecutor':  # pylint: disable=too-many-nested-blocks
                      try:
                          from airflow.kubernetes.kube_client import get_kube_client
          
                          kube_client = get_kube_client()
          
                          if len(ti.hostname) >= 63:
                              # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname
                              # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards
                              # on any label of a FQDN.
                              pod_list = kube_client.list_namespaced_pod(conf.get('kubernetes', 'namespace'))
                              matches = [
                                  pod.metadata.name
                                  for pod in pod_list.items
                                  if pod.metadata.name.startswith(ti.hostname)
                              ]
                              if len(matches) == 1:
                                  if len(matches[0]) > len(ti.hostname):
                                      ti.hostname = matches[0]
          
                          log += '*** Trying to get logs (last 100 lines) from worker pod {} ***\n\n'.format(
                              ti.hostname
                          )
          
                          res = kube_client.read_namespaced_pod_log(
                              name=ti.hostname,
                              namespace=conf.get('kubernetes', 'namespace'),
                              container='base',
                              follow=False,
                              tail_lines=100,
                              _preload_content=False,
                          )
          
                          for line in res:
                              log += line.decode()
          
                      except Exception as f:  # pylint: disable=broad-except
                          log += '*** Unable to fetch logs from worker pod {} ***\n{}\n\n'.format(ti.hostname, str(f))
                  else:
                      url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path).format(
                          ti=ti, worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')
                      )
                      log += f"*** Log file does not exist: {location}\n"
                      log += f"*** Fetching from: {url}\n"
                      try:
                          timeout = None  # No timeout
                          try:
                              timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
                          except (AirflowConfigException, ValueError):
                              pass
          
                          response = requests.get(url, timeout=timeout)
                          response.encoding = "utf-8"
          
                          # Check if the resource was properly fetched
                          response.raise_for_status()
          
                          log += '\n' + response.text
                      except Exception as e:  # pylint: disable=broad-except
                          log += "*** Failed to fetch log file from worker. {}\n".format(str(e))
          
                  return log, {'end_of_log': True}
          
              def read(self, task_instance, try_number=None, metadata=None):
                  """
                  Read logs of given task instance from local machine.
          
                  :param task_instance: task instance object
                  :param try_number: task instance try_number to read logs from. If None
                                     it returns all logs separated by try_number
                  :param metadata: log metadata,
                                   can be used for streaming log reading and auto-tailing.
                  :return: a list of listed tuples which order log string by host
                  """
                  # Task instance increments its try number when it starts to run.
                  # So the log for a particular task try will only show up when
                  # try number gets incremented in DB, i.e logs produced the time
                  # after cli run and before try_number + 1 in DB will not be displayed.
          
                  if try_number is None:
                      next_try = task_instance.next_try_number
                      try_numbers = list(range(1, next_try))
                  elif try_number < 1:
                      logs = [
                          [('default_host', f'Error fetching the logs. Try number {try_number} is invalid.')],
                      ]
                      return logs, [{'end_of_log': True}]
                  else:
                      try_numbers = [try_number]
          
                  logs = [''] * len(try_numbers)
                  metadata_array = [{}] * len(try_numbers)
                  for i, try_number_element in enumerate(try_numbers):
                      log, metadata = self._read(task_instance, try_number_element, metadata)
                      # es_task_handler return logs grouped by host. wrap other handler returning log string
                      # with default/ empty host so that UI can render the response in the same way
                      logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
                      metadata_array[i] = metadata
          
                  return logs, metadata_array
          
              def _init_file(self, ti):
                  """
                  Create log directory and give it correct permissions.
          
                  :param ti: task instance object
                  :return: relative log path of the given task instance
                  """
                  # To handle log writing when tasks are impersonated, the log files need to
                  # be writable by the user that runs the Airflow command and the user
                  # that is impersonated. This is mainly to handle corner cases with the
                  # SubDagOperator. When the SubDagOperator is run, all of the operators
                  # run under the impersonated user and create appropriate log files
                  # as the impersonated user. However, if the user manually runs tasks
                  # of the SubDagOperator through the UI, then the log files are created
                  # by the user that runs the Airflow command. For example, the Airflow
                  # run command may be run by the `airflow_sudoable` user, but the Airflow
                  # tasks may be run by the `airflow` user. If the log files are not
                  # writable by both users, then it's possible that re-running a task
                  # via the UI (or vice versa) results in a permission error as the task
                  # tries to write to a log file created by the other user.
                  relative_path = self._render_filename(ti, ti.try_number)
                  full_path = os.path.join(self.local_base, relative_path)
                  directory = os.path.dirname(full_path)
                  # Create the log file and give it group writable permissions
                  # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
                  # operator is not compatible with impersonation (e.g. if a Celery executor is used
                  # for a SubDag operator and the SubDag operator has a different owner than the
                  # parent DAG)
                  Path(directory).mkdir(mode=0o777, parents=True, exist_ok=True)
          
                  if not os.path.exists(full_path):
                      open(full_path, "a").close()
                      # TODO: Investigate using 444 instead of 666.
                      os.chmod(full_path, 0o666)
          
                  return full_path
          

One final note, perhaps: the links to the task logs in the Airflow UI will now only open the latest log file; the older rotated files are only accessible via SSH or any other interface with access to the Airflow log path.

[Discussion]:

[Solution 9]:

I don't think there is a rotation mechanism, but you can store them in S3 or Google Cloud Storage as described here: https://airflow.incubator.apache.org/configuration.html#logs
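For reference, the relevant airflow.cfg options look roughly like this (names as in Airflow 1.10, where they lived under [core]; Airflow 2.x moved them to a [logging] section, and the bucket and connection id below are placeholders):

```ini
[core]
remote_logging = True
remote_base_log_folder = gs://my-bucket/airflow-logs
remote_log_conn_id = google_cloud_default
```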

[Discussion]:

• Thanks, I'm using the remote logging option with remote_base_log_folder storing log files on GCS. This adds the logs to GCS but doesn't remove them locally. I guess I'd like to know how other people handle the huge number of log files filling up their disks in production.
• @jompa Did you find a solution? I'm also storing logs in S3, but how do I delete the local logs?