【发布时间】:2019-05-11 22:57:27
【问题描述】:
我正在写入一个内存分布式数据库,批量大小是用户在多线程环境中定义的。但我想限制写入 ex 的行数。 1000 行/秒。这个要求的原因是我的生产者写得太快了,而消费者遇到了叶子内存错误。在批处理记录时是否有任何标准做法来执行节流。
dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
val dbRecords = recordSet.map(m => (m, Events.transform(m)))
dbRecords.map { record =>
try {
Events.setValues(eventInsert, record._2)
eventInsert.addBatch
} catch {
case e: Exception =>
logger.error(s"error adding batch: ${e.getMessage}")
val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
logger.error(s"event: $error_event")
}
}
// Bulk Commit Records
try {
eventInsert.executeBatch
} catch {
case e: java.sql.BatchUpdateException =>
val updates = e.getUpdateCounts
logger.error(s"failed commit: ${updates.toString}")
updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
logger.error(s"insert error: $error")
logger.error(e.getMessage)
}
}
finally {
connection.commit
eventInsert.clearBatch
logger.debug(s"committed: ${dbRecords.length.toString}")
}
}
我希望如果我可以将用户定义的参数作为throttleMax 传递,并且如果每个线程写入的总记录达到throttleMax,thread.sleep() 将被调用1 秒。但这会使整个过程变得非常缓慢。有没有其他有效的方法可以将数据的加载速度限制在 1000 行/秒?
【问题讨论】:
-
阅读有关 reactiveX 的信息。它可以做你想做的事:github.com/ReactiveX/RxJava/wiki/Backpressure,你可以将它用于 scala github.com/ReactiveX/RxScala
-
我会使用Outwatch。首先,它是拉式的,而不是基于推式的。因此,您可以实现一种与执行中最慢的步骤完全相同的方法。并且限制就像
.delayOnNext(duration: FiniteDuration)一样简单。
标签: java multithreading scala throttling