【问题标题】:Parsing stops with Akka Streams mapAsync使用 Akka Streams mapAsync 停止解析
【发布时间】:2018-02-13 14:20:14
【问题描述】:

我正在解析 50000 条记录,其中包含网页上的标题和 URL。在解析时,我将它们写入数据库,即 PostgreSQL。我使用 docker-compose 部署了我的应用程序。但是,它会无缘无故地停在某个页面上。我试图写一些日志来弄清楚发生了什么,但是没有连接错误或类似的东西。

这是我解析和写入数据库的代码:

object App {
  val db = Database.forURL("jdbc:postgresql://db:5432/toloka?user=user&password=password")
  val browser = JsoupBrowser()
  val catRepo = new CategoryRepo(db)
  val torrentRepo = new TorrentRepo(db)
  val torrentForParseRepo = new TorrentForParseRepo(db)
  val parallelismFactor = 10
  val groupFactor = 10
  implicit val system = ActorSystem("TolokaParser")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

def parseAndWriteTorrentsForParseToDb(doc: App.browser.DocumentType) = {
    Source(getRecordsLists(doc))
      .grouped(groupFactor)
      .mapAsync(parallelismFactor) { torrentForParse: Seq[TorrentForParse] =>
        torrentForParseRepo.createInBatch(torrentForParse)
      }
      .runWith(Sink.ignore)
  }

 def getRecordsLists(doc: App.browser.DocumentType) = {
    val pages = generatePagesFromHomePage(doc)
    println("torrent links generated")
    println(pages.size)
    val result = for {
      page <- pages
    } yield {
      println(s"Parsing torrent list...$page")
      val tmp = getTitlesAndLinksTuple(getTitlesList(browser.get(page)), getLinksList(browser.get(page)))
      println(tmp.size)
      tmp
    }
    println("torrent links and names tupled")
    result flatten
  }

}

这些问题的原因可能是什么?

【问题讨论】:

  • .runWith(Sink.ignore),你得到了Future。有异常你处理吗?

标签: postgresql scala asynchronous akka-stream


【解决方案1】:

制定监督策略,以避免出现错误时流终结。如:

val decider: Supervision.Decider = {
  case _ => Supervision.Resume
}

def parseAndWriteTorrentsForParseToDb = {
  Source.fromIterator(() => List(1,2,3).toIterator)
    .grouped(1)
    .mapAsync(1) { torrentForParse: Seq[Int] =>
      Future { 0 }
    }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .runWith(Sink.ignore)
}

流不应在此异步阶段配置中停止

【讨论】:

    猜你喜欢
    • 2016-06-21
    • 2023-04-03
    • 2017-04-05
    • 2018-04-28
    • 2016-11-14
    • 1970-01-01
    • 2018-02-06
    • 2019-04-13
    • 1970-01-01
    相关资源
    最近更新 更多