【问题标题】:Non-blocking reading from the file using Reactor使用 Reactor 非阻塞读取文件
【发布时间】:2022-02-07 02:56:08
【问题描述】:

你知道是否有可能以真正非阻塞的方式创建文件行的 Flux 吗?

我找到的最佳解决方案是下一个:

Flux.using(
           () -> Files.lines(PATH),
           Flux::fromStream,
           Stream::close
 );

尽管它看起来没有阻塞,但实际上它是阻塞的。

https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#lines-java.nio.file.Path-java.nio.charset.Charset-

从文件中读取所有行作为流。与 readAllLines 不同,这 方法不会将所有行读入列表,而是填充 在流被消耗时懒惰地。

返回的流封装了一个Reader。

是否可以从 AsynchronousFileChannel 创建 Flux?

提前谢谢你

【问题讨论】:

  • 你认为它为什么会阻塞?甚至这篇文章也描述了这种技术:baeldung.com/java-nio2-async-file-channel.
  • 嘿,阿尔乔姆。我不认为 AsynchronousFileChannel 是阻塞的,我在上面的代码示例中讲述了 Files.lines(PATH)
  • 为什么会被屏蔽?
  • 您能否提供一个最小完整示例来说明问题?特别是,publishOn()subscribeOn() 运算符使用哪些调度程序?
  • 我认为这里不应该讨论调度程序。我首先想看看这些行的流被阻止的所有论点。

标签: java nio project-reactor nonblocking nio2


【解决方案1】:

如果类路径上有 Spring Framework,则可以执行以下操作:

import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class AsyncFileRead {
    public static void main(String[] args) {
        StringDecoder stringDecoder = StringDecoder.textPlainOnly();

        DataBufferUtils.readAsynchronousFileChannel(() -> AsynchronousFileChannel.open(Path.of("test/sample.txt"),
                StandardOpenOption.READ), DefaultDataBufferFactory.sharedInstance, 4096)
            .transform(dataBufferFlux -> stringDecoder.decode(dataBufferFlux, null, null, null))
            .blockLast();
    }
}

或者,您可以使用 RxIo 库,它提供了类似于 java.nio.file.Files 的漂亮抽象,只是带有异步/反应式支持:

import org.javaync.io.AsyncFiles;
import reactor.core.publisher.Flux;

import java.nio.file.Path;

public class AsyncFileRead {
    public static void main(String[] args) {
        Flux.from(AsyncFiles.lines(Path.of("test/sample.txt")))
            .blockLast();
    }
}

尽管如此,重要的是要注意,即使这些解决方案也不是真正的非阻塞,因为它依赖于平台(Windows、Linux)AsynchronousFileChannel implementation can be blocking,但至少它将该任务委托给专用线程池。

【讨论】:

    猜你喜欢
    • 2017-02-18
    • 1970-01-01
    • 1970-01-01
    • 2020-10-28
    • 1970-01-01
    • 1970-01-01
    • 2016-07-09
    • 2014-11-17
    相关资源
    最近更新 更多