【问题标题】:Akka Stream Materialized ValueAkka 流物化值
【发布时间】:2020-08-15 21:24:43
【问题描述】:

我想引用流程中的具体化值。下面是sn-p的代码,但是编译不出来,报错:

type mismatch;
 found   : (akka.NotUsed, scala.concurrent.Future[akka.Done])
 required: (Playground.DomainObj, scala.concurrent.Future[akka.Done])

代码:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Future
import akka.NotUsed
import akka.Done

implicit val actorSystem = ActorSystem("example")

case class DomainObj(name: String, age: Int)

 val customFlow1:Flow[String,DomainObj,NotUsed] = Flow[String].map(s => {
    DomainObj(s, 50)
  })

  val customFlow2 = Flow[DomainObj].map(s => {
    s.age + 10
  })

val printAnySink: Sink[Any, Future[Done]] = Sink.foreach(println)

val c1 = Source.single("John").viaMat(customFlow1)(Keep.right).viaMat(customFlow2)(Keep.left).toMat(printAnySink)(Keep.both)

val res: (DomainObj, Future[Done]) = c1.run()

在 Playground 中查找代码:https://scastie.scala-lang.org/P9iSx49cQcaOZfKtVCzTPA

我想在流完成后引用 DomainObj/

【问题讨论】:

    标签: akka akka-stream materialized


    【解决方案1】:

    Flow[String, DomainObj, NotUsed] 的具体化值为NotUsed,而不是DomainObj,因此c1 的具体化值为(NotUsed, Future[Done])

    看起来这里的意图是捕获在customFlow1 中创建的DomainObj。这可以通过

    来完成
    val customFlow1: Flow[String, DomainObj, Future[DomainObj]] =
      Flow[String]
        .map { s => DomainObj(s, 50) }
        .alsoTo(Sink.head)
    
    val res: (Future[DomainObj], Future[Done]) = c1.run()
    

    请注意,Sink.head 实际上要求 customFlow1 只能用于仅发出一次的内容的下游。

    【讨论】:

    • 您好 Levi,我尝试使用 String 的简单流程 - val customFlow1:Flow[String, String, Future[String]] = Flow[String].map(abc => abc.toString)。将编译问题作为 Flow[String, String, NotUsed]#Repr[String] 类型的表达式不符合预期的 Flow[String, String, Future[String]] 类型。是否应该使用不同的形式创建流
    • 是的,这显然行不通,因为Flow[String] 创建了一个Flow[String, String, NotUsed]:即它的物化值为NotUsed,这(按设计)是无用的。您在我的回答中尝试过alsoTo 方法吗?
    猜你喜欢
    • 2016-04-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-18
    • 1970-01-01
    • 2023-03-26
    • 1970-01-01
    • 2016-12-18
    相关资源
    最近更新 更多