【问题标题】:Directory watching for changes in java目录监视java中的变化
【发布时间】:2019-02-04 18:22:07
【问题描述】:

我正在使用 WatchService 来观察目录的变化,特别是在目录中创建新文件。下面是我的代码 -

package watcher;

import java.nio.file.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import java.io.*;


/**
 * Minimal demo: watches {@code c:\mk\} for newly created entries and tries to
 * open each one for reading as soon as the creation event arrives.
 *
 * <p>The open attempt is made immediately, so it fails with an exception when
 * another process (for example a copy still in progress) is holding the file.
 * That failure is reported with a stack trace and the loop keeps watching.
 */
public class Watch {
    /**
     * Runs the watch loop until the thread is interrupted or the watch key
     * becomes invalid.
     *
     * @param args unused
     * @throws IOException if the watch service cannot be created, the
     *         directory cannot be registered, or the service fails to close
     */
    public static void main(String[] args) throws IOException {
        Path dir = Paths.get("c:\\mk\\");
        // try-with-resources releases the native watch handle on every exit path.
        try (WatchService service = FileSystems.getDefault().newWatchService()) {
            dir.register(service, ENTRY_CREATE);

            System.out.println("Watching directory: "+dir.toString());
            for(;;){
                WatchKey signalled;
                try {
                    signalled = service.take();
                } catch (InterruptedException x) {
                    // take() cleared the interrupt status; restore it for callers.
                    Thread.currentThread().interrupt();
                    break;
                }

                for (WatchEvent<?> event: signalled.pollEvents()) {
                    if (event.kind() == OVERFLOW) {
                        // Events were lost; there is no file name to report.
                        continue;
                    }

                    // Only ENTRY_CREATE is registered, so the context is a Path.
                    @SuppressWarnings("unchecked")
                    WatchEvent<Path> ev = (WatchEvent<Path>)event;
                    Path child = dir.resolve(ev.context());
                    System.out.println("New file: "+child.toString()+" created.");
                    try{
                        try (FileInputStream in = new FileInputStream(child.toFile())) {
                            System.out.println("File opened for reading");
                        }
                        System.out.println("File Closed");
                    }catch(IOException | SecurityException x){
                        x.printStackTrace();
                    }
                }

                // Re-arm the key that was actually taken; stop once it is invalid.
                if (!signalled.reset()) {
                    break;
                }
            }
        }
    }
}

当我在“mk”目录中创建文件时,我会收到通知。但是当我在这个目录中复制一些文件时,我在打开那个复制的文件时遇到了异常。

我的猜测是 Windows Copier 对话框仍然锁定了该文件,我无法打开该文件。所以基本上我想知道的是如何获得文件已被其他进程关闭的通知。

上述代码的输出类似于 -

Watching directory: c:\mk
New file: c:\mk\New Text Document (2).txt created.
File opened for reading
File Closed
New file: c:\mk\Config.class created.
java.io.FileNotFoundException: c:\mk\Config.class (The process cannot access the file because it is being used by another process)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at watcher.Watch.main(Watch.java:36)
New file: c:\mk\New Text Document (3).txt created.
File opened for reading
File Closed

文件“New Text Document (2).txt”和“New Text Document (3).txt”是我创建的,但文件“Config.class”是从其他目录复制的。

请帮帮我。

【问题讨论】:

  • Java 7 有文件观察器,它可以帮助docs.oracle.com/javase/tutorial/essential/io/notification.html
  • 是的,我在下面发布了解决方案。我以前遇到过很多麻烦,所以为了帮助其他人,我已经回答了我的问题。
  • 实际上,这个文件观察器的问题是:如果您将文件复制到正在监视的目录中,那么在文件仍在复制的过程中您就会收到通知,因此此时您无法在 Java 中打开该文件进行读/写。

标签: java nio


【解决方案1】:

我通过实现下面的算法解决了这个问题:监视线程把文件名放入一个 BlockingQueue,另一个线程轮询该队列,取出文件名并多次尝试打开该文件。如果文件能够被打开,就说明 Windows 复制程序已经释放了文件锁,我们可以继续处理。因此,当这个处理线程发现文件已解锁时,会把该文件名放入“已处理”队列,我的应用程序再从那里取出文件名。另外,处理线程在通过打开文件来检查是否解锁时,如果某个文件等待解锁的时间过长,就把这个文件名放回 BlockingQueue,先去处理其他文件名,前者留待稍后再处理。

解决方案:希望这对其他人有帮助:

package dirwatch;

import java.nio.file.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import static java.nio.file.LinkOption.*;
import java.nio.file.attribute.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Watches a directory (optionally with its sub-directories) for newly created
 * files and publishes the name of each file on {@link #processedFileQueue}
 * once the file can be opened for reading.
 *
 * <p>Two threads cooperate: {@code DirWatcherThread} turns ENTRY_CREATE events
 * into file names on an internal queue, and {@code DirWatchProcessingThread}
 * repeatedly tries to open each queued file. A file that stays unopenable for
 * longer than {@code WatchDirParameters.millisToSwapFileForUnlocking} is put
 * back on the internal queue so that other files can be tried first.
 *
 * <p>Instances are created and started with
 * {@link #watchDirectory(String, boolean)} and stopped with
 * {@link #stopWatching()}.
 */
public class WatchDir {
    private final WatchService watcher;
    // Only touched by the constructor and afterwards by the watcher thread.
    private final Map<WatchKey,Path> keys;
    private final boolean recursive;
    private boolean trace = false;

    // Names of created files that have not yet been opened successfully.
    private final BlockingQueue<String> fileProcessingQueue;

    /**
     * Names of files that were opened successfully. Consumers take file names
     * from here, which is why the queue is public.
     */
    public final BlockingQueue<String> processedFileQueue;
    private volatile boolean closeProcessingThread;
    private volatile boolean closeWatcherThread;


    /**
     * Body of the processing thread: drains {@link #fileProcessingQueue} and
     * publishes every file that becomes readable. Keeps running until a stop
     * was requested and the internal queue is empty.
     */
    private void processFiles(){
        System.out.println("DirWatchProcessingThread Started");
        while(!closeProcessingThread || !fileProcessingQueue.isEmpty()){
            String fileName;
            try{
                fileName = fileProcessingQueue.poll(1000, TimeUnit.MILLISECONDS);
            }catch(InterruptedException ignored){
                // Shutdown is signalled through the volatile flags; just re-check them.
                continue;
            }

            if(fileName == null || fileName.isEmpty()){
                continue;
            }

            if(waitUntilReadable(fileName)){
                System.out.println("Queuing File: "+fileName);
                publish(fileName);
            }
        }
        closeWatcherThread = true;
        closeProcessingThread = true;
        System.out.println("DirWatchProcessingThread Exited");
    }

    /**
     * Retries opening {@code fileName} until it succeeds, the file turns out
     * to be missing or a directory, or the file has been handed back to the
     * internal queue after the swap timeout.
     *
     * @return {@code true} if the file was opened; {@code false} if it was
     *         dropped or re-queued for a later attempt
     */
    private boolean waitUntilReadable(String fileName){
        File file = new File(fileName);
        long startTime = System.currentTimeMillis();
        while(true){
            try (FileInputStream probe = new FileInputStream(file)) {
                return true;
            }catch(FileNotFoundException fnfe){
                if(!file.exists() || file.isDirectory()){
                    System.out.println("File: '"+fileName+"' has been deleted in file system or it is not file. Not processing this file.");
                    return false;
                }
            }catch(IOException ioe){
                // The open succeeded and only close() failed, so the file is readable.
                System.err.println("Failed to close probe stream for '"+fileName+"': "+ioe);
                return true;
            }

            try{
                Thread.sleep(WatchDirParameters.millisToPuaseForFileLock);
            }catch(InterruptedException ignored){
                // Shutdown is flag-driven; an interrupt only shortens this pause.
            }

            if((System.currentTimeMillis() - startTime) > WatchDirParameters.millisToSwapFileForUnlocking){
                if(fileProcessingQueue.offer(fileName)){
                    // Re-queued: let the caller move on to the next file.
                    return false;
                }
                // Queue is full: keep retrying this file with a fresh timeout window.
                startTime = System.currentTimeMillis();
            }
        }
    }

    /**
     * Puts {@code fileName} on {@link #processedFileQueue}, retrying in
     * one-second steps until the queue accepts it.
     */
    private void publish(String fileName){
        while(true){
            try{
                if(processedFileQueue.offer(fileName, 1000, TimeUnit.MILLISECONDS)){
                    return;
                }
            }catch(InterruptedException ignored){
                // Keep trying; the file name must not be lost.
            }
        }
    }

    /**
     * Body of the watcher thread: processes all events for keys queued to the
     * watcher until a stop is requested, the watch service is closed, or no
     * registered directory remains accessible.
     */
    private void processEvents(){
        System.out.println("DirWatcherThread started.");
        while(!closeWatcherThread) {
            // wait for key to be signalled
            WatchKey key;
            try {
                key = watcher.take();
            } catch (InterruptedException ignored) {
                // Re-check the stop flag instead of exiting directly.
                continue;
            } catch (ClosedWatchServiceException cwse) {
                break;
            }

            Path dir = keys.get(key);
            if (dir == null) {
                System.err.println("WatchKey not recognized!!");
                continue;
            }

            try{
                for (WatchEvent<?> event: key.pollEvents()) {
                    WatchEvent.Kind<?> kind = event.kind();
                    if (kind == OVERFLOW || !kind.equals(ENTRY_CREATE)) {
                        continue;
                    }
                    // Context for directory entry event is the file name of entry
                    WatchEvent<Path> ev = cast(event);
                    handleCreated(dir.resolve(ev.context()));
                }

                // reset key and remove from set if directory no longer accessible
                if (!key.reset()) {
                    keys.remove(key);

                    // all directories are inaccessible
                    if (keys.isEmpty()) {
                        break;
                    }
                }
            }catch(ClosedWatchServiceException cwse){
                break;
            }
        }
        closeProcessingThread = true;
        closeWatcherThread = true;
        System.out.println("DirWatcherThread exited.");
    }

    /**
     * Handles one created entry: a new directory is registered (when watching
     * recursively), anything else is queued for the processing thread.
     */
    private void handleCreated(Path child){
        if (recursive && Files.isDirectory(child, NOFOLLOW_LINKS)) {
            try {
                registerAll(child);
            } catch (IOException x) {
                System.err.println("Could not register '"+child+"': "+x);
            }
            return;
        }
        enqueueForProcessing(child.toString());
    }

    /**
     * Adds {@code path} to {@link #fileProcessingQueue}, always leaving one
     * free slot so the processing thread can put a still-locked file back.
     * Waits in 200 ms steps while the queue is too full.
     */
    private void enqueueForProcessing(String path){
        while(true){
            if(fileProcessingQueue.remainingCapacity() < 2){
                try{
                    Thread.sleep(200);
                }catch(InterruptedException ignored){
                    // Only shortens the wait; capacity is re-checked below.
                }
                continue;
            }
            if(fileProcessingQueue.offer(path)){
                return;
            }
            // The processing thread took the last slot in between; try again.
        }
    }

    /**
     * Requests both worker threads to stop and closes the watch service,
     * which wakes the watcher thread if it is blocked waiting for events.
     */
    public void stopWatching(){
        try{
            watcher.close();
        }catch(IOException ioe){
            System.err.println("Failed to close WatchService: "+ioe);
        }
        closeProcessingThread = true;
        closeWatcherThread = true;
    }

    /**
     * Creates a watcher for {@code dirName} and starts its two worker threads.
     *
     * @param dirName   directory to watch
     * @param recursive whether sub-directories are watched as well
     * @return the running watcher; read results from its
     *         {@link #processedFileQueue}
     * @throws InvalidPathException if {@code dirName} is not a valid path
     * @throws IOException if the watch service cannot be created or a
     *         directory cannot be registered
     * @throws Exception kept in the signature for existing callers
     */
    public static WatchDir watchDirectory(String dirName, boolean recursive) throws InvalidPathException, IOException, Exception{
        Path dir = Paths.get(dirName);
        final WatchDir watchDir = new WatchDir(dir, recursive);
        watchDir.closeProcessingThread = false;
        watchDir.closeWatcherThread = false;
        new Thread(new Runnable() {
            @Override
            public void run() {
                watchDir.processFiles();
            }
        }, "DirWatchProcessingThread").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                watchDir.processEvents();
            }
        }, "DirWatcherThread").start();
        return watchDir;
    }

    /**
     * Narrows a wildcard event to a typed one. Callers only use it for
     * ENTRY_CREATE events, whose context is a {@link Path}.
     */
    @SuppressWarnings("unchecked")
    private static <T> WatchEvent<T> cast(WatchEvent<?> event) {
        return (WatchEvent<T>)event;
    }

    /**
     * Register the given directory with the WatchService
     */
    private void register(Path dir) throws IOException {
        WatchKey key = dir.register(watcher, ENTRY_CREATE);
        if (trace) {
            Path prev = keys.get(key);
            if (prev == null) {
                System.out.format("register: %s\n", dir);
            } else if (!dir.equals(prev)) {
                System.out.format("update: %s -> %s\n", prev, dir);
            }
        }
        keys.put(key, dir);
    }

    /**
     * Register the given directory, and all its sub-directories, with the
     * WatchService.
     */
    private void registerAll(final Path start) throws IOException {
        Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                register(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /**
     * Creates a WatchService and registers the given directory. If the
     * registration fails the watch service is closed again before the
     * exception is rethrown.
     */
    private WatchDir(Path dir, boolean recursive) throws IOException {
        fileProcessingQueue = new ArrayBlockingQueue<String>(WatchDirParameters.fileQueueSize, false);
        processedFileQueue = new ArrayBlockingQueue<String>(WatchDirParameters.fileQueueSize, false);
        this.watcher = FileSystems.getDefault().newWatchService();
        this.keys = new HashMap<WatchKey,Path>();
        this.recursive = recursive;
        try {
            if (recursive) {
                System.out.format("Scanning %s ...\n", dir);
                registerAll(dir);
                System.out.println("Done.");
            } else {
                register(dir);
            }
        } catch (IOException | RuntimeException e) {
            // Do not leak the native watch handle when construction fails.
            try {
                watcher.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        // enable trace after initial registration
        this.trace = true;
    }
}

参数类:

package dirwatch;

/**
 * Tuning constants read by {@code WatchDir}.
 */
public class WatchDirParameters {
    /** Capacity passed to both bounded file-name queues created by WatchDir. */
    public static final int fileQueueSize = 500;

    /** Pause, in milliseconds, between two attempts to open a file that could not be opened. */
    public static final int millisToPuaseForFileLock = 200;

    /**
     * Time, in milliseconds, after which a file that still cannot be opened
     * is put back on the queue so other files can be tried first.
     */
    public static final int millisToSwapFileForUnlocking = 2_000;
}

【讨论】:

    【解决方案2】:

    我基于 @UDPLover 提供的代码做了一个更新版本,该版本专为高频率文件访问的环境而构建,相关处理放在文件阻塞检查器内部完成。还增加了一个 print() 方法,可以启用或禁用 WatchCore 向控制台输出的所有内容。原示例中轮询事件的 for 循环已改为 JDK 8 的函数式 forEach 写法,并且所有部分都运行在线程中、可以被中断。这段代码还没有经过测试,等我能够测试之后会更新修正。

    package filewatcher;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.PrintStream;
    import java.nio.file.ClosedWatchServiceException;
    import java.nio.file.FileSystems;
    import java.nio.file.FileVisitResult;
    import java.nio.file.Files;
    import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.SimpleFileVisitor;
    import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
    import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
    import java.nio.file.WatchEvent;
    import java.nio.file.WatchKey;
    import java.nio.file.WatchService;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /**
     *
     * @author Nackloose
     * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java
     */
    /**
     * Thread that watches a directory (optionally with its sub-directories)
     * for newly created files and passes each creation event to
     * {@link #onEventAndUnlocked(WatchEvent)} once the file can be opened for
     * reading.
     *
     * <p>The watcher thread (this object) collects ENTRY_CREATE events into a
     * pending map keyed by file path; a second internal thread repeatedly
     * tries to open each pending file and invokes the callback when the open
     * succeeds. Both threads are started by {@link #start()} and stopped by
     * {@link #interrupt()}.
     *
     * @author Nackloose
     * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java
     */
    public abstract class WatchCore extends Thread {

        /**
         * After the WatchCore receives an event for a file and deems it
         * unlocked, it will be passed to this function. The call is made on
         * the internal file-checking thread, not on the watcher thread.
         *
         * @param e WatchEvent for the file, after it has been affirmed to be
         * unlocked.
         */
        public abstract void onEventAndUnlocked(WatchEvent e);
        private final WatchService watcher;
        // Only touched by the constructor and afterwards by the watcher thread.
        private final Map<WatchKey, Path> keys;
        private final boolean recursive;
        private boolean trace = false;
        // Pending files keyed by path, with the event that announced them.
        // Shared by both threads: every access synchronizes on the map itself.
        private final Map<String, WatchEvent<?>> fileProcessingQueue;
        // Thread that checks pending files for readability.
        private final WatchBlocker blocker;

        /**
         * Watches {@code dir} without recursing into sub-directories.
         *
         * @throws IOException if the watch service cannot be created or the
         *         directory cannot be registered
         */
        public WatchCore(String dir) throws IOException {
            this(dir, false);
        }

        /**
         * Watches {@code dir}, optionally including its sub-directories.
         *
         * @throws IOException if the watch service cannot be created or a
         *         directory cannot be registered
         */
        public WatchCore(String dir, boolean recursive) throws IOException {
            this(Paths.get(dir), recursive);
        }

        /**
         * Watches {@code dir} without recursing into sub-directories.
         *
         * @throws IOException if the watch service cannot be created or the
         *         directory cannot be registered
         */
        public WatchCore(Path dir) throws IOException {
            this(dir, false);
        }

        /**
         * Watches {@code dir}, optionally including its sub-directories.
         * Nothing runs until {@link #start()} is called.
         *
         * @throws IOException if the watch service cannot be created or a
         *         directory cannot be registered
         */
        public WatchCore(Path dir, boolean recursive) throws IOException {
            fileProcessingQueue = new HashMap<>();
            this.watcher = FileSystems.getDefault().newWatchService();
            this.keys = new HashMap<>();
            this.recursive = recursive;
            if (recursive) {
                print("Scanning %s ...", dir);
                registerAll(dir);
                print("Done.");
            } else {
                register(dir);
            }
            // enable trace after initial registration
            this.trace = true;
            blocker = new WatchBlocker();
        }

        /**
         * Narrows a wildcard event to a typed one. Callers only use it for
         * ENTRY_CREATE events, whose context is a {@link Path}.
         */
        @SuppressWarnings("unchecked")
        private static <T> WatchEvent<T> cast(WatchEvent<?> event) {
            return (WatchEvent<T>) event;
        }

        /** Starts the watcher thread first, then the file-checking thread. */
        @Override
        public synchronized void start() {
            super.start();
            blocker.start();
        }

        /** Stops the file-checking thread first, then the watcher thread. */
        @Override
        public void interrupt() {
            blocker.interrupt();
            super.interrupt();
        }

        /**
         * Register the given directory with the WatchService
         */
        private void register(Path dir) throws IOException {
            WatchKey key = dir.register(watcher, ENTRY_CREATE);
            if (trace) {
                Path prev = keys.get(key);
                if (prev == null) {
                    print("register: %s\n", dir);
                } else if (!dir.equals(prev)) {
                    print("update: %s -> %s\n", prev, dir);
                }
            }
            keys.put(key, dir);
        }

        /**
         * Register the given directory, and all its sub-directories, with the
         * WatchService.
         */
        private void registerAll(final Path start) throws IOException {
            Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                    register(dir);
                    return FileVisitResult.CONTINUE;
                }
            });
        }

        /**
         * Process all events for keys queued to the watcher until this thread
         * is interrupted, the watch service is closed, or no registered
         * directory remains accessible. The watch service is closed on exit.
         */
        @Override
        public void run() {
            print("DirWatcherThread started.");
            try {
                // isInterrupted() does not clear the status, unlike Thread.interrupted().
                while (!isInterrupted()) {
                    // wait for key to be signalled
                    WatchKey key;
                    try {
                        key = watcher.take();
                    } catch (InterruptedException x) {
                        // take() cleared the status; restore it and stop watching.
                        Thread.currentThread().interrupt();
                        break;
                    }

                    final Path dir = keys.get(key);
                    if (dir == null) {
                        printe("WatchKey not recognized!!");
                        continue;
                    }

                    // Only creations are handled; OVERFLOW events carry no file name.
                    key.pollEvents().stream()
                            .filter(event -> ENTRY_CREATE.equals(event.kind()))
                            .forEach(event -> handleCreate(dir, event));

                    // reset key and remove from set if directory no longer accessible
                    if (!key.reset()) {
                        keys.remove(key);

                        // all directories are inaccessible
                        if (keys.isEmpty()) {
                            break;
                        }
                    }
                }
            } catch (ClosedWatchServiceException cwse) {
                // The service was closed underneath us: nothing left to watch.
            } finally {
                try {
                    watcher.close();
                } catch (IOException ioe) {
                    Logger.getLogger(WatchCore.class.getName())
                            .log(Level.WARNING, "Failed to close WatchService", ioe);
                }
                print("DirWatcherThread exited.");
            }
        }

        /**
         * Handles one ENTRY_CREATE event: a new directory is registered (when
         * watching recursively), anything else is added to the pending map.
         */
        private void handleCreate(Path dir, WatchEvent<?> event) {
            // Context for directory entry event is the file name of entry
            WatchEvent<Path> ev = cast(event);
            Path child = dir.resolve(ev.context());
            if (recursive && Files.isDirectory(child, NOFOLLOW_LINKS)) {
                try {
                    registerAll(child);
                } catch (IOException x) {
                    printe("Could not register '" + child + "': " + x);
                }
                return;
            }
            synchronized (fileProcessingQueue) {
                fileProcessingQueue.put(child.toString(), ev);
            }
        }

        /**
         * Removes and returns one pending file, or {@code null} when nothing
         * is pending. Which entry is returned is decided by the map's
         * iteration order.
         */
        private PendingFile takePending() {
            synchronized (fileProcessingQueue) {
                if (fileProcessingQueue.isEmpty()) {
                    return null;
                }
                Entry<String, WatchEvent<?>> head = fileProcessingQueue.entrySet().iterator().next();
                // Copy out before removing so the result does not depend on the map entry.
                PendingFile pending = new PendingFile(head.getKey(), head.getValue());
                fileProcessingQueue.remove(pending.path);
                return pending;
            }
        }

        /**
         * Puts a still-locked file back into the pending map unless a newer
         * event for the same path has arrived in the meantime.
         */
        private void requeue(PendingFile pending) {
            synchronized (fileProcessingQueue) {
                if (!fileProcessingQueue.containsKey(pending.path)) {
                    fileProcessingQueue.put(pending.path, pending.event);
                }
            }
        }

        /** A file path together with the event that announced it. */
        private static final class PendingFile {
            final String path;
            final WatchEvent<?> event;

            PendingFile(String path, WatchEvent<?> event) {
                this.path = path;
                this.event = event;
            }
        }

        /**
         * Thread that takes pending files one at a time, waits until each can
         * be opened for reading and then invokes the callback.
         *
         * @author
         * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java
         * Nackloose
         */
        private class WatchBlocker extends Thread {

            /**
             * Runs until this thread is interrupted. Sleeps for
             * {@code WatchCoreParameters.timeToIdle} when nothing is pending.
             */
            @Override
            public void run() {
                print("DirWatchProcessingThread Started");
                try {
                    while (!isInterrupted()) {
                        PendingFile pending = takePending();
                        if (pending == null) {
                            Thread.sleep(WatchCoreParameters.timeToIdle);
                            continue;
                        }
                        if (awaitUnlocked(pending)) {
                            print("Queuing File: " + pending.path);
                            //pass the unlocked file event to the abstract method
                            onEventAndUnlocked(pending.event);
                        }
                    }
                } catch (InterruptedException stopRequested) {
                    // Interruption is the stop signal; keep the status set for observers.
                    Thread.currentThread().interrupt();
                }
                print("DirWatchProcessingThread Exited");
            }

            /**
             * Retries opening the file until it succeeds, the file turns out
             * to be missing or a directory, or the swap timeout expires (in
             * which case the file is put back into the pending map).
             *
             * @return {@code true} if the file was opened; {@code false} if
             *         it was dropped or re-queued for a later attempt
             * @throws InterruptedException if interrupted while pausing
             *         between attempts
             */
            private boolean awaitUnlocked(PendingFile pending) throws InterruptedException {
                File file = new File(pending.path);
                long startTime = System.currentTimeMillis();
                while (true) {
                    try (FileInputStream probe = new FileInputStream(file)) {
                        return true;
                    } catch (FileNotFoundException fnfe) {
                        if (!file.exists() || file.isDirectory()) {
                            print("File: '" + pending.path + "' has been deleted in file system or it is not file. Not processing this file.");
                            return false;
                        }
                    } catch (IOException ioe) {
                        // The open succeeded and only close() failed, so the file is readable.
                        Logger.getLogger(WatchCore.class.getName())
                                .log(Level.WARNING, "Failed to close probe stream for " + pending.path, ioe);
                        return true;
                    }

                    Thread.sleep(WatchCoreParameters.millisToPauseForFileLock);

                    if ((System.currentTimeMillis() - startTime) > WatchCoreParameters.millisToSwapFileForUnlocking) {
                        // Give other files a turn; this one is retried later.
                        requeue(pending);
                        return false;
                    }
                }
            }
        }

        /**
         * Globally shared, mutable settings for all WatchCore instances.
         *
         * @author
         * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java
         * Nackloose
         */
        public static class WatchCoreParameters {

            // All three values are used as millisecond arguments to Thread.sleep
            // or compared against System.currentTimeMillis() differences.
            public static int timeToIdle = 2000, millisToPauseForFileLock = 200,
                    millisToSwapFileForUnlocking = 2000;
            // When false, every print/printe call is a no-op.
            public static boolean verbose = false;

        }

        //<editor-fold defaultstate="collapsed" desc="Printing methods">
        /** Prints {@code s} to standard output when verbose output is enabled. */
        private void print(String s) {
            print(s, false);
        }

        /**
         * Prints {@code s} when verbose output is enabled.
         *
         * @param s     text to print; a line break is appended unless the
         *              text already contains one
         * @param error {@code true} to write to standard error instead of
         *              standard output
         */
        public static final void print(String s, boolean error) {
            if (!WatchCoreParameters.verbose) {
                return;
            }
            PrintStream out = (!error ? System.out : System.err);
            if (s.contains("\n")) {
                out.print(s);
            } else {
                out.println(s);
            }
        }

        /** Shortcut for printing {@code s} to standard error. */
        public static final void printe(String s) {
            print(s, true);
        }

        /**
         * Formats and prints to standard output when verbose output is
         * enabled.
         *
         * @param s         format string; a line break is appended unless it
         *                  already contains one
         * @param formatObj arguments for the format string
         */
        public static final void print(String s, Object... formatObj) {
            if (!WatchCoreParameters.verbose) {
                return;
            }
            System.out.format(s + (s.contains("\n") ? "" : "\n"), formatObj);
        }
    //</editor-fold>
    }
    

    【讨论】:

      猜你喜欢
      • 2011-11-23
      • 1970-01-01
      • 1970-01-01
      • 2010-10-05
      • 1970-01-01
      • 2023-03-08
      • 1970-01-01
      • 1970-01-01
      • 2011-12-13
      相关资源
      最近更新 更多