【问题标题】:Is there a way to prevent ClosedByInterruptException?有没有办法防止 ClosedByInterruptException?
【发布时间】:2019-02-15 01:59:06
【问题描述】:

在以下示例中,我有一个文件被两个线程使用(在实际示例中,我可以有任意数量的线程)

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class A {
    static volatile boolean running = true;

    public static void main(String[] args) throws IOException, InterruptedException {
        String name = "delete.me";
        new File(name).deleteOnExit();
        RandomAccessFile raf = new RandomAccessFile(name, "rw");
        FileChannel fc = raf.getChannel();

        Thread monitor = new Thread(() -> {
            try {
                while (running) {
                    System.out.println(name + " is " + (fc.size() >> 10) + " KB");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                System.err.println("Monitor thread died");
                e.printStackTrace();
            }
        });
        monitor.setDaemon(true);
        monitor.start();

        Thread writer = new Thread(() -> {
            ByteBuffer bb = ByteBuffer.allocateDirect(32);
            try {
                while (running) {
                    bb.position(0).limit(32);
                    fc.write(bb);

                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                System.err.println("Writer thread died");
                e.printStackTrace();
            }
        });

        writer.setDaemon(true);
        writer.start();

        Thread.sleep(5000);
        monitor.interrupt();
        Thread.sleep(2000);
        running = false;
        raf.close();
    }
}

而不是为每个线程创建一个 RandomAccessFile 和一个内存映射,我在线程之间共享一个文件和一个内存映射,但是有一个问题,如果任何线程被中断,资源就会关闭。

delete.me is 0 KB
delete.me is 2 KB
delete.me is 4 KB
delete.me is 6 KB
delete.me is 8 KB
Interrupted
Monitor thread died
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:315)
    at A.lambda$main$0(A.java:19)
    at java.lang.Thread.run(Thread.java:748)
Writer thread died
java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199)
    at A.lambda$main$1(A.java:41)
    at java.lang.Thread.run(Thread.java:748)

有没有什么办法可以防止 FileChannel 仅仅因为一个使用它的线程被中断而被关闭?


编辑我想避免做的是因为我怀疑它不适用于 Java 9+

private void doNotCloseOnInterrupt(FileChannel fc) {
    try {
        Field field = AbstractInterruptibleChannel.class
                .getDeclaredField("interruptor");
        field.setAccessible(true);
        field.set(fc, (Interruptible) thread
                -> Jvm.warn().on(getClass(), fc + " not closed on interrupt"));
    } catch (Exception e) {
        Jvm.warn().on(getClass(), "Couldn't disable close on interrupt", e);
    }
}

顺便说一句,对fc.size() 的调用返回了上述黑客所期望的大小。

【问题讨论】:

  • 我虽然:这听起来很有趣但很容易回答。然后我看到了赞成票,你问了这个问题。有些东西告诉我,我什至不必尝试在这里找到答案。现在让真正的大师级别的家伙进来......
  • 我能想到的唯一可能的方法是您刚刚在编辑中排除的内容。
  • nio-dev 上已经有很多关于为此引入新的 OpenOption 的讨论,但没有结论。同时,没有任何支持的方式来获取不可中断的 FileChannel。破解中断器字段并不可靠,取决于 JDK 内部,并且随时可能中断。
  • @PeterLawrey 内存映射完全不受文件通道关闭的影响。事实上,我通常会尽早关闭频道,以尽量减少资源使用。例如。 MappedByteBuffer mapped; try(FileChannel fc = FileChannel.open(name, READ, WRITE, CREATE_NEW, DELETE_ON_CLOSE)) { mapped = fc.map(FileChannel.MapMode.READ_WRITE, position, size); } /* use of mapped */ 请注意,映射缓冲区会阻止 DELETE_ON_CLOSE 在关闭时立即删除,这会将其变成“退出时删除”行为,我发现这比 File.deleteOnExit() 更可靠……
  • 就像@Holger 所说,即使关闭了创建它的 FileChannel,MappedByteBuffer 仍然有效。事实上,在垃圾收集之前它是有效的。我建议您,内存映射不太适合增长的文件,并且在写入超出边界时会在某些操作系统上给出奇怪的结果。如果您打算将文件的给定部分分配给每个线程,这可能是一个不错的选择,但如果所有线程都需要按顺序写入,您将需要分配足够的字节并检查是否达到边界以创建新文件(或扩展当前文件并映射新创建的部分)。

标签: java multithreading java-9 filechannel


【解决方案1】:

既然你说要“线程之间共享一个内存映射”,根本不存在这样的问题,因为内存映射不受FileChannel的关闭影响。事实上,尽快关闭通道是一个很好的策略,可以减少应用程序占用的资源。

例如

static volatile boolean running = true;

public static void main(String[] args) throws IOException {
    Path name = Paths.get("delete.me");
    MappedByteBuffer mapped;
    try(FileChannel fc1 = FileChannel.open(name, READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE)) {
        mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
    }
    Thread thread1 = new Thread(() -> {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
        while(running && !Thread.interrupted()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            byte[] b = new byte[5];
            mapped.position(4000);
            mapped.get(b);
            System.out.println("read "+new String(b, StandardCharsets.US_ASCII));
        }
    });
    thread1.setDaemon(true);
    thread1.start();
    Thread thread2 = new Thread(() -> {
        byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII);
        while(running && !Thread.interrupted()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            mapped.position(4000);
            mapped.put(b);
            System.out.println("wrote "+new String(b, StandardCharsets.US_ASCII));
            byte b1 = b[0];
            System.arraycopy(b, 1, b, 0, b.length-1);
            b[b.length-1] = b1;
        }
        mapped.force();
    });
    thread2.setDaemon(true);
    thread2.start();
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
    thread2.interrupt();
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
    running = false;

这演示了线程如何在通道关闭后读取和写入数据,并且中断写入线程不会停止读取线程。

如果除了内存映射 I/O 之外还需要执行FileChannel 操作,使用多个FileChannel 实例是没有问题的,因此关闭一个通道不会影响另一个通道。例如

static volatile boolean running = true;

public static void main(String[] args) throws IOException {
    Path name = Paths.get("delete.me");
    try(FileChannel fc1 = FileChannel.open(name,READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE);
        FileChannel fc2 = FileChannel.open(name,READ,WRITE)) {
        Thread thread1 = new Thread(() -> {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
            try {
                MappedByteBuffer mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                while(running && !Thread.interrupted()) {
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                    byte[] b = new byte[5];
                    mapped.position(4000);
                    mapped.get(b);
                    System.out.println("read from map "
                        +new String(b, StandardCharsets.US_ASCII)
                        +", file size "+fc1.size());
                }
            }catch(IOException ex) {
                ex.printStackTrace();
            }
        });
        thread1.setDaemon(true);
        thread1.start();
        Thread thread2 = new Thread(() -> {
            byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII);
            try {
                MappedByteBuffer mapped = fc2.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                fc2.position(4096);
                try {
                    while(running && !Thread.interrupted()) {
                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                        mapped.position(4000);
                        mapped.put(b);
                        System.out.println("wrote to mapped "
                            +new String(b, StandardCharsets.US_ASCII));
                        byte b1 = b[0];
                        System.arraycopy(b, 1, b, 0, b.length-1);
                        b[b.length-1] = b1;
                        fc2.write(ByteBuffer.wrap(b));
                    }
                } finally { mapped.force(); }
            }catch(IOException ex) {
                ex.printStackTrace();
            }
        });
        thread2.setDaemon(true);
        thread2.start();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        thread2.interrupt();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
        running = false;
    }
}

这里,一个线程的中断确实会关闭它的通道,但不会影响另一个线程。此外,即使每个线程从其自己的通道获取自己的MappedByteBuffer,更改也会显示到另一个,即使没有使用force()。当然,后者被定义为依赖于系统的行为,并不保证适用于每个系统。

但如第一个示例所示,您仍然可以在开始时仅从一个通道创建共享缓冲区,同时在不同的通道上执行 I/O 操作,每个线程一个,不管是否以及哪些通道被关闭,映射的缓冲区不受它的影响。

【讨论】:

    【解决方案2】:

    您可以使用反射访问interruptor 字段illegaly 并从那里获取sun.nio.ch.Interruptible 类类型以创建代理实例:

    private void doNotCloseOnInterrupt(FileChannel fc) {
        try {
            Field field = AbstractInterruptibleChannel.class.getDeclaredField("interruptor");
            Class<?> interruptibleClass = field.getType();
            field.setAccessible(true);
            field.set(fc, Proxy.newProxyInstance(
                    interruptibleClass.getClassLoader(), 
                    new Class[] { interruptibleClass },
                    new InterruptibleInvocationHandler()));
        } catch (final Exception e) {
            Jvm.warn().on(getClass(), "Couldn't disable close on interrupt", e);
        }
    }
    
    public class InterruptibleInvocationHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
        {
            // TODO: Check method and handle accordingly
            return null;
        }
    }
    

    在 Java9 中,这仅适用于单个警告,因为它默认使用 --illegal-access=permit 运行。

    但是,在未来的版本中可能会删除此标志,确保其长期有效的最佳方法是使用标志 --add-opens

    --add-opens java.base/sun.nio.ch=your-module
    --add-opens java.base/java.nio.channels.spi=your-module
    

    或者,如果您不使用模块(不推荐):

    --add-opens java.base/sun.nio.ch=ALL-UNNAMED
    --add-opens java.base/java.nio.channels.spi=ALL-UNNAMED
    

    这适用于 Java 9、Java 10 和当前的 JDK 11 Early-Access Build (28 (2018/8/23))。

    【讨论】:

    • @Eugene 我宁愿相信official docs 而不是对 SO 的评论:“在准备拒绝非法访问的 JDK 版本时,应使用 --illegal-access= 测试应用程序和库拒绝。” 我发布的代码适用于--illegal-access=deny--add-opens。我不知道在可预见的将来不会支持--add-exports--add-opens 的任何官方信息。
    • 你是对的,当然;当然,除非该评论来自实际编写拼图的人:)
    • 嗯,公平点,没看到。好吧,如果将来真的会有一个无法访问 JDK 内部的 Java 版本,那么 OP 所要求的就是,嗯,不可能,除非自己重新实现所需的类。但在那之前,这个解决方案是有效的。
    • 奇怪的是为什么它首先委派了这个功能。似乎拥有此字段的唯一目的是可以更改它。
    【解决方案3】:

    通过使用 AsynchronousFileChannel,永远不会抛出 ClosedByInterruptException 它似乎并不关心中断

    使用 jdk 1.8.0_72 完成测试

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.channels.CompletionHandler;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class A {
        static volatile boolean running = true;
    
        public static void main(String[] args) throws IOException, InterruptedException {
            String name = "delete.me";
            Path path = new File(name).toPath();
            AtomicLong position = new AtomicLong(0);
    
            AsynchronousFileChannel fc = AsynchronousFileChannel.open(path, 
                    StandardOpenOption.CREATE_NEW, StandardOpenOption.DELETE_ON_CLOSE ,
                    StandardOpenOption.READ, StandardOpenOption.WRITE,
                    StandardOpenOption.WRITE, StandardOpenOption.SYNC);
    
            CompletionHandler<Integer, Object> handler =
                    new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        //System.out.println(attachment + " completed with " + result + " bytes written");
                        position.getAndAdd(result);
                    }
                    @Override
                    public void failed(Throwable e, Object attachment) {
                        System.err.println(attachment + " failed with:");
                        e.printStackTrace();
                    }
                };
    
            Runnable monitorRun = () -> {
                try {
                    while (running) {
                        System.out.println(name + " is " + (fc.size() >> 10) + " KB");
    
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            System.out.println("Interrupted");
                            Thread.currentThread().interrupt();
                            System.out.println("Interrupt call failed so return");
                            return;
                        }
                    }
                } catch (IOException e) {
                    System.err.println("Monitor thread died");
                    e.printStackTrace();
                }
            };
    
            Thread monitor = new Thread(monitorRun);
            monitor.setDaemon(true);
            monitor.start();
    
            Thread writer = new Thread(() -> {
                ByteBuffer bb = ByteBuffer.allocateDirect(32);
                try {
                    while (running) {
                        bb.position(0).limit(32);
                        fc.write(bb,position.get(),null,handler);
    
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            System.out.println("Interrupted");
                            Thread.currentThread().interrupt();
                        }
                    }
                } catch (Exception e) {
                    System.err.println("Writer thread died");
                    e.printStackTrace();
                }
            });
    
            writer.setDaemon(true);
            writer.start();
    
            Thread.sleep(5000);
            monitor.interrupt();
            Thread.sleep(2000);
            monitor = new Thread(monitorRun);
            monitor.start();
            Thread.sleep(5000);
            running = false;
            fc.close();
        }
    }
    

    生成以下输出:

    delete.me is 0 KB
    delete.me is 3 KB
    delete.me is 6 KB
    delete.me is 9 KB
    delete.me is 12 KB
    Interrupted
    Interrupt call failed so return
    delete.me is 21 KB
    delete.me is 24 KB
    delete.me is 27 KB
    delete.me is 30 KB
    delete.me is 33 KB
    

    【讨论】:

    • 我需要这个来处理内存映射。我们是否也可以避免使用 RandomAccessFile(两个文件句柄都可以)
    • 对不起,AsynchronousFileChannel 不像 FileChannel 那样提供任何映射功能。该解决方案已经不使用任何 RandomAccessFile。您的要求似乎超出了此问题的范围,可能需要有关您要实现的目标的更多详细信息,请提供指向新问题的链接,我很乐意参与。
    • 仅供参考:从 FileChannel.map 创建 mappedByteBuffer 并使用它来写入并获取已写入内容的大小也解决了 ClosedByInterruptException 问题
    • 这很有帮助。我还需要内存映射,但我怀疑所有可能出现此问题的操作都可以迁移。
    猜你喜欢
    • 2020-10-10
    • 1970-01-01
    • 1970-01-01
    • 2021-12-03
    • 2017-01-17
    • 2019-05-27
    • 2016-12-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多