【问题标题】:Merging several Flux into one from Mongo Collection将 Mongo Collection 中的几个 Flux 合并为一个
【发布时间】:2017-11-06 12:31:03
【问题描述】:

我对 Spring 反应式编程有点陌生。我目前正在使用 spring-boot-starter-webflux:2.0.0.M5 开发一个应用程序。

我有一个包含菜单的 Foodtruck mongodb 模型:

@Document(collection = "Foodtruck")
@lombok.Getter
@lombok.Setter
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
public class Foodtruck {
  @Id
  private String id;

  @URL
  private String siteUrl;

  @NotBlank
  private String name;

  private String description;

  @NotBlank
  private String address;

  private List<Menu> menus;

  @JsonIgnore
  private GridFS image;
}

这里是菜单的模型:

@lombok.Getter
@lombok.Setter
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
@Document(collection = "menus")
public class Menu {

  @Id
  private String id;

  private List<DayOfWeek> days;

  @NotBlank
  private String label;

  private String description;

  private Double price;

  private List<Dish> dishes;

  @JsonIgnore
  private GridFS image;
}

要获得我所有的菜单,我首先需要获得我所有的餐馆,然后合并所有获得的Flux,并通过我的休息控制器返回它们。

这是我从我的休息资源中调用的服务:

@Service
public class MenuServiceImpl implements MenuService {

  FoodtruckService foodtruckService;

  public MenuServiceImpl(FoodtruckService foodtruckService) {
    this.foodtruckService = foodtruckService;
  }

  @Override
  public Flux<Menu> getAllMenus() {
    Flux<Menu> allMenuFlux = Flux.empty();
    Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();

    foodtruckFlux.toStream().forEach(foodtruck -> {
      Flux<Menu> currentMenuFlux = Flux.fromIterable(foodtruck.getMenus());
      allMenuFlux.mergeWith(currentMenuFlux);
    });

    return allMenuFlux;
  }
}

mergeWith 似乎没有向 allMenuFlux 添加任何内容。我想我在这里有一个理解问题。

我已经阅读了文档,测试了 concat 或 zip 等其他方法,但它并没有像我希望的那样交错通量事件。我无法正确合并这些 Flux。 我还认为有更好的方法可以通过 menu 存储库获取 mongo 嵌入式文档,因为我的方法似乎有点矫枉过正,从长远来看可能会导致性能问题。我试过了,还是不行。

编辑: 尝试以下代码后(确保我的列表不为空),生成的 fluxtest3 变量被正确合并:

Flux<Menu> allMenuFlux = Flux.empty();
Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();

List<Foodtruck> test = foodtruckFlux.collectList().block();

Flux<Menu> fluxtest1 = Flux.fromIterable(test.get(0).getMenus());
Flux<Menu> fluxtest2 = Flux.fromIterable(test.get(1).getMenus());
Flux<Menu> fluxtest3 = fluxtest1.mergeWith(fluxtest2);

但这并不是我想要的。为什么它不能使用空通量作为父通量。

我在这里缺少什么?

提前感谢您的帮助。

【问题讨论】:

  • 您是否尝试使用,映射而不是 toStream/foreach? foodtruckService.getAllFoodtrucks().flatMap(foodTrack -> Flux.fromIterable(foodtruck.getMenus()));这会返回一个通量,您不需要任何转换
  • 它工作正常,谢谢!。据我了解,flatmap 为我所有的食品卡车提取 Flux 并将其作为统一的 Flux 返回。所以它有点合并一切。是这样吗?也许你可以制定一个完整的答案,我会接受。

标签: java spring mongodb spring-boot project-reactor


【解决方案1】:

我认为这里存在一些误解。

  1. 如果您曾经将Flux 转换为CollectionStream 或类似的,然后从其中获得的东西又转换回Flux,那么您肯定做错了什么。这些类强制您的管道收集多个元素,然后处理它们,然后将它们转换回Flux。在几乎所有情况下,这都应该可以通过Flux 提供的操作来实现。

  2. Flux 上的方法不会更改 Flux,而是创建具有其他行为的新实例。所以如果你有这样的代码:

    Flux<Menu> allMenuFlux = Flux.empty();
    Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();
    
    foodtruckFlux.toStream().forEach(foodtruck -> {
        Flux<Menu> currentMenuFlux = Flux.fromIterable(foodtruck.getMenus());
        allMenuFlux.mergeWith(currentMenuFlux);
    });
    

    返回所有MenuFlux;

    每次调用 allMenuFlux.mergeWith(currentMenuFlux); 时,您都会创建一个新的 Flux,以便垃圾收集器可以处理它。 allMenuFlux 仍然是你开始使用的空的 `Flux。

  3. 你真正想要的是:

    return foodtruckService.getAllFoodtrucks()
        .flatMap(Foodtruck::getMenus);
    

    请参阅flatMap 的文档。 flatMapmergeWith 的区别在于mergeWith 保留了原始元素。如果没有像您的用例那样,那是多余的。

奖励:您的附加问题

Flux<Menu> fluxtest1 = Flux.fromIterable(test.get(0).getMenus());
Flux<Menu> fluxtest2 = Flux.fromIterable(test.get(1).getMenus());
Flux<Menu> fluxtest3 = fluxtest1.mergeWith(fluxtest2);

在这里,您返回的不是原始的fluxtest1,而是新生成的fluxtest2。因此它确实有效。

【讨论】:

  • 哇,感谢您的详细回答。它现在可以工作了,我知道为什么之前没有工作。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-10-03
  • 2012-05-04
  • 2020-11-25
  • 2019-11-24
  • 1970-01-01
  • 2021-07-31
  • 2022-01-22
相关资源
最近更新 更多