【问题标题】:how to integrate akka actors with decline如何将 akka 演员与衰落结合起来
【发布时间】:2021-02-24 05:54:21
【问题描述】:

我有一个带有 akka、akka http 和 akka 流的应用程序,所以它不是休息或微服务是一个短暂的应用程序;它是来自 rest api 的数据提取器,应用程序写入 postgres 数据库,它的工作原理只是我有一个疑问,如果以这种方式集成衰落和 akka 是否正确。

代码:

object App extends CommandIOApp(name = "app" , header = "extractor", version = "0.1.0") {

 
  val config = Config( token, jdbcDriver = jdbcDriver, jdbcURL = jdbcURL, jdbcUser = jdbcUser, jdbcPassword = jdbcPassword, jdbcSchema = jdbcSchema)

  override def main: Opts[IO[ExitCode]] =
    Opts.subcommand(name="extract-teams", help="Extract Teams from API"){

      val tableName = Opts.option[String]("recursive", short="r", help="Recursive Extraction")
      val outputFile = Opts.option[Path]("output", short="o", help="Output file").withDefault(Paths.get("output.csv"))

      ( tableName, outputFile).mapN{( table, output) =>
        println(table)
        println(output)

        ClickUpExtractions.extractTeams()

        IO(ExitCode.Success)
      }
    }

  object ClickUpExtractions extends ActorGlobalImplicits {

    def extractTeams(): Unit ={

      import com.arkondata.extractors.ClickUpTeamsActions._
      val extractorFetcher = system.actorOf(ClickUpTeamsFetcher.props(config), name = "EngineActor")
      val extractorWriter = system.actorOf(ClickUpTeamsBulk.props(config) , name = "Writer")

      extractorFetcher ! Fetch(extractorWriter)

    }

  }

}

接收代码:

override def receive: Receive = {
    case WritePG(teamsData) =>
      println("-------writer")
      println(teamsData)
      val teamsPG: List[Teams] = teamsData.map(data => Teams(data.id, data.name, data.color, data.avatar))
      println(teamsPG)

      println("insert-----------------------")
      //// manejar exception aqui  importante!!!
      val upsertStatement = "insert into Teams (id, name, color, avatar) values ( ? , ? , ? , ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name"
      BulkWriter(config).insertMany[Teams](teamsPG,upsertStatement).unsafeRunSync()//.unsafeToFuture()//.unsafeRunSync()
      //// manejar exception aqui  importante!!!
      println("insert doobie")
      println("end------------------------")

      self ! "mensaje salida myself!!!!"
    case msg:String =>
      println(msg)
      context.system.terminate()
      System.exit(1) // this is the last step in the akka flow
  }

注意:

System.exit(1) // this is the last step in the akka flow

我也使用其他库,如 doobie、dure、cats:https://ben.kirw.in/decline/effect.html。 应用程序可以正常工作,我只需要知道它是否正确,或者是否存在 akka 和 this 之间的更好集成:IO(ExitCode.Success)

有什么方法可以从 akka 获取一个告诉响应并验证它,例如:

val res = extractorFetcher ! Fetch(extractorWriter)
if (validate(res)) 
   IO(ExitCode.Success)
else 

   IO(ExitCode.Error)

【问题讨论】:

    标签: scala akka akka-stream akka-http scala-cats


    【解决方案1】:

    Akka 生态系统依赖于Future。 Cats Effect 依赖于一些 F[_] - 内置是 IO 但也有 Monix 的 Task 和 ZIO。

    Translation Future IO 可以这样完成:

    io.unsafeToFuture
    
    IO.fromFuture(IO(start future here)) // Future start is side-effect on its own
    

    Monix 也是如此:

    task.runToFuture // requires Scheduler
    
    Task.deferFuture(future thunk) // or
    Task.deferFutureAction(implicit scheduler => future)
    

    如果您使用的是无标签,那么它是:

    Async.fromFuture(Sync[F].delay(start future)) // requires implicit Async[F]
    
    val runToFuture: F ~> Future // AFAIR there is no popular type class for it
    runToFuture(io)
    

    如果您使用的是流,那么有 streamz 库来处理 Akka Streams FS2 翻译。 Akka Streams、Monix 的 Observable 实现和 FS2 提供了 Reactive Streams 的实现,因此您可以使用 RS 接口使这些库相互通信,因此像 streamz 这样的库只是一种便利工具。

    AFAIK 没有其他集成,也不需要它们。

    但是,我建议您了解 Akka 和 Future 如何工作(渴望、记忆、执行上下文等)与 Cats Effect 如何工作(通常是懒惰、没有记忆、除了初始化 ClockContextShift 之外没有 EC在 CE IO 中,CE IO 和 Monix 之间的细微差别,例如调度程序)。如果你不知道它们之间的区别,你很容易造成一些伤害。

    【讨论】:

      【解决方案2】:

      一些建议:

      1. 您可以将 Akka 与其他基于猫的框架一起使用,但您的应用程序中会有两种不同的术语:Fibers、ActorSystems、Futures 与 IO 效果……所以,如果您想使用 Cats,请选择 Monix 或 FS2 ,它们直接与 Cats 效果相关联。相反,如果您必须使用 Akka,请使用 Akka Streams,它有一个很棒的套件,并且只处理 Akka 调度程序和 Scala Futures。

      2. 使用 Akka Typed。自从 Akka Typed 投入生产以来,Akka 已经有了很大的改进。您正在接收函数中创建异步计算,这很容易出错。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-05-19
        • 2014-07-11
        • 1970-01-01
        • 2011-11-17
        • 1970-01-01
        • 2018-05-15
        相关资源
        最近更新 更多