【问题标题】:logging with CloudLoggingHandler from multiple processes从多个进程使用 CloudLoggingHandler 进行日志记录
【发布时间】:2016-10-24 14:59:31
【问题描述】:

当日志由多个进程生成时,哪种是收集日志并将其发送到谷歌云日志的首选方式?

这是我基于 CloudLoggingHandler 的提案,您愿意批评它吗?

import google
from multiprocessing import Process
from logging import getLogger

class Worker(Process):
    def __init__(self):
        super(Worker, self).__init__()

    def __setup_logger(self):
        handler = CloudLoggingHandler(google.cloud.logging.Client(), name='log-name')                           
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        google.cloud.logging.handlers.setup_logging(handler)

    def run(self):
        self.__setup_logger()
        for i in range(10):
            logging.warning("i=%d", i)

if __name__ == "__main__":
    for _ in range(2):
        w = Worker()
        w.start()

我阅读了有关基于队列的日志处理程序here,但 CloudLoggingHandler 在隔离线程中使用批量提交,因此基于队列的处理程序将是矫枉过正。我说的对吗?

Sources 表示 CloudLoggingHandler 是线程安全的,因此所有进程共享一个 CloudLoggingHandler 实例可能就足够了。它会起作用吗?如果是这样是不是太苛刻了?


编辑下方回答@thomas-schultz。

我坚持我的提议,主要是因为我在制作原型,它“开箱即用”,而且我没有检查性能问题。我正在重新考虑这个选择。

确实,据我了解,CloudLoggingHandlerBackgroundThreadTransport 会阻塞主线程,直到日志发送到日志记录端点。几乎每个日志行都会发生这种情况。实际上,只要有一条日志记录 (cf source),就会发送批次。

在我的开发环境中,当多个进程同时登录时,一个进程会等待长达 1 秒的时间来发送日志。我想这主要是网络成本,它会从谷歌数据中心缩小到“不那么多”。

我正在考虑定义一个StreamHandler,它将所有日志记录推送到Queue。这个队列将被Process 读取,该Process 负责将日志发送到日志端点。如果相关,此过程可能依赖 CloudLoggingHandler 来执行此操作。

这有意义吗?

【问题讨论】:

    标签: logging google-cloud-python


    【解决方案1】:

    我认为这可能是矫枉过正,除非您遇到连接问题或某种需要排队的情况。

    在这种情况下,您可能会使用CloudLoggingHandler 的相同实例,但这样做可能会出现一些性能瓶颈。我不完全确定。

    这里有更多关于与 Python 的标准库记录器集成的信息。 https://googlecloudplatform.github.io/google-cloud-python/stable/logging-usage.html#integration-with-python-logging-module

    我很好奇你是否得到了不同的答案?

    【讨论】:

      【解决方案2】:

      这是我计划从多个进程登录到 Google Cloud Logging 的方式。此解决方案仅使用 python 3 内置日志处理程序 (doc)。在下面的示例中,我测量了主进程记录消息所需的时间。结果表明,该解决方案可以避免在将日志发送到日志记录端点时主进程阻塞。当然,仅当您的耗时任务不在主进程中完成时才有用。

      您如何看待这种方法?

      Queue: avg log call duration: 0.00004s
      Queue: min log call duration: 0.00002s
      Queue: max log call duration: 0.00018s
      
      Cloud: avg log call duration: 0.03019s
      Cloud: min log call duration: 0.00003s
      Cloud: max log call duration: 0.16630s
      

      下面是一个综合示例。

      import sys
      import os
      import time
      import google
      import logging
      import multiprocessing
      
      from logging.handlers import QueueHandler, QueueListener
      from google.cloud.logging.handlers import CloudLoggingHandler
      
      
      def do(i):
          """
              Dummy function that times the log insertion.
          """
          t = time.time()
          logging.info('%dth message.' % i)
          return time.time() - t
      
      
      if __name__ == '__main__':
      
          # The standard google cloud logging handler sends logs to the clooud logging endpoint.
          client = google.cloud.logging.Client()
          cloud_handler = CloudLoggingHandler(client=client, name="xyz")
      
          # A local handler is used to have feedbacks on what is going on.
          local_handler = logging.StreamHandler(sys.stdout)
      
          # Log records are put in the log queue.
          log_queue = multiprocessing.Queue()
      
          # The listener dequeues log records from the log queue. Each handler registered in the 
          # listener processes the log records.
          queue_listener = QueueListener(log_queue, local_handler, cloud_handler)
          queue_listener.start()
      
          # The queue handler pushes the log records to the log queue.
          queue_handler = QueueHandler(log_queue)
      
          # Setup the root loger to the handler we defined.
          root_logger = logging.getLogger()
          root_logger.setLevel(logging.INFO)
          root_logger.addHandler(queue_handler)
      
          n = 10
      
          # Emits logs and measure how fast it is with the 
          durations = [do(i) for i in range(n)]
          print('Queue: avg log call duration: %.5fs' % (sum(durations) / n))
          print('Queue: min log call duration: %.5fs' % min(durations))
          print('Queue: max log call duration: %.5fs' % max(durations))
      
          # Stop the queue listener.
          queue_listener.stop()
      
          # Remove the queue handler from the root logger.
          root_logger.removeHandler(queue_handler)
      
          # Setup the root loger to use CloudLoggingHandler.
          root_logger.setLevel(logging.INFO)
          root_logger.addHandler(local_handler)
          root_logger.addHandler(cloud_handler)
      
          # Emits logs and measure how fast it is with the 
          durations = [do(i) for i in range(n)]
          print('Queue: avg log call duration: %.5fs' % (sum(durations) / n))
          print('Queue: min log call duration: %.5fs' % min(durations))
          print('Queue: max log call duration: %.5fs' % max(durations))
      

      【讨论】:

      • 不错!有趣的是,您仅在 10 个条目中就有了如此大的差异。我认为结果不言自明。
      • 我还以派生自 CloudLoggingHandler 的自定义处理程序 BatchCloudLoggingHandler 结束。与 CloudLoggingHandler 不同,BatchCloudLoggingHandler 不是线程安全的,并且不依赖于 BackgroundThreadTransport。我只是依靠 QueueHandler 和 QueueListener 来确保线程安全。 BatchCloudLoggingHandler 执行实际批次的日志记录(即具有多个日志记录的批次)。但是,我不知道应该如何衡量这种方法的效率。
      猜你喜欢
      • 1970-01-01
      • 2018-06-21
      • 1970-01-01
      • 2012-05-11
      • 2011-03-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多