【问题标题】:Vertx Future does not waitVertx Future 不等待
【发布时间】:2015-12-05 10:08:19
【问题描述】:

由于我在我的堆栈中使用 Vertx 3.1,我正在考虑使用这些工具带来的 Future 功能,但在阅读之后,API 似乎对我来说非常有限。我什至找不到让未来等待 Observable 的方法。 这是我的代码

          public Observable<CommitToOrderCommand> validateProductRestrictions(CommitToOrderCommand cmd) {
    Future<Observable<CommitToOrderCommand>> future = Future.future();
    orderRepository.getOrder(cmd, cmd.orderId)
                   .flatMap(order -> validateOrderProducts(cmd, order))
                   .subscribe(map -> checkMapValues(map, future, cmd));
     Observable<CommitToOrderCommand> result = future.result();
    if(errorFound){
        throw MAX_QUANTITY_PRODUCT_EXCEED.create("Fail"/*restrictions.getBulkBuyLimit().getDescription())*/);
    }
    return result;
}

private void checkMapValues(Multimap<String, BigDecimal> totalUnitByRestrictions, Future<Observable<CommitToOrderCommand>> future,
                            CommitToOrderCommand cmd) {
    for (String restrictionName : totalUnitByRestrictions.keySet()) {
        Restrictions restrictions = Restrictions.valueOf(restrictionName);
        if (totalUnitByRestrictions.get(restrictionName)
                                   .stream()
                                   .reduce(BigDecimal.ZERO, BigDecimal::add)
                                   .compareTo(restrictions.getBulkBuyLimit()
                                                          .getMaxQuantity()) == 1) {
            errorFound = true;
        }
    }
    future.complete(Observable.just(cmd));
}

在我的第一个 Observable 的 onComplete 中,我正在检查结果,完成之后是我完成未来以解除阻塞操作的时间。 但我期待 future.result 不会阻塞,直到 future.complete 像我预期的那样被调用。相反只是返回 null。

知道这里出了什么问题吗?

问候。

【问题讨论】:

    标签: java reactive-programming rx-java vert.x vertx3


    【解决方案1】:

    vertx future 不会阻塞,而是使用在注入结果时调用的处理程序(请参阅setHandlerisComplete)。

    如果外层代码需要 Observable,则不需要将其包装在 Future 中,只需返回 Observable&lt;T&gt;Future&lt;Observable&lt;T&gt;&gt; 没有多大意义,你混合了两种异步结果的方式。

    请注意,有多种方法可以将 Observable 折叠成 Future,但困难在于 Observable 可能会发出多个项目,而 Future 只能容纳一个项目。您已经通过将结果收集到一张地图中来解决这个问题。

    由于这个Observable 只发出一个项目,如果你想要一个Future,你应该subscribe 给它,并在onNext 方法中调用future.complete(yourMap)。还要定义一个onError 处理程序,它将调用future.fail

    【讨论】:

      猜你喜欢
      • 2020-08-04
      • 1970-01-01
      • 2019-03-02
      • 1970-01-01
      • 2019-04-14
      • 2013-04-21
      • 2020-09-29
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多