【问题标题】:spark structured streaming avro to avro and custom Sinkspark 结构化流 avro 到 avro 和自定义接收器
【发布时间】:2018-10-28 00:43:16
【问题描述】:

有人可以向我推荐一个在 S3 或任何文件系统中编写 avro 的好例子或示例吗?我正在使用自定义 Sink,但我想通过 SinkProvider 的构造函数传递一些属性 Map,我猜这些属性可以进一步传递给 Sink?

更新代码:

val query = df.mapPartitions { itr =>
  itr.map { row =>
    val rowInBytes = row.getAs[Array[Byte]]("value")
    MyUtils.deserializeAvro[GenericRecord](rowInBytes).toString
  }
}.writeStream
  .format("com.test.MyStreamingSinkProvider")
  .outputMode(OutputMode.Append())
  .queryName("testQ" )
  .trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation", "my_checkpoint_dir")
  .start()

query.awaitTermination()

接收器提供者:

class MyStreamingSinkProvider extends StreamSinkProvider {

  override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
    new MyStreamingSink
  }
}

下沉:

class MyStreamingSink extends Sink with Serializable {

  final val log: Logger = LoggerFactory.getLogger(classOf[MyStreamingSink])

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    //For saving as text doc
    data.rdd.saveAsTextFile("path")

    log.warn(s"Total records processed: ${data.count()}")
    log.warn("Data saved.")
  }
}

【问题讨论】:

  • 我创建了我的自定义 SinkProvider 和 Sink。我以 writeStream 的“格式”传递了完全限定的类名,它现在对我有用。但是,我还有另一个问题。有没有办法将自定义属性传递给 SinkProvider? writeStream 中的“格式”只接受一个字符串作为参数。
  • 查看下面的代码,请建议我是否可以将属性映射作为驱动程序类的构造函数传递。 class MyStreamingSinkProvider(prop: Map[String, String]) extends StreamSinkProvider { override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { new MyStreamingSink(prop: Map[String, String]) } }
  • 感谢您的快速回复。我想我已经从之前发布的初始错误中继续前进,并用当前问题 @cricket_007 完全编辑了我的帖子。知道如何将自定义属性传递给 SinkProvider 会很高兴。
  • 有什么想法或建议吗??

标签: scala apache-kafka avro spark-structured-streaming


【解决方案1】:

您应该能够通过writeStream.option(key, value) 将参数传递给您的自定义接收器:

DataStreamWriter writer = dataset.writeStream()
  .format("com.test.MyStreamingSinkProvider")
  .outputMode(OutputMode.Append())
  .queryName("testQ" )
  .trigger(ProcessingTime("10 seconds"))
  .option("key_1", "value_1")
  .option("key_2", "value_2")
  .start()

在这种情况下,parameters 方法中的 MyStreamingSinkProvider.createSink(...) 将包含 key_1key_2

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-07-20
    • 2019-11-28
    • 1970-01-01
    • 2017-04-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多