【问题标题】:Python parallel thread that consumes Watchdog queue events（消耗看门狗队列事件的 Python 并行线程）
【发布时间】:2020-06-23 05:41:27
【问题描述】:

每当外部程序 (TCPdump) 在我的目录中创建 *.pcap 文件时,我有这段代码应该将事件放入队列中。 我的问题是我总是得到一个空队列,尽管我从 process() 函数得到了打印。

我做错了什么?队列是否正确定义并在两个类之间共享?

编辑-----------------
我可能明白为什么我有一个空队列,我认为这是因为我正在打印我在 Handler 类填充之前初始化的队列。 我修改了我的代码并创建了两个应该使用同一个队列的进程,但是现在执行卡在 queue.put() 并且线程 ReadPcapFiles() 停止运行。

这里是更新的代码:

import concurrent.futures
import logging
import threading
import time
from multiprocessing import Process
from queue import Empty, Queue

import pyshark
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer, api

class Handler(PatternMatchingEventHandler):
    """Watchdog handler (producer side): pushes the path of every newly
    created .pcap/.pcapng file onto a queue shared with the consumer."""

    patterns = ["*.pcap", "*.pcapng"]

    def __init__(self, queue):
        """queue: a queue.Queue shared with the consumer thread."""
        PatternMatchingEventHandler.__init__(self)
        self.queue = queue

    def process(self, event):
        """Enqueue the created file's path and report the queue state."""
        self.queue.put(event.src_path)
        # Lazy %-style args: the message is only formatted if INFO is enabled.
        logging.info("Storing message: %s", self.queue.qsize())
        print("Producer queue: ", list(self.queue.queue))

    def on_created(self, event):
        self.process(event)


def StartWatcher(watchdogq, event):
    """Run a watchdog observer that feeds *watchdogq* until *event* is set.

    Bug fix: the original re-scheduled the handler and called
    ``observer.start()`` on every iteration of the outer loop -- a second
    ``start()`` on an already-started Observer thread raises RuntimeError --
    and the inner ``while True`` never checked the stop event. Schedule and
    start exactly once, poll the event, and always stop/join on the way out.
    """
    path = 'C:\\...'
    handler = Handler(watchdogq)
    observer = Observer()
    observer.schedule(handler, path, recursive=False)
    print("About to start observer")
    observer.start()
    try:
        # Observer does its work on its own thread; we just wait for shutdown.
        while not event.is_set():
            time.sleep(1)
    except Exception as error:
        print("Error: " + str(error))
    finally:
        observer.stop()
        observer.join()


def ReadPcapFiles(consumerq, event):
    """Consumer: print items from *consumerq* until *event* is set and the
    queue is drained.

    Bug fix: the original called ``consumerq.get()`` with no timeout, so
    the thread blocked forever on an empty queue -- the stop event could
    never be re-checked. A short timeout lets the loop poll the exit
    condition while still consuming promptly.
    """
    while not event.is_set() or not consumerq.empty():
        try:
            item = consumerq.get(timeout=0.5)
        except Empty:
            continue  # nothing arrived yet -- re-check the stop condition
        print("Consumer queue: ", item)

if __name__ == '__main__':
    # Renamed from "format": the original name shadowed the builtin format().
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")
    logging.getLogger().setLevel(logging.DEBUG)

    # One shared queue (producer -> consumer) and one shared stop event.
    queue = Queue()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(StartWatcher, queue, event)
        executor.submit(ReadPcapFiles, queue, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

旧代码:

import time
from queue import Queue
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

class Handler(PatternMatchingEventHandler):
    """Producer: reacts to newly created capture files and queues their paths."""

    patterns = ["*.pcap", "*.pcapng"]

    def __init__(self, queue):
        PatternMatchingEventHandler.__init__(self)
        self.queue = queue

    def on_created(self, event):
        self.process(event)

    def process(self, event):
        # Announce the event, then hand the file path to the consumer.
        print(f'event type: {event.event_type}  path : {event.src_path}')
        self.queue.put(event.src_path)

class Watcher():
    """Owns the observer/handler pair and drains the shared event queue.

    Bug fix: the original loop called ``self.queue.get()`` once per second --
    a *blocking* call that also silently discarded one item before printing
    what remained, which is why the printed queue always looked empty.
    Drain the queue non-blockingly and print each received path instead.
    """

    def __init__(self, path):
        self.queue = Queue()          # shared producer/consumer queue
        self.observer = Observer()
        self.handler = Handler(self.queue)
        self.path = path              # directory to watch

    def start(self):
        self.observer.schedule(self.handler, self.path, recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(1)
                # Consume everything currently queued without blocking.
                while not self.queue.empty():
                    print(self.queue.get())
        except Exception as error:
            self.observer.stop()
            print("Error: " + str(error))
        self.observer.join()

if __name__ == '__main__':
    # Watch the capture directory and start draining events.
    Watcher('C:\\...').start()

【问题讨论】:

    标签: python python-3.x multithreading queue python-multiprocessing


    【解决方案1】:

    这对我有用(我从this answer 得到了主要想法,谢谢!)但请注意,我认为这是一种解决方法,所以如果有人对此有更好的解决方案或者可以更好地解释这种行为的原因Python,请不要犹豫回答!

    我的猜测是我有两个主要问题:
    - 我正在另一个线程中启动 Watchdog 进程(这以某种方式阻塞了我的队列消耗线程)。
    - Python 线程处理并非真正并行（threads do not really run in parallel），因此需要启动一个独立的进程。

    这是我的代码:

    import time
    import pyshark
    import threading
    import logging
    import os
    from queue import Queue
    from multiprocessing import Process, Pool
    from watchdog.observers import Observer, api
    from watchdog.events import PatternMatchingEventHandler
    from concurrent.futures import ThreadPoolExecutor
    
    class Handler(PatternMatchingEventHandler):
        """Producer side: queue each finished capture file for processing."""

        patterns = ["*.pcap", "*.pcapng"]

        def __init__(self, queue):
            PatternMatchingEventHandler.__init__(self)
            self.queue = queue

        def process(self, event):
            # Publish the path, then report the queue state for debugging.
            self.queue.put(event.src_path)
            logging.info(f"Storing message: {self.queue.qsize()}")
            print("Producer queue: ", list(self.queue.queue))


        def on_created(self, event):
            # Poll the size until it stops changing, so the file is only
            # processed once the external writer (TCPdump) has finished.
            previous_size = -1
            while previous_size != os.path.getsize(event.src_path):
                previous_size = os.path.getsize(event.src_path)
                time.sleep(1)

            self.process(event)
    
    def ConsumeQueue(consumerq):
        """Dispatch each queued pcap path to a worker process, forever.

        Bug fix: the original constructed a brand-new multiprocessing.Pool
        for *every* queue item and never closed any of them, leaking worker
        processes. Create one pool up front and reuse it; a blocking
        ``get()`` also replaces the empty()/sleep busy-wait poll.
        """
        pool = Pool()
        while True:
            # Blocks until a path is available -- no polling needed.
            created_file = consumerq.get()
            pool.apply_async(ReadPcapFiles, (created_file, ))
    
    def ReadPcapFiles(get_event):
        """Open the pcap file at path *get_event* and print packet/byte totals."""
        createdFile = get_event
        print(f"This is my event in ReadPacapFile {createdFile}")

        countPacket = 0
        bandwidth = 0
        # Tally packet count and cumulative payload length over the capture.
        for packet in pyshark.FileCapture(createdFile):
            countPacket += 1
            bandwidth += int(packet.length)
        # NOTE(review): "bandwidth" is the capture's total byte count, not a
        # per-second rate, despite the label printed below -- confirm intent.
        print(f"Packet nr {countPacket}")
        print(f"Byte per second {bandwidth}")
    
    
    if __name__ == '__main__':

        # Renamed from "format": the original name shadowed the builtin format().
        log_format = "%(asctime)s: %(message)s"
        logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")
        logging.getLogger().setLevel(logging.DEBUG)

        queue = Queue()
        path = 'C:\\...'

        # Consumer runs in a daemon thread so it dies with the main process.
        worker = threading.Thread(target=ConsumeQueue, args=(queue, ), daemon=True)
        print("About to start worker")
        worker.start()

        event_handler = Handler(queue)
        observer = Observer()
        observer.schedule(event_handler, path, recursive=False)
        print("About to start observer")
        observer.start()

        try:
            while True:
                time.sleep(1)
        except Exception as error:
            observer.stop()
            print("Error: " + str(error))
        observer.join()
    

    【讨论】:

      【解决方案2】:

      有一个优秀的库可以提供对该队列中项目的并发访问。队列也是持久的[基于文件以及基于数据库],因此如果程序崩溃,您仍然可以从程序崩溃的位置消费事件。

      persist-queue

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2018-12-24
        • 2011-12-04
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-04-19
        相关资源
        最近更新 更多