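Because it would require mutual exclusion and communication between processes, Python's logging module does not by default support several processes writing to the same log output. It is thread-safe, but nothing serializes access across processes. The official documentation, however, recommends an approach based on logging.handlers.SocketHandler for logging from multiple processes.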

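The principle is simple. In one sentence: each process sends the log records produced in its own environment over a socket to one dedicated logging process, which prevents the conflicts and interleaved output that occur when several processes write logs at once.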
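The sending side of this idea can be sketched as follows. This is a minimal sketch written for Python 3, not the author's code: the helper name `attach_socket_handler` and its parameters are hypothetical, and it assumes the logging process listens on localhost at the standard library's default TCP logging port.

```python
import logging
import logging.handlers


def attach_socket_handler(logger_name=None, host="localhost",
                          port=logging.handlers.DEFAULT_TCP_LOGGING_PORT):
    """Route a logger's records to the logging process over TCP.

    logger_name=None selects the root logger, so every logger in the
    calling process is covered. (Hypothetical helper for illustration.)
    """
    logger = logging.getLogger(logger_name)
    # Pass everything on; the receiving process decides what to keep.
    logger.setLevel(logging.DEBUG)
    # The handler connects lazily: no socket is opened until the first record.
    handler = logging.handlers.SocketHandler(host, port)
    logger.addHandler(handler)
    return handler
```

No formatter is set on the handler because SocketHandler sends the pickled record itself; formatting happens in the process that receives it.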

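This post records how SocketHandler is used in practice.

The logic in brief: the main process (MainProcess) starts a dedicated logging process (LogReceiverProcess) and "redirects" all logs produced in its own environment to it. In the same way, every worker subprocess (WorkerProcess) started later does the same thing and "redirects" its own logs to the logging process for output.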

[Figure: logging from multiple processes with Python's logging module]
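The startup order described above might look like the sketch below. It is written for Python 3 under stated assumptions: `redirect_logs`, `worker`, `build_processes`, and `run` are hypothetical names, `receiver_target` stands for whatever function runs the log receiver, and marking the receiver as a daemon process is a choice made here so that it exits with the main process.

```python
import logging
import logging.handlers
import multiprocessing


def redirect_logs(port=logging.handlers.DEFAULT_TCP_LOGGING_PORT):
    """Send every record logged in the current process to the logging process."""
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(logging.handlers.SocketHandler("localhost", port))


def worker(task_id):
    """Hypothetical WorkerProcess body: redirect first, then do the work."""
    redirect_logs()
    logging.info("worker %d running", task_id)


def build_processes(receiver_target, worker_count=2):
    """Create (but do not start) the LogReceiverProcess and the workers."""
    receiver = multiprocessing.Process(
        target=receiver_target, name="LogReceiverProcess", daemon=True)
    workers = [
        multiprocessing.Process(
            target=worker, args=(i,), name="WorkerProcess-%d" % i)
        for i in range(worker_count)
    ]
    return receiver, workers


def run(receiver_target):
    """Start the processes in the order described in the text."""
    receiver, workers = build_processes(receiver_target)
    receiver.start()   # 1. the logging process comes up first
    redirect_logs()    # 2. MainProcess redirects its own logs
    for w in workers:  # 3. each worker redirects inside worker()
        w.start()
    for w in workers:
        w.join()
```

Starting the receiver first matters because SocketHandler drops a record when it cannot connect, so anything logged before the receiver is listening may be lost.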

2 Implementation

2.1 The logging process

The core of the logging process is a TCP server that receives and handles log records. The code is as follows:

# Python 2 module names; in Python 3 these are pickle and socketserver.
import os
import logging
import logging.handlers
import traceback
import cPickle
import select
import struct
import SocketServer
from multiprocessing import Process


class LogRecordStreamHandler(SocketServer.StreamRequestHandler):
    """Handle one client connection carrying length-prefixed pickled records."""

    def handle(self):
        while True:
            try:
                # Each record starts with a 4-byte big-endian length prefix.
                chunk = self.connection.recv(4)
                if len(chunk) < 4:
                    break
                slen = struct.unpack(">L", chunk)[0]
                # recv() may return fewer bytes than requested, so keep reading.
                chunk = self.connection.recv(slen)
                while len(chunk) < slen:
                    chunk = chunk + self.connection.recv(slen - len(chunk))
                obj = self.unpickle(chunk)
                record = logging.makeLogRecord(obj)
                self.handle_log_record(record)
            except Exception:
                # Stop serving this connection on any socket or decoding error.
                break

    @classmethod
    def unpickle(cls, data):
        return cPickle.loads(data)

    def handle_log_record(self, record):
        # Use the server-wide logger name if one is set, else the record's own.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # Logger.handle() skips the logger-level check, so every record is
        # passed to the handlers configured in this process.
        logger.handle(record)


class LogRecordSocketReceiver(SocketServer.ThreadingTCPServer):
    """Threaded TCP server that accepts connections from SocketHandler clients."""

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        SocketServer.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0       # set to a true value to stop serve_until_stopped()
        self.timeout = 1     # seconds to wait in select() before re-checking abort
        self.logname = None  # optional logger name applied to every record

    def serve_until_stopped(self):
        abort = 0
        while not abort:
            # Wait up to self.timeout seconds for an incoming connection.
            rd, wr, ex = select.select([self.socket.fileno()], [], [], self.timeout)
            if rd:
                self.handle_request()
            abort = self.abort


def _log_listener_process(log_format, log_time_format, log_file):
    """Entry point of the logging process: configure output, then serve."""
    log_file = os.path.realpath(log_file)
    # File log: everything from DEBUG upward is appended to log_file.
    logging.basicConfig(level=logging.DEBUG, format=log_format,
                        datefmt=log_time_format, filename=log_file, filemode='a+')

    # Console log: INFO and above, in the same format as the file log.
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter(fmt=log_format, datefmt=log_time_format))
    logging.getLogger().addHandler(console)

    tcp_server = LogRecordSocketReceiver()

    logging.debug('Log listener process started ...')
    tcp_server.serve_until_stopped()
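To make the framing that `handle()` parses more concrete, the sketch below round-trips one record through the same wire format without opening a socket. It is an illustration written for Python 3 (so it uses `pickle` rather than `cPickle`); `encode_record` and `decode_frame` are hypothetical helper names, while `SocketHandler.makePickle` and `logging.makeLogRecord` are the standard library calls that produce and consume the frame.

```python
import logging
import logging.handlers
import pickle
import struct


def encode_record(record):
    """Serialize a LogRecord the way SocketHandler does before sending it."""
    # Constructing the handler does not connect; it is used only for makePickle().
    handler = logging.handlers.SocketHandler(
        "localhost", logging.handlers.DEFAULT_TCP_LOGGING_PORT)
    try:
        return handler.makePickle(record)
    finally:
        handler.close()


def decode_frame(frame):
    """Rebuild a LogRecord from one complete length-prefixed frame."""
    # First 4 bytes: big-endian unsigned length of the pickled payload.
    (length,) = struct.unpack(">L", frame[:4])
    payload = frame[4:4 + length]
    if len(payload) < length:
        raise ValueError("incomplete frame")
    # The payload is a pickled dict of the record's attributes.
    return logging.makeLogRecord(pickle.loads(payload))


record = logging.LogRecord(
    "demo", logging.INFO, "example.py", 1, "hello %s", ("worker",), None)
frame = encode_record(record)
decoded = decode_frame(frame)
print(decoded.name, decoded.levelname, decoded.getMessage())
```

Because the payload is unpickled as received, a receiver like this should only accept connections from trusted senders, which is one reason to bind it to localhost as the code above does.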
