【问题标题】:How to process access log using python multiprocessing library?如何使用 python 多处理库处理访问日志?
【发布时间】:2018-09-12 03:02:06
【问题描述】:

我必须根据客户端 IP 和访问主机解析来自服务器的 30 天访问日志,并且需要知道前 10 个访问站点。日志文件大小约为 10-20 GB,单线程执行脚本需要大量时间。最初,我编写了一个运行良好的脚本,但由于日志文件很大,因此需要很长时间。然后我尝试为并行处理实现多处理库,但它不起作用。似乎多处理的实现是重复任务而不是进行并行处理。不确定,代码有什么问题。有人可以帮忙吗?非常感谢您的帮助。

代码:

  from datetime import datetime, timedelta
  import commands
  import os
  import string
  import sys
  import multiprocessing


  def ipauth (slave_list, static_ip_list):

      file_record = open('/home/access/top10_domain_accessed/logs/combined_log.txt', 'a')
      count = 1
      while (count <=30):
      Nth_days = datetime.now() - timedelta(days=count)
      date = Nth_days.strftime("%Y%m%d")
      yr_month = Nth_days.strftime("%Y/%m")
      file_name = 'local2' + '.' + date
      with open(slave_list) as file:
        for line in file:
            string = line.split()
            slave = string[0]
            proxy = string[1]
            log_path = "/LOGS/%s/%s" %(slave, yr_month)

            try:
               os.path.exists(log_path)
               file_read = os.path.join(log_path, file_name)
               with open(file_read) as log:
                     for log_line in log:
                        log_line = log_line.strip()
                        if proxy in log_line:
                           file_record.write(log_line + '\n')
            except IOError:
               pass

        count = count + 1
        file_log = open('/home/access/top10_domain_accessed/logs/ipauth_logs.txt', 'a')
        with open(static_ip_list) as ip:
             for line in ip:
                with open('/home/access/top10_domain_accessed/logs/combined_log.txt','r') as f:
             for content in f:
                log_split = content.split()
                client_ip = log_split[7]
                if client_ip in line:
                   content = str(content).strip()
                   file_log.write(content + '\n')

         return

   if __name__ == '__main__':
        slave_list = sys.argv[1]
        static_ip_list = sys.argv[2]
        jobs = []
        for i in range(5):
           p = multiprocessing.Process(target=ipauth, args=(slave_list, static_ip_list))
           jobs.append(p)
           p.start()
           p.join()

【问题讨论】:

    标签: python python-2.7 python-multiprocessing


    【解决方案1】:

    与 OP 交谈后更新,请查看评论

    我的看法:将文件拆分成更小的块并使用进程池处理这些块:

    import multiprocessing
    
    def chunk_of_lines(fp, n):
        # read n lines from file
        # then yield
        pass
    
    def process(lines):
        pass # do stuff to a file
    
    p = multiprocessing.Pool()
    fp = open(slave_list)
    for f in chunk_of_lines(fp,10):
        p.apply_async(process, [f,static_ip_list])
    p.close()
    p.join() # Wait for all child processes to close.
    

    有很多方法可以实现chunk_of_lines 方法,您可以使用简单的for 遍历文件行,或者执行更高级的操作,例如调用fp.read()

    【讨论】:

    • 感谢您的建议。但是在这里我只传递文件路径作为函数中的参数,所以 slave_list 不是可迭代的东西。实际上 slave_list 和 static_ip_list 都是文件路径。我尝试在 apply_async 函数中传递 slave_list 而不是 f ,我看到有多个进程正在运行,但我相信所有进程都在同一数据中进行操作。所以,这是一种重复的任务。有什么建议吗?谢谢。
    猜你喜欢
    • 2013-03-24
    • 2010-11-04
    • 2022-12-04
    • 1970-01-01
    • 2016-08-28
    • 2022-01-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多