【问题标题】:Handling ListenableFuture in Project reactor处理 Project reactor 中的 ListenableFuture
【发布时间】:2017-07-29 12:35:50
【问题描述】:

我已经开始使用项目反应器,并希望将我们的一个 API 移动到反应式做事方式。我想知道如何处理诸如 ListenableFuture 之类的事情。

就我而言,我使用的是 Cassandra,当我调用 session.executeAsync() 时,它会返回一个扩展 ListenableFuture 的 ResultSetFuture。 下面是我现在编写的代码示例,我似乎对向客户公开 ListenableFuture 并不满意。

public Mono<ListenableFuture<Void>> save(Publisher<AccountDTO> accountPublisher) {
    return Mono.just(accountPublisher)
            .map(accountDTO -> {
                Account accountEntity = modelMapper.map(accountDTO, Account.class);
                return mappingManager.mapper(Account.class).saveAsync(accountEntity);
            })
            .retry(1)
            .doOnError(throwable -> log.error("Unable to create account "))
            .mapError(throwable -> new MyCustomException(""));
}

我的问题是:

公开 ListenableFuture 是不是一个好习惯,我个人不想把这样的东西还给他们可以阻止的客户端。有没有更好的方法在我可以返回 Mono 的项目反应器中处理这个问题?

【问题讨论】:

    标签: project-reactor


    【解决方案1】:

    您可以使用 Mono.create() 工厂方法轻松桥接 ListenableFuture&lt;Void&gt; 异步 API 以公开 Mono&lt;Void&gt;。该方法采用 Consumer&lt;Sink&gt;,您将其作为 lambda 提供:

    1. 为调用sink.success()的future添加一个成功监听器(因为没有实际值,或者你也可以用监听器接收到的Void值调用success(aVoid)
    2. 为调用sink.error(failure)的future添加一个失败监听器

    差不多了!请参阅create 上的参考文档(尽管此文档提到了Flux 版本,由于必须处理多个值,所以它有点复杂):http://projectreactor.io/docs/core/release/reference/docs/index.html#producing.create

    【讨论】:

    • 感谢@Simon 指导我。这有帮助。
    【解决方案2】:

    发布我按照上面@Simon 的指导编写的代码 sn-p。

    @Override
    public Mono<Void> save(AccountDTO accountDTO) {
        return Mono.create(voidMonoSink -> {
    
            Account account = converter.map(accountDTO, Account.class);
    
            ListenableFuture<Void> voidListenableFuture = mappingManager.mapper(Account.class).saveAsync(account);
    
            Futures.addCallback(voidListenableFuture, new FutureCallback<Void>() {
    
                @Override
                public void onSuccess(Void result) {
                    voidMonoSink.success(result);
                }
    
                @Override
                public void onFailure(Throwable t) {
                    log.error("Unable to save account " + accountDTO, t);
                    voidMonoSink.error(new MyCustomException());
                }
            });
        });
    }
    

    【讨论】:

      【解决方案3】:

      在使用 lambdas 的 Java 8 中,如果您正在寻找 Simon 使用 ListenableFutures 创建 Flux 的答案的实现,

      Flux.create(fluxSink -> {
                  future.addCallback(
                          result -> {
                              fluxSink.next(result);
                              fluxSink.complete();
                          },
                          ex -> fluxSink.error(ex));
              });
      

      【讨论】:

        【解决方案4】:

        由于这篇文章已经很老了,两个库的 API 已经成熟,产生了一个更简单的解决方案。

        可以简单地将default CompletableFuture completable()public static Mono fromFuture(CompletableFuture<? extends T> future) 结合使用。

        这是一个改编自Spring for Apache Kafka的例子

        public void sendToKafka(final MyOutputData data) {
          final ProducerRecord<String, String> record = createRecord(data);
        
          CompletableFuture<SendResult<Integer, String>> future = template.send(record).completable();
          
          Mono<SendResult<Integer, String>> mono = Mono.fromFuture(future)
            .doOnSuccess((result) ->  handleSuccess(data))
            .doOnError((ex) -> handleFailure(data, record, ex))
            .doFinally((signalType) -> { if(signalType == SignalType.CANCEL) future.cancel(true); });
          
          mono.subscribe();
        }
        

        Note from Reactor JavaDocs

        请注意,当 Mono 被取消时,future 不会被取消,但可以通过使用 doFinally(Consumer) 检查 SignalType.CANCEL 并调用 CompletableFuture.cancel(boolean) 来获得该行为。

        【讨论】:

        • 虽然我不知道这个问题中提到的依赖关系,但就我而言,com.google.common.util.concurrent.ListenableFuture 没有提供任何将 ListenableFuture 转换为 CompletableFuture 的方法(我无法更改,因为它来自第 3 方库)。所以可能只有@Nishant 和@Bharath 的解决方案可以工作。
        猜你喜欢
        • 2017-05-18
        • 1970-01-01
        • 2020-10-31
        • 2019-12-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-06-12
        相关资源
        最近更新 更多