【发布时间】:2018-10-28 00:43:16
【问题描述】:
有人可以向我推荐一个在 S3 或任何文件系统中编写 avro 的好例子或示例吗?我正在使用自定义 Sink,但我想通过 SinkProvider 的构造函数传递一些属性 Map,我猜这些属性可以进一步传递给 Sink?
更新代码:
val query = df.mapPartitions { itr =>
itr.map { row =>
val rowInBytes = row.getAs[Array[Byte]]("value")
MyUtils.deserializeAvro[GenericRecord](rowInBytes).toString
}
}.writeStream
.format("com.test.MyStreamingSinkProvider")
.outputMode(OutputMode.Append())
.queryName("testQ" )
.trigger(ProcessingTime("10 seconds"))
.option("checkpointLocation", "my_checkpoint_dir")
.start()
query.awaitTermination()
接收器提供者:
class MyStreamingSinkProvider extends StreamSinkProvider {
override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
new MyStreamingSink
}
}
下沉:
class MyStreamingSink extends Sink with Serializable {
final val log: Logger = LoggerFactory.getLogger(classOf[MyStreamingSink])
override def addBatch(batchId: Long, data: DataFrame): Unit = {
//For saving as text doc
data.rdd.saveAsTextFile("path")
log.warn(s"Total records processed: ${data.count()}")
log.warn("Data saved.")
}
}
【问题讨论】:
-
我创建了我的自定义 SinkProvider 和 Sink。我以 writeStream 的“格式”传递了完全限定的类名,它现在对我有用。但是,我还有另一个问题。有没有办法将自定义属性传递给 SinkProvider? writeStream 中的“格式”只接受一个字符串作为参数。
-
查看下面的代码,请建议我是否可以将属性映射作为驱动程序类的构造函数传递。
class MyStreamingSinkProvider(prop: Map[String, String]) extends StreamSinkProvider { override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { new MyStreamingSink(prop: Map[String, String]) } } -
感谢您的快速回复。我想我已经从之前发布的初始错误中继续前进,并用当前问题 @cricket_007 完全编辑了我的帖子。知道如何将自定义属性传递给 SinkProvider 会很高兴。
-
有什么想法或建议吗??
标签: scala apache-kafka avro spark-structured-streaming