【发布时间】:2021-02-24 05:54:21
【问题描述】:
我有一个带有 akka、akka http 和 akka 流的应用程序,所以它不是休息或微服务是一个短暂的应用程序;它是来自 rest api 的数据提取器,应用程序写入 postgres 数据库,它的工作原理只是我有一个疑问,如果以这种方式集成衰落和 akka 是否正确。
代码:
object App extends CommandIOApp(name = "app" , header = "extractor", version = "0.1.0") {
val config = Config( token, jdbcDriver = jdbcDriver, jdbcURL = jdbcURL, jdbcUser = jdbcUser, jdbcPassword = jdbcPassword, jdbcSchema = jdbcSchema)
override def main: Opts[IO[ExitCode]] =
Opts.subcommand(name="extract-teams", help="Extract Teams from API"){
val tableName = Opts.option[String]("recursive", short="r", help="Recursive Extraction")
val outputFile = Opts.option[Path]("output", short="o", help="Output file").withDefault(Paths.get("output.csv"))
( tableName, outputFile).mapN{( table, output) =>
println(table)
println(output)
ClickUpExtractions.extractTeams()
IO(ExitCode.Success)
}
}
object ClickUpExtractions extends ActorGlobalImplicits {
def extractTeams(): Unit ={
import com.arkondata.extractors.ClickUpTeamsActions._
val extractorFetcher = system.actorOf(ClickUpTeamsFetcher.props(config), name = "EngineActor")
val extractorWriter = system.actorOf(ClickUpTeamsBulk.props(config) , name = "Writer")
extractorFetcher ! Fetch(extractorWriter)
}
}
}
接收代码:
override def receive: Receive = {
case WritePG(teamsData) =>
println("-------writer")
println(teamsData)
val teamsPG: List[Teams] = teamsData.map(data => Teams(data.id, data.name, data.color, data.avatar))
println(teamsPG)
println("insert-----------------------")
//// manejar exception aqui importante!!!
val upsertStatement = "insert into Teams (id, name, color, avatar) values ( ? , ? , ? , ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name"
BulkWriter(config).insertMany[Teams](teamsPG,upsertStatement).unsafeRunSync()//.unsafeToFuture()//.unsafeRunSync()
//// manejar exception aqui importante!!!
println("insert doobie")
println("end------------------------")
self ! "mensaje salida myself!!!!"
case msg:String =>
println(msg)
context.system.terminate()
System.exit(1) // this is the last step in the akka flow
}
注意:
System.exit(1) // this is the last step in the akka flow
我也使用其他库,如 doobie、dure、cats:https://ben.kirw.in/decline/effect.html。
应用程序可以正常工作,我只需要知道它是否正确,或者是否存在 akka 和 this 之间的更好集成:IO(ExitCode.Success)
有什么方法可以从 akka 获取一个告诉响应并验证它,例如:
val res = extractorFetcher ! Fetch(extractorWriter)
if (validate(res))
IO(ExitCode.Success)
else
IO(ExitCode.Error)
【问题讨论】:
标签: scala akka akka-stream akka-http scala-cats