【问题标题】:java.io.BufferedReader().map Cannot infer type argument(s) for <T> fromStream(Stream<? extends T>)java.io.BufferedReader().map 无法推断 <T> fromStream(Stream<? extends T>) 的类型参数
【发布时间】:2020-07-05 11:48:39
【问题描述】:

场景:Spring WebFlux 触发 CommandLineRunner.run 以将数据加载到 MongoDb 以进行测试。

目标:在本地启动微服务时,旨在读取 json 文件并将文档加载到 MongDb。

个人知识:“bufferedReader.lines().filter(l -> !l.trim().isEmpty()”读取每个json节点并将其作为流返回。然后我可以将其映射到“l”并访问get 方法。我想我不必创建一个列表然后流式传输它,因为我已经通过“new InputStreamReader(getClass().getClassLoader().getResourceAsStream()”将它加载为流,我假设我可以使用lines() 因为它节点会产生一个字符串线。我是在正确的方向还是我搞砸了一些想法?

这是一个 json 示例文件:

{
  "Extrato": {
    "description": "credit",
    "value": "R$1.000,00",
    "status": 11
  },
  "Extrato": {
    "description": "debit",  
    "value": "R$2.000,00",
    "status": 99
  }
}

型号

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document
public class Extrato {

    @Id
    private String id;
    private String description;
    private String value;
    private Integer status;

    public Extrato(String id, String description, String value, Integer status) {
        super();
        this.id = id;
        this.description = description;
        this.value = value;
        this.status = status;
    }
... getters and setter accordinly

存储库

import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

import com.noblockingcase.demo.model.Extrato;

import reactor.core.publisher.Flux;
import org.springframework.data.domain.Pageable;

public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
    @Query("{ id: { $exists: true }}")
    Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}

从上面的 json 文件加载的命令

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import com.noblockingcase.demo.model.Extrato;
import com.noblockingcase.demo.repository.ExtratoRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;

@Component
public class TestDataLoader implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(TestDataLoader.class);
    private ExtratoRepository extratoRepository;

    TestDataLoader(final ExtratoRepository extratoRepository) {
        this.extratoRepository = extratoRepository;
    }

    @Override
    public void run(final String... args) throws Exception {
        if (extratoRepository.count().block() == 0L) {
            final LongSupplier longSupplier = new LongSupplier() {
                Long l = 0L;

                @Override
                public long getAsLong() {
                    return l++;
                }
            };
            BufferedReader bufferedReader = new BufferedReader(
                    new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.txt")));

//*** THE ISSUE IS NEXT LINE
            Flux.fromStream(bufferedReader.lines().filter(l -> !l.trim().isEmpty())
                    .map(l -> extratoRepository.save(new Extrato(String.valueOf(longSupplier.getAsLong()),
                            l.getDescription(), l.getValue(), l.getStatus()))))
                    .subscribe(m -> log.info("Carga Teste: {}", m.block()));

        }
    }

}

这是 MongoDb 配置,但我认为它不相关

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.mongodb.MongoClientOptions;

@Configuration
public class MongoDbSettings {

    @Bean
    public MongoClientOptions mongoOptions() {
        return MongoClientOptions.builder().socketTimeout(2000).build();
    }

}

如果我尝试使用我的原始代码并对其进行调整以读取文本文件,我可以成功读取文本文件而不是 json。 Obvisouly 它不符合我的需求,因为我想读取 json 文件。顺便说一句,它可以更清楚地说明我被阻止的地方。

load-test.txt(在https://github.com/jimisdrpc/webflux-worth-scenarious/blob/master/demo/src/main/resources/carga-teste.txt 中可用)

crédito de R$1.000,00
débito de R$100,00

sn-p 代码处理简单的文本文件

    BufferedReader bufferedReader = new BufferedReader(
            new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.txt")));
    Flux.fromStream(bufferedReader.lines().filter(l -> !l.trim().isEmpty())
            .map(l -> extratoRepository
                    .save(new Extrato(String.valueOf(longSupplier.getAsLong()), "Qualquer descrição", l))))
            .subscribe(m -> log.info("Carga Teste: {}", m.block()));

整个项目成功地从文本文件中读取:https://github.com/jimisdrpc/webflux-worth-scenarious/tree/master/demo

Docker compose 用于启动 MongoDb https://github.com/jimisdrpc/webflux-worth-scenarious/blob/master/docker-compose.yml

总而言之,我的问题是:我没有弄清楚如何在 CommandLineRunner.run() 期间读取 json 文件并将数据插入到 MongoDb 中

【问题讨论】:

    标签: mongodb spring-data java-stream spring-webflux java-io


    【解决方案1】:

    请注意您的 json 无效。文本数据与 json 不同。 Json 需要特殊处理,所以最好使用库。

    carga-teste.json

    [
      {"description": "credit", "value": "R$1.000,00", "status": 11},
      {"description": "debit","value": "R$2.000,00", "status": 99}
    ]
    

    致谢这里的文章 - https://www.nurkiewicz.com/2017/09/streaming-large-json-file-with-jackson.html

    我已经习惯使用 Flux。

    @Override
    public void run(final String... args) throws Exception {
    
            BufferedReader bufferedReader = new BufferedReader(
                    new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.json")));
    
            ObjectMapper mapper = new ObjectMapper();
    
            Flux<Extrato> flux = Flux.generate(
                    () -> parser(bufferedReader, mapper),
                    this::pullOrComplete,
                    jsonParser -> {
                        try {
                            jsonParser.close();
                        } catch (IOException e) {}
                    });
    
            flux.map(l -> extratoRepository.save(l)).subscribe(m -> log.info("Carga Teste: {}", m.block()));
        }
    }
    
    private JsonParser parser(Reader reader, ObjectMapper mapper) {
        JsonParser parser = null;
        try {
            parser = mapper.getFactory().createParser(reader);
            parser.nextToken();
        } catch (IOException e) {}
        return parser;
    }
    
    private JsonParser pullOrComplete(JsonParser parser, SynchronousSink<Extrato> emitter) {
        try {
            if (parser.nextToken() != JsonToken.END_ARRAY) {
                Extrato extrato = parser.readValueAs(Extrato.class);
                emitter.next(extrato);
            } else {
                emitter.complete();
            }
        } catch (IOException e) {
            emitter.error(e);
        }
        return parser;
    }
    

    【讨论】:

    • 我强烈建议未来的读者阅读上面发布的文章。我和杰克逊一起工作了几年,我以前不知道:“......杰克逊提供类似于 StAX 的流模式......为了避免将所有内容加载到内存中,我们必须使用 ObjectMapper 使用的较低级别的 API...... .它与RxJava有什么关系?我们可以按需读取这个JSON文件,逐块读取。这使得背压机制能够无缝工作“显然我不是为了从文件中读取数据而使用webflux编程,而是为了DEVOP 的目的是使用文件中的测试数据很好地启动本地 MongoDb
    【解决方案2】:

    我找到了一个 Flux::using Flux::fromStream 的例子,对这个目的很有帮助。这会将您的文件读入Flux,然后您可以使用.flatmap 或其他方式订阅和处理。来自 Javadoc

    using(Callable resourceSupplier, Function> sourceSupplier, Consumer resourceCleanup) 使用由供应商为每个订阅者生成的资源,同时从同一资源派生的发布者流式传输值,并确保在序列终止或订阅者取消时释放资源。

    和我放在一起的代码:

    private static Flux<Account> fluxAccounts() {
        return Flux.using(() -> 
            new BufferedReader(new InputStreamReader(new ClassPathResource("data/ExportCSV.csv").getInputStream()))
                .lines()
                .map(s->{
                    String[] sa = s.split(" ");
                    return Account.builder()
                        .firstname(sa[0])
                        .lastname(sa[1])
                        .build();
                }),
                Flux::fromStream,
                BaseStream::close
        );
    }
    

    【讨论】:

    • 谢谢,除非我没有明白你的意思,否则你的建议与我正在尝试的并没有太大的不同。看来我不能使用 .lines() 从 json 读取。如果它是 json 而不是 csv,你知道我必须改变你的答案吗?
    • 为什么不能使用 .lines() 读取 json?如果数据库的每个文档都在文件中的一行中,那么您到底遇到了什么麻烦?简而言之,为.map 映射中的每一行做任何你想做的映射。
    • 我在您的建议中添加的异常以及如何成功读取一个简单的文本文件(显然它不符合我的要求,因为源是 json)
    • 显然你完全迷失在这里了。我建议先退一步了解 Java 中的 JSON 处理。
    • 这个解决方案对我帮助很大:)
    猜你喜欢
    • 1970-01-01
    • 2017-07-06
    • 1970-01-01
    • 1970-01-01
    • 2020-08-22
    • 2014-12-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多