[Posted]: 2018-03-30 22:00:24
[Question]:
I have tried to implement Structured Streaming like this...
myDataSet
  .map(r => StatementWrapper.Transform(r))
  .writeStream
  .foreach(MyWrapper.myWriter)
  .start()
  .awaitTermination()
This all appears to work, but the throughput of MyWrapper.myWriter is terrible. It is effectively trying to act as a JDBC sink, and it looks like this:
val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {
  var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    Try(connection = getRemoteConnection).isSuccess
  }

  override def process(row: Seq[String]): Unit = {
    val statement = connection.createStatement()
    try {
      row.foreach(s => statement.execute(s))
    } catch {
      case e: SQLSyntaxErrorException => println(e)
      case e: SQLException => println(e)
    } finally {
      // close() releases the statement immediately; closeOnCompletion()
      // only fires when a dependent ResultSet is closed, which these
      // execute() calls never produce, so statements would leak.
      statement.close()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    // connection stays null if open() failed to connect.
    if (connection != null) connection.close()
  }
}
So my questions are: is a new ForeachWriter instantiated for each row, and are open() and close() therefore called for every row in the dataset?
Is there a better design that would improve throughput?
And how can I parse the SQL statement once and execute it many times, while keeping the database connection open?
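For the last question, one common pattern is to buffer rows and hand them off in batches, so the per-row cost (statement creation, network round trip) is paid once per batch instead of once per row. The sketch below is hypothetical, not Spark or JDBC API: `BatchedWriter` and its `flush` callback are invented names for illustration.

```scala
// A sketch of "prepare once, execute in batches": rows are buffered and
// passed to a flush callback in groups. BatchedWriter and flush are
// hypothetical helper names, not part of Spark or JDBC.
final class BatchedWriter[T](batchSize: Int)(flush: Seq[T] => Unit) {
  private val buf = scala.collection.mutable.ArrayBuffer.empty[T]

  // Call from ForeachWriter.process(): buffers the row, flushing when full.
  def add(row: T): Unit = {
    buf += row
    if (buf.size >= batchSize) drain()
  }

  // Call from ForeachWriter.close(): flushes any remaining rows.
  def close(): Unit = if (buf.nonEmpty) drain()

  private def drain(): Unit = {
    flush(buf.toList) // e.g. addBatch() each row, then one executeBatch()
    buf.clear()
  }
}
```

In a real ForeachWriter you would open the Connection and a PreparedStatement in open(), call add() from process(), and drain the remainder in close(); the `flush` callback would then call `preparedStatement.addBatch()` per row followed by a single `executeBatch()`.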
[Comments]:
Tags: database scala apache-spark jdbc spark-structured-streaming