【问题标题】:How do I watch a file for changes?如何查看文件的更改?
【发布时间】:2010-09-15 23:33:33
【问题描述】:

我有一个日志文件正在被另一个进程写入,我想观察它的变化。每次发生更改时,我都想读取新数据以对其进行一些处理。

最好的方法是什么?我希望 PyWin32 库中有某种钩子。我找到了win32file.FindNextChangeNotification 函数,但不知道如何要求它观看特定文件。

如果有人做过这样的事情,我会非常感激听到如何...

[编辑]我应该提到我正在寻求一个不需要轮询的解决方案。

[编辑] 诅咒!似乎这不适用于映射的网络驱动器。我猜 Windows 不会像在本地磁盘上那样“听到”文件的任何更新。

【问题讨论】:

  • 在 Linux 上可以使用 strace 监控 write 调用此操作
  • @simao's answer 使用 python-watchdog。 Python-Watchdog 有很好的文档 --> 这里是 ["QuickStart"] 文档的链接,它提供了一个监视当前工作目录的最小代码示例。

标签: python file pywin32 watch


【解决方案1】:

您是否尝试过使用Watchdog

用于监控文件系统事件的 Python API 库和 shell 实用程序。

目录监控变得简单

  • 一个跨平台的 API。
  • 一个运行命令以响应目录更改的 shell 工具。

通过Quickstart中的一个简单示例快速入门...

【讨论】:

  • 可以用easy_install安装吗?查看。免费授权? Check。解决大平台上的问题? Check。我赞同这个答案。仅注意:example on their project page 不能开箱即用。请改用the one on their github
  • 我们使用看门狗。我们可以切换到 QFileSystemWatcher。只是一个公平的警告-看门狗很好,但在所有平台上都远非完美(目前)。每个操作系统都有它的特质。所以,除非你致力于让它变得完美,否则你会把头发拉出来。如果您只是想观看 10 个左右的文件,我会投票。 OS 磁盘缓存非常成熟,Watchdog 无论如何都涉及轮询 API。它主要用于观看巨大的文件夹结构恕我直言。
  • 我对看门狗的抱怨是,它有很多依赖项。当然,它比 PyQt 少,但它不起作用,感觉就像是最小的、最佳实践、做一个工作和做对的解决方案。
  • @denfromufa 在这里正确吗?看门狗真的会锁定文件,因此不能同时编辑它们以看门狗监视它们吗?我简直不敢相信,这完全没用。
  • @MichelMüller 我刚刚检查了这个例子(见下面的链接),它有效!不知道以前出了什么问题,但这个答案没有提供任何例子。 stackoverflow.com/a/18599427/2230844
【解决方案2】:

如果轮询对您来说足够好,我会观察“修改时间”文件统计信息是否发生变化。阅读:

os.stat(filename).st_mtime

(另请注意,Windows 原生更改事件解决方案并非在所有情况下都有效,例如在网络驱动器上。)

import os

class Monkey(object):
    def __init__(self):
        self._cached_stamp = 0
        self.filename = '/path/to/file'

    def ook(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...

【讨论】:

  • 你怎么能在间隔内做到这一点?
  • @dopatraman 以下是如何在间隔内执行此操作 ` import sys import time pub = Monkey() while True: try: time.sleep(1) pub.watch() except KeyboardInterrupt: print ('\nDone') break except: print(f'Unhandled error: {sys.exc_info()[0]}') `
  • 非常简单的解决方案!我添加了一个检查,以防止它报告第一次运行时更改的文件:if self._cached_stamp is not None
  • 没有@VladBezden 所写的watch 方法。代码丢失了吗?
【解决方案3】:

您是否已经查看过http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html 上提供的文档?如果您只需要它在 Windows 下工作,那么第二个示例似乎正是您想要的(如果您将目录的路径与您想要观看的文件之一交换)。

否则,轮询可能是唯一真正独立于平台的选项。

注意:我没有尝试过这些解决方案。

【讨论】:

  • 这个答案是特定于 Windows 的,但这里似乎也发布了一些针对这个问题的跨平台解决方案。
  • 是否有基准,如果这个过程较慢,那么用 c++ 之类的本机语言实现它?
  • 最好插入引用来源的相关内容,因为它们可能会过时。
  • (1.) 在这个答案的末尾是一个强烈的免责声明......“我没有尝试过任何这些解决方案”。 (2.)这个答案或多或少是"link only" answer (3.)答案提到了“轮询”,但在此之后没有提供任何有用的添加......而@Deestan's answer确实提供了一些关于轮询的好信息跨度>
【解决方案4】:

如果您想要多平台解决方案,请查看QFileSystemWatcher。 这里是一个示例代码(未清理):

from PyQt4 import QtCore

@QtCore.pyqtSlot(str)
def directory_changed(path):
    print('Directory Changed!!!')

@QtCore.pyqtSlot(str)
def file_changed(path):
    print('File Changed!!!')

fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])

fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)

【讨论】:

  • 我认为这很可能是最好的答案,因为它们要么 a) 依赖 Win32 的 FileSystemwatcher 对象并且无法移植,要么 b) 轮询文件(这对性能和不会缩放)。遗憾的是 Python 没有内置这个工具,因为如果你使用的只是 QFileSystemWatcher 类,那么 PyQt 是一个巨大的依赖项。
  • 我喜欢这个解决方案。我想指出您需要一个 QApplication 实例才能使其工作,我在导入下方添加了“app = QtGui.QApplication(sys.argv)”,然后在信号连接之后添加了“app.exec_()”。
  • 刚刚在 Linux 机器上测试这个,我看到 directory_changed 方法被调用,但不是 file_changed。
  • @CadentOrange,如果你不喜欢 pyQt 依赖,the watchdog package is the right answer
  • PyQt5 不再支持此功能
【解决方案5】:

它不应该在windows上工作(也许用cygwin?),但对于unix用户,你应该使用“fcntl”系统调用。这是 Python 中的一个示例。如果您需要用 C 编写(相同的函数名称),它几乎是相同的代码

import time
import fcntl
import os
import signal

FNAME = "/HOME/TOTO/FILETOWATCH"

def handler(signum, frame):
    print "File %s modified" % (FNAME,)

signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME,  os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
            fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)

while True:
    time.sleep(10000)

【讨论】:

  • 在 ext4 文件系统(在 Ubuntu 10.04 上)上使用 Linux 内核 2.6.31 就像一个魅力,但仅适用于目录 - 如果我将它与文件一起使用,它会引发 IOError “不是目录” .
  • 太棒了!对我来说也一样,仅适用于目录并监视此目录中的文件。但它不适用于子目录中的已修改文件,因此看起来您需要遍历子目录并查看所有子目录。 (或者有更好的方法吗?)
【解决方案6】:

查看pyinotify

inotify 在较新的 linux 中替换了 dnotify(来自较早的答案),并允许文件级而不是目录级监控。

【讨论】:

【解决方案7】:

在对 Tim Golden 的脚本进行了一些修改之后,我得到了以下似乎运行良好的内容:

import os

import win32file
import win32con

path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt

def ProcessNewData( newData ):
    print "Text added: %s"%newData

# Set up the bits we'll need for output
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

# Open the file we're interested in
a = open(file_to_watch, "r")

# Throw away any exising log data
a.read()

# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
  # Wait for a change to occur
  results = win32file.ReadDirectoryChangesW (
    hDir,
    1024,
    False,
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
    None,
    None
  )

  # For each change, check to see if it's updating the file we're interested in
  for action, file in results:
    full_filename = os.path.join (path_to_watch, file)
    #print file, ACTIONS.get (action, "Unknown")
    if file == file_to_watch:
        newText = a.read()
        if newText != "":
            ProcessNewData( newText )

它可能会加载更多的错误检查,但是对于简单地查看日志文件并在将其吐出到屏幕之前对其进行一些处理,这很有效。

感谢大家的投入 - 很棒的东西!

【讨论】:

    【解决方案8】:

    对于使用轮询和最小依赖项查看单个文件,这里有一个完整的示例,基于来自Deestan(上)的回答:

    import os
    import sys 
    import time
    
    class Watcher(object):
        running = True
        refresh_delay_secs = 1
    
        # Constructor
        def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
            self._cached_stamp = 0
            self.filename = watch_file
            self.call_func_on_change = call_func_on_change
            self.args = args
            self.kwargs = kwargs
    
        # Look for changes
        def look(self):
            stamp = os.stat(self.filename).st_mtime
            if stamp != self._cached_stamp:
                self._cached_stamp = stamp
                # File has changed, so do something...
                print('File changed')
                if self.call_func_on_change is not None:
                    self.call_func_on_change(*self.args, **self.kwargs)
    
        # Keep watching in a loop        
        def watch(self):
            while self.running: 
                try: 
                    # Look for changes
                    time.sleep(self.refresh_delay_secs) 
                    self.look() 
                except KeyboardInterrupt: 
                    print('\nDone') 
                    break 
                except FileNotFoundError:
                    # Action on file not found
                    pass
                except: 
                    print('Unhandled error: %s' % sys.exc_info()[0])
    
    # Call this function each time a change happens
    def custom_action(text):
        print(text)
    
    watch_file = 'my_file.txt'
    
    # watcher = Watcher(watch_file)  # simple
    watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
    watcher.watch()  # start the watch going
    

    【讨论】:

    • 您可以将 watch_file_cached_stamp 放入列表中,并在 for 循环中遍历它们。虽然不能很好地扩展到大量文件
    • 这不是每次运行时都会触发动作吗? _cached_stamp 设置为 0,然后与 os.stat(self.filename).st_mtime 进行比较。 _cached_stamp 应该在构造函数中设置为 os.stat(self.filename).st_mtime 吧?
    • call_func_on_change() 将在look() 首次运行时触发,但随后_cached_stamp 会更新,因此在os.stat(self.filename).st_mtime. _cached_stamp 的值发生变化之前不会再次触发。
    • 如果你不想在第一次运行时调用call_func_on_change(),你可以在构造函数中设置_cached_stamp的值
    • 我已经使用您的脚本在文件更改时调用了一些函数。我的函数不像你的那样接受任何参数。我认为要使其正常工作,我需要删除 *args, **kwargs 看起来(我只放了更改的行):self.call_func_on_change(self) def custom_action(): watcher = Watcher(watch_file, custom_action()) 但这不起作用。仅在第一次迭代期间调用操作:文件更改是,更改文件更改文件更改文件更改当我保留 *args 并调用它时它开始工作:watcher = Watcher(watch_file, custom_action) 我很难想知道为什么?
    【解决方案9】:

    检查my answersimilar question。您可以在 Python 中尝试相同的循环。 This page 建议:

    import time
    
    while 1:
        where = file.tell()
        line = file.readline()
        if not line:
            time.sleep(1)
            file.seek(where)
        else:
            print line, # already has newline
    

    另见问题tail() a file with Python

    【讨论】:

    • 你可以 sys.stdout.write(line)。如果文件被截断,您的代码将不起作用。 Python有内置函数file()。
    • 我已经发布了您的代码的修改版本。如果它适合您,您可以将其纳入您的答案中。
    【解决方案10】:

    这是 Kender 代码的简化版本,它似乎具有相同的技巧,并且不会导入整个文件:

    # Check file for new data.
    
    import time
    
    f = open(r'c:\temp\test.txt', 'r')
    
    while True:
    
        line = f.readline()
        if not line:
            time.sleep(1)
            print 'Nothing New'
        else:
            print 'Call Function: ', line
    

    【讨论】:

      【解决方案11】:

      这是对 Tim Goldan 脚本的另一种修改,它在 unix 类型上运行,并通过使用 dict (file=>time) 添加了一个简单的文件修改观察器。

      用法:whateverName.py path_to_dir_to_watch

      #!/usr/bin/env python
      
      import os, sys, time
      
      def files_to_timestamp(path):
          files = [os.path.join(path, f) for f in os.listdir(path)]
          return dict ([(f, os.path.getmtime(f)) for f in files])
      
      if __name__ == "__main__":
      
          path_to_watch = sys.argv[1]
          print('Watching {}..'.format(path_to_watch))
      
          before = files_to_timestamp(path_to_watch)
      
          while 1:
              time.sleep (2)
              after = files_to_timestamp(path_to_watch)
      
              added = [f for f in after.keys() if not f in before.keys()]
              removed = [f for f in before.keys() if not f in after.keys()]
              modified = []
      
              for f in before.keys():
                  if not f in removed:
                      if os.path.getmtime(f) != before.get(f):
                          modified.append(f)
      
              if added: print('Added: {}'.format(', '.join(added)))
              if removed: print('Removed: {}'.format(', '.join(removed)))
              if modified: print('Modified: {}'.format(', '.join(modified)))
      
              before = after
      

      【讨论】:

      • 更新支持python3
      • 太棒了,谢谢!将其卡在带有要退出的事件的线程中,等等。我用较新的scandir 替换了listdir,据说速度更快,结果也有方便的stat() 方法来获取模组时间。
      【解决方案12】:

      好吧,既然您使用的是 Python,您可以打开一个文件并继续读取其中的行。

      f = open('file.log')
      

      如果读取的行非空,则处理它。

      line = f.readline()
      if line:
          // Do what you want with the line
      

      您可能错过了在 EOF 继续拨打readline 是可以的。在这种情况下,它只会继续返回一个空字符串。当日志文件中附加了一些内容时,读取将从停止的地方继续,根据您的需要。

      如果您正在寻找使用事件或特定库的解决方案,请在您的问题中说明这一点。否则,我认为这个解决方案很好。

      【讨论】:

        【解决方案13】:

        对我来说最简单的解决方案是使用看门狗的工具 watchmedo

        来自https://pypi.python.org/pypi/watchdog 我现在有一个进程可以在目录中查找 sql 文件并在必要时执行它们。

        watchmedo shell-command \
        --patterns="*.sql" \
        --recursive \
        --command='~/Desktop/load_files_into_mysql_database.sh' \
        .
        

        【讨论】:

          【解决方案14】:

          正如您在Horst Gutmann 所指的Tim Golden's article 中所见,WIN32 相对复杂并且监视目录,而不是单个文件。

          我想建议您研究一下IronPython,这是一个 .NET python 实现。 使用 IronPython,您可以使用所有 .NET 功能 - 包括

          System.IO.FileSystemWatcher
          

          使用简单的Event接口处理单个文件。

          【讨论】:

          • @Ciasto 因为那样你就必须拥有 Iron Python 而不是基本的 Python 安装。
          【解决方案15】:

          这是检查文件更改的示例。一种可能不是最好的方法,但它肯定是一条捷径。

          当对源代码进行更改时,用于重新启动应用程序的便捷工具。我在玩 pygame 时做了这个,所以我可以在文件保存后立即看到效果。

          在 pygame 中使用时,请确保将“while”循环中的内容放置在您的游戏循环中,即更新或其他任何内容中。否则您的应用程序将陷入无限循环,您将看不到游戏更新。

          file_size_stored = os.stat('neuron.py').st_size
          
            while True:
              try:
                file_size_current = os.stat('neuron.py').st_size
                if file_size_stored != file_size_current:
                  restart_program()
              except: 
                pass
          

          如果您想要我在网上找到的重启代码。这里是。 (与问题无关,虽然它可以派上用场)

          def restart_program(): #restart application
              python = sys.executable
              os.execl(python, python, * sys.argv)
          

          让电子做你想让他们做的事,玩得开心。

          【讨论】:

          • 似乎使用 .st_mtime 而不是 .st_size 会更可靠,并且这样做的方式同样短,尽管 OP 表示他不想通过轮询来做到这一点。
          【解决方案16】:
          ACTIONS = {
            1 : "Created",
            2 : "Deleted",
            3 : "Updated",
            4 : "Renamed from something",
            5 : "Renamed to something"
          }
          FILE_LIST_DIRECTORY = 0x0001
          
          class myThread (threading.Thread):
              def __init__(self, threadID, fileName, directory, origin):
                  threading.Thread.__init__(self)
                  self.threadID = threadID
                  self.fileName = fileName
                  self.daemon = True
                  self.dir = directory
                  self.originalFile = origin
              def run(self):
                  startMonitor(self.fileName, self.dir, self.originalFile)
          
          def startMonitor(fileMonitoring,dirPath,originalFile):
              hDir = win32file.CreateFile (
                  dirPath,
                  FILE_LIST_DIRECTORY,
                  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
                  None,
                  win32con.OPEN_EXISTING,
                  win32con.FILE_FLAG_BACKUP_SEMANTICS,
                  None
              )
              # Wait for new data and call ProcessNewData for each new chunk that's
              # written
              while 1:
                  # Wait for a change to occur
                  results = win32file.ReadDirectoryChangesW (
                      hDir,
                      1024,
                      False,
                      win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
                      None,
                      None
                  )
                  # For each change, check to see if it's updating the file we're
                  # interested in
                  for action, file_M in results:
                      full_filename = os.path.join (dirPath, file_M)
                      #print file, ACTIONS.get (action, "Unknown")
                      if len(full_filename) == len(fileMonitoring) and action == 3:
                          #copy to main file
                          ...
          

          【讨论】:

            【解决方案17】:

            好像没有人发过fswatch。它是一个跨平台的文件系统观察者。只需安装、运行并按照提示操作即可。

            我已经将它与 python 和 golang 程序一起使用,并且可以正常工作。

            【讨论】:

              【解决方案18】:

              因为我已经全局安装了它,所以我最喜欢的方法是使用 nodemon。如果您的源代码在src,并且您的入口点是src/app.py,那么很简单:

              nodemon -w 'src/**' -e py,html --exec python src/app.py
              

              ...-e py,html 可让您控制要监视更改的文件类型。

              【讨论】:

                【解决方案19】:

                下面是一个示例,用于查看每秒写入不超过一行但通常要少得多的输入文件。目标是将最后一行(最近的写入)附加到指定的输出文件。我从我的一个项目中复制了这个,只是删除了所有不相关的行。您必须填写或更改缺少的符号。

                from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
                from ui_main_window import Ui_MainWindow   # Qt Creator gen'd 
                
                class MainWindow(QMainWindow, Ui_MainWindow):
                    def __init__(self, parent=None):
                        QMainWindow.__init__(self, parent)
                        Ui_MainWindow.__init__(self)
                        self._fileWatcher = QFileSystemWatcher()
                        self._fileWatcher.fileChanged.connect(self.fileChanged)
                
                    def fileChanged(self, filepath):
                        QThread.msleep(300)    # Reqd on some machines, give chance for write to complete
                        # ^^ About to test this, may need more sophisticated solution
                        with open(filepath) as file:
                            lastLine = list(file)[-1]
                        destPath = self._filemap[filepath]['dest file']
                        with open(destPath, 'a') as out_file:               # a= append
                            out_file.writelines([lastLine])
                

                当然,包含 QMainWindow 类并不是严格要求的,即。您可以单独使用 QFileSystemWatcher。

                【讨论】:

                  【解决方案20】:

                  您还可以使用一个名为repyt 的简单库,这是一个示例:

                  repyt ./app.py
                  

                  【讨论】:

                    【解决方案21】:

                    相关@4Oh4 解决方案平滑更改要观看的文件列表;

                    import os
                    import sys
                    import time
                    
                    class Watcher(object):
                        running = True
                        refresh_delay_secs = 1
                    
                        # Constructor
                        def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
                            self._cached_stamp = 0
                            self._cached_stamp_files = {}
                            self.filenames = watch_files
                            self.call_func_on_change = call_func_on_change
                            self.args = args
                            self.kwargs = kwargs
                    
                        # Look for changes
                        def look(self):
                            for file in self.filenames:
                                stamp = os.stat(file).st_mtime
                                if not file in self._cached_stamp_files:
                                    self._cached_stamp_files[file] = 0
                                if stamp != self._cached_stamp_files[file]:
                                    self._cached_stamp_files[file] = stamp
                                    # File has changed, so do something...
                                    file_to_read = open(file, 'r')
                                    value = file_to_read.read()
                                    print("value from file", value)
                                    file_to_read.seek(0)
                                    if self.call_func_on_change is not None:
                                        self.call_func_on_change(*self.args, **self.kwargs)
                    
                        # Keep watching in a loop
                        def watch(self):
                            while self.running:
                                try:
                                    # Look for changes
                                    time.sleep(self.refresh_delay_secs)
                                    self.look()
                                except KeyboardInterrupt:
                                    print('\nDone')
                                    break
                                except FileNotFoundError:
                                    # Action on file not found
                                    pass
                                except Exception as e:
                                    print(e)
                                    print('Unhandled error: %s' % sys.exc_info()[0])
                    
                    # Call this function each time a change happens
                    def custom_action(text):
                        print(text)
                        # pass
                    
                    watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']
                    
                    # watcher = Watcher(watch_file)  # simple
                    
                    
                    
                    if __name__ == "__main__":
                        watcher = Watcher(watch_files, custom_action, text='yes, changed')  # also call custom action function
                        watcher.watch()  # start the watch going
                    

                    【讨论】:

                      【解决方案22】:

                      最好和最简单的解决方案是使用 pygtail: https://pypi.python.org/pypi/pygtail

                      from pygtail import Pygtail
                      import sys
                      
                      while True:
                          for line in Pygtail("some.log"):
                              sys.stdout.write(line)
                      

                      【讨论】:

                        【解决方案23】:
                        import inotify.adapters
                        from datetime import datetime
                        
                        
                        LOG_FILE='/var/log/mysql/server_audit.log'
                        
                        
                        def main():
                            start_time = datetime.now()
                            while True:
                                i = inotify.adapters.Inotify()
                                i.add_watch(LOG_FILE)
                                for event in i.event_gen(yield_nones=False):
                                    break
                                del i
                        
                                with open(LOG_FILE, 'r') as f:
                                    for line in f:
                                        entry = line.split(',')
                                        entry_time = datetime.strptime(entry[0],
                                                                       '%Y%m%d %H:%M:%S')
                                        if entry_time > start_time:
                                            start_time = entry_time
                                            print(entry)
                        
                        
                        if __name__ == '__main__':
                            main()
                        

                        【讨论】:

                          【解决方案24】:

                          最简单的解决方案是在一段时间后获取同一文件的两个实例并比较它们。你可以试试这样的

                              while True:
                                  # Capturing the two instances models.py after certain interval of time
                                  print("Looking for changes in " + app_name.capitalize() + " models.py\nPress 'CTRL + C' to stop the program")
                                  with open(app_name.capitalize() + '/filename', 'r+') as app_models_file:
                                      filename_content = app_models_file.read()
                                  time.sleep(5)
                                  with open(app_name.capitalize() + '/filename', 'r+') as app_models_file_1:
                                      filename_content_1 = app_models_file_1.read()
                                  # Comparing models.py after certain interval of time
                                  if filename_content == filename_content_1:
                                      pass
                                  else:
                                      print("You made a change in " + app_name.capitalize() + " filename.\n")
                                      cmd = str(input("Do something with the file?(y/n):"))
                                      if cmd == 'y':
                                          # Do Something
                                      elif cmd == 'n':
                                          # pass or do something
                                      else:
                                          print("Invalid input")
                          

                          【讨论】:

                            【解决方案25】:

                            我的理解是你想不断检查某种数据是否写入特定文件,这是一个解决方案:

                            import os
                            filename="myfile.txt" # Add full path if file is located in another directory
                            initial_filesize=os.path.getsize(filename) # Getting size of the file for comparition 
                            while 1:
                                final_filesize=os.path.getsize(filename)
                                if final_filsize<intial_filesize or final_filesize>initial_filesize:
                                    print("change in file")
                                    break
                                    # You execute your code here
                                else:
                                    pass
                            

                            以上代码如下:

                            1. 首先我们使用OS module 来与系统或操作系统进行交互。
                            2. 在进入循环之前,我们已经检索了文件的原始文件大小,并将其命名为initial_filesize
                            3. 现在我们使用While loop,在其中我们不断检索文件的文件大小,如果进行了更改,它将进入if statement

                            所以基本上,我们只是使用文件大小来检查文件中是否进行了更改,非常简单,这种方法几乎适用于所有文件类型。

                            【讨论】:

                              【解决方案26】:

                              如果您使用的是 windows,请创建此 POLL.CMD 文件

                              @echo off
                              :top
                              xcopy /m /y %1 %2 | find /v "File(s) copied"
                              timeout /T 1 > nul
                              goto :top
                              

                              然后您可以键入“poll dir1 dir2”,它会将所有文件从 dir1 复制到 dir2 并每秒检查一次更新。

                              “查找”是可选的,只是为了减少控制台的噪音。

                              这不是递归的。也许您可以在 xcopy 上使用 /e 使其递归。

                              【讨论】:

                                【解决方案27】:

                                我不知道任何 Windows 特定功能。您可以尝试每秒/分钟/小时获取文件的 MD5 散列(取决于您需要多快)并将其与最后一个散列进行比较。当它不同时,您知道文件已更改并且您读出了最新的行。

                                【讨论】:

                                  【解决方案28】:

                                  我会尝试这样的。

                                      try:
                                              f = open(filePath)
                                      except IOError:
                                              print "No such file: %s" % filePath
                                              raw_input("Press Enter to close window")
                                      try:
                                              lines = f.readlines()
                                              while True:
                                                      line = f.readline()
                                                      try:
                                                              if not line:
                                                                      time.sleep(1)
                                                              else:
                                                                      functionThatAnalisesTheLine(line)
                                                      except Exception, e:
                                                              # handle the exception somehow (for example, log the trace) and raise the same exception again
                                                              raw_input("Press Enter to close window")
                                                              raise e
                                      finally:
                                              f.close()
                                  

                                  循环检查自上次读取文件后是否有新行 - 如果有,则将其读取并传递给 functionThatAnalisesTheLine 函数。如果没有,脚本将等待 1 秒并重试该过程。

                                  【讨论】:

                                  • -1:当文件可能有 100 MB 大时,打开文件并读取行并不是一个好主意。您还必须为每个文件运行它,当您想观看 1000 个文件时,这会很糟糕。
                                  • 真的吗?打开文件进行更改?
                                  猜你喜欢
                                  • 1970-01-01
                                  • 1970-01-01
                                  • 2018-08-18
                                  • 2014-11-18
                                  • 2012-03-16
                                  • 2010-09-08
                                  • 1970-01-01
                                  • 2023-01-13
                                  相关资源
                                  最近更新 更多