【问题标题】:One to many merge using Akka streams使用 Akka 流进行一对多合并
【发布时间】:2020-12-06 11:26:42
【问题描述】:

我有一个用例,我有一个要从数据库中获取的值列表和一个需要获取值的日期列表。我想使用 akka 流(Flow 或 Source with GraphDSL)在它们之间建立一对多(或多对一)关系,以便获取每个日期的每个值

例如,

动物 = 牛、山羊、绵羊

年=2018、2019

预期的流输出是

牛与 2018

山羊 & 2018

羊与 2018

牛与 2019

山羊和 2019

羊与 2019

【问题讨论】:

    标签: akka akka-stream


    【解决方案1】:

    如果你想要这样的产品,你不需要 Graph DSL。

    def animalsAndYears(animals: Source[Animal, NotUsed], years: Source[Year, NotUsed]): Source[(Animal, Year), NotUsed] =
      years.flatMapConcat { year =>
        animals.map { animal =>
          animal -> year
        }
      }
    

    所以:

     animalsAndYears(Source(listOfAnimals), Source(listOfYears))
    

    会给你一个animalyear元组的流。假设你有一个函数:

     def queryDBForAnimalYear(aandy: (Animal, Year)): Future[Seq[Row]] = ???
    

    然后您可以通过以下方式获取行流:

    val parallelism: Int = ??? // How many queries to have in-flight at a time
    animalsAndYears(Source(listOfAnimals), Source(listOfYears))
      .mapAsync(parallelism) { params => queryDBForAnimalYear(params) }
      .mapConcat(identity)  // gives you a Source[Row]
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-11-16
      • 2014-02-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-05-08
      • 2018-12-11
      相关资源
      最近更新 更多