If what you want is a product like this (every animal paired with every year), you don't need the Graph DSL.
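import akka.NotUsed
import akka.stream.scaladsl.Source

// For each year, replay the whole animals source and pair every animal with that year.
def animalsAndYears(animals: Source[Animal, NotUsed], years: Source[Year, NotUsed]): Source[(Animal, Year), NotUsed] =
  years.flatMapConcat { year =>
    animals.map { animal =>
      animal -> year
    }
  }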
So:
animalsAndYears(Source(listOfAnimals), Source(listOfYears))
gives you a stream of (animal, year) tuples. Suppose you have a function:
def queryDBForAnimalYear(aandy: (Animal, Year)): Future[Seq[Row]] = ???
Then you can get a stream of rows with:
val parallelism: Int = ??? // How many queries to have in-flight at a time
animalsAndYears(Source(listOfAnimals), Source(listOfYears))
  .mapAsync(parallelism) { params => queryDBForAnimalYear(params) }
  .mapConcat(identity) // gives you a Source[Row]
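To see the pieces working together, here is a minimal end-to-end sketch. It assumes Akka Streams 2.6 or later on the classpath (so the implicit ActorSystem is enough to run the stream). The `Animal` and `Year` aliases, the `Row` case class, the sample values, and the stubbed `queryDBForAnimalYear` are all hypothetical stand-ins for your real types and database call.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object AnimalYearExample {
  // Hypothetical stand-ins for the real domain types.
  type Animal = String
  type Year = Int
  final case class Row(animal: Animal, year: Year, n: Int)

  def animalsAndYears(animals: Source[Animal, NotUsed], years: Source[Year, NotUsed]): Source[(Animal, Year), NotUsed] =
    years.flatMapConcat { year => animals.map(animal => animal -> year) }

  // Stub in place of a real database query: returns two fake rows per pair.
  def queryDBForAnimalYear(aandy: (Animal, Year)): Future[List[Row]] =
    Future.successful(List(Row(aandy._1, aandy._2, 1), Row(aandy._1, aandy._2, 2)))

  // Runs the stream to completion and collects every row.
  def run(): Seq[Row] = {
    implicit val system: ActorSystem = ActorSystem("animal-year-example")
    val parallelism = 2 // at most two queries in flight at a time
    val rows: Future[Seq[Row]] =
      animalsAndYears(Source(List("cat", "dog")), Source(List(2020, 2021)))
        .mapAsync(parallelism)(queryDBForAnimalYear)
        .mapConcat(identity)
        .runWith(Sink.seq)
    try Await.result(rows, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

Because flatMapConcat drains one inner stream before starting the next and mapAsync preserves upstream order, the rows come out grouped by year, with every animal for the first year before any for the second. If you don't care about ordering, mapAsyncUnordered is the usual alternative.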