【问题标题】:Python Watchdog process existing files on startupPython Watchdog 在启动时处理现有文件
【发布时间】:2019-12-10 10:48:58
【问题描述】:

我有一个简单的看门狗和队列进程来监视目录中的文件。 代码取自https://camcairns.github.io/python/2017/09/06/python_watchdog_jobs_queue.html

import time
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
from queue import Queue
from threading import Thread

dir_path = "/data"

def process_queue(q):

    while True:
        if not q.empty():
            event = q.get()
            print("New event %s" % event)

        time.sleep(5)


class FileWatchdog(PatternMatchingEventHandler):

    def __init__(self, queue, patterns):
        PatternMatchingEventHandler.__init__(self, patterns=patterns)
        self.queue = queue

    def process(self, event):
        self.queue.put(event)

    def on_created(self, event):
        self.process(event)


if __name__ == '__main__':

    watchdog_queue = Queue()

    worker = Thread(target=process_queue, args=(watchdog_queue,))
    worker.setDaemon(True)
    worker.start()

    event_handler = FileWatchdog(watchdog_queue, patterns="*.ini")
    observer = Observer()
    observer.schedule(event_handler, path=dir_path)
    observer.start()

    try:
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        observer.stop()

    observer.join()

一旦进程运行,新文件就会被正确处理。 但是,如果我重新启动进程并且目录中已经存在一个文件,它将被忽略。

我试图创建一个字典来添加到队列中

    for file in os.listdir(dir_path):
        if file.endswith(".ini"):
             file_path = os.path.join(dir_path, file)
             event = {'event_type' : 'on_created', 'is_directory' : 'False', 'src_path' : file_path}
             watchdog_queue.put(event)

但它需要一个类型的对象(类“watchdog.events.FileCreatedEvent”),我不知道如何创建它。

另外,我可以在 Watchdog 文档(类 watchdog.utils.dirsnapshot.DirectorySnapshot)中看到,但我不知道如何运行它并将其添加到队列中。

关于如何在启动时将现有文件添加到队列中的任何建议?

【问题讨论】:

    标签: python queue watchdog python-watchdog


    【解决方案1】:

    这段代码应该能达到你想要达到的效果。

    from watchdog.events import FileCreatedEvent
    
    # Loop to get all files; dir_path is your lookup folder.
    
    for file in os.listdir(dir_path):
        filename = os.path.join(dir_path, file)
        event = FileCreatedEvent(filename)
        watchdog_queue.put(event)
    

    【讨论】:

    • 如果要排除子目录,以及if os.path.isfile(filename): 下的filename =
    【解决方案2】:

    我偶然发现了同样的问题,也许这个解决方案也适合你。至少在 linux 上,这就像一个魅力。

    添加“on_modified”方法

    class FileWatchdog(PatternMatchingEventHandler):
    
    def __init__(self, queue, patterns):
        PatternMatchingEventHandler.__init__(self, patterns=patterns)
        self.queue = queue
    
    ...
    
    def on_modified(self, event):
        self.process(event)
    

    现在启动观察者后,遍历目录中的所有文件并“触摸”它们,因此它们将被“修改”。

    # Loop to get all files; dir_path is your lookup folder.
    
    for file in os.listdir(dir_path):
        filename = os.path.join(dir_path, file)
        os.popen(f'touch {filename}')
    

    无需添加特殊过滤器,因为您的 FileHandler 会处理。

    【讨论】:

      猜你喜欢
      • 2014-06-24
      • 1970-01-01
      • 1970-01-01
      • 2014-02-08
      • 1970-01-01
      • 2017-08-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多