【问题标题】:Spark Structured Streaming ForeachWriter and database performanceSpark 结构化流 ForeachWriter 和数据库性能
【发布时间】:2018-03-30 22:00:24
【问题描述】:

我已经尝试过实现这样的结构化流......

myDataSet
  .map(r =>  StatementWrapper.Transform(r))
  .writeStream
  .foreach(MyWrapper.myWriter)
  .start()
  .awaitTermination()

这一切似乎都有效,但看看 MyWrapper.myWriter 的吞吐量是可怕的。它实际上是在尝试成为一个 JDBC 接收器,它看起来像:

val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {

  var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    Try (connection = getRemoteConnection).isSuccess
  }

  override def process(row: Seq[String]) {
    val statement = connection.createStatement()
    try {
      row.foreach( s => statement.execute(s) )
    } catch {
      case e: SQLSyntaxErrorException => println(e)
      case e: SQLException => println(e)
    } finally {
      statement.closeOnCompletion()
    }
  }

  override def close(errorOrNull: Throwable) {
    connection.close()
  }
}

所以我的问题是 - 是否为每一行实例化了新的 ForeachWriter?因此为数据集中的每一行调用 open() 和 close() ?

有没有更好的设计来提高吞吐量?

如何一次解析SQL语句多次执行,同时保持数据库连接打开?

【问题讨论】:

    标签: database scala apache-spark jdbc spark-structured-streaming


    【解决方案1】:

    底层接收器的打开和关闭取决于ForeachWriter您的实现

    调用ForeachWriter 的相关类是ForeachSink,这是调用您的编写器的代码:

    data.queryExecution.toRdd.foreachPartition { iter =>
      if (writer.open(TaskContext.getPartitionId(), batchId)) {
        try {
          while (iter.hasNext) {
            writer.process(encoder.fromRow(iter.next()))
          }
        } catch {
          case e: Throwable =>
            writer.close(e)
            throw e
        }
        writer.close(null)
      } else {
        writer.close(null)
      }
    }
    

    对于从您的源生成的每个批次,都会尝试打开和关闭编写器。如果您希望 openclose 每次都真正打开和关闭接收器驱动程序,您需要通过您的实现来这样做。

    如果您想更好地控制数据的处理方式,您可以实现Sink trait,它给出了一个批处理 id 和底层的DataFrame

    trait Sink {
      def addBatch(batchId: Long, data: DataFrame): Unit
    }
    

    【讨论】:

    • 非常感谢。无论如何,您是否可以解释或指出一篇文章,该文章描述了在所有三种模式(追加、更新和完成)下实现接收器时要考虑的事项?
    • 我的问题真的是 outputModes 和 Sink 接口如何相互作用?
    • @user1870400 AFAIK,接收器根据您设置的输出模式获取数据的相关部分。例如,如果您只想要更新,那么 Sink 只会在每次作业迭代时更新元素。
    • 批次在这里是什么意思,更多关于批次的细节会很棒?看起来像 open,close 每个分区只调用一次。
    • 好的,这里的批次是流媒体中的微批次 ID。更多详情请点击此处docs.azuredatabricks.net/spark/latest/structured-streaming/…
    猜你喜欢
    • 2017-07-12
    • 2019-08-24
    • 1970-01-01
    • 2017-05-04
    • 1970-01-01
    • 1970-01-01
    • 2018-10-06
    • 2018-08-18
    • 2018-08-20
    相关资源
    最近更新 更多