【发布时间】:2019-02-21 08:58:24
【问题描述】:
我正在研究 spark 流上下文,它在 avro 序列化中从 kafka 主题获取数据,如下所示。
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"schema.registry.url" -> "http://localhost:8081",
"key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"group.id" -> "1"
)
使用 Kafka utils 我正在创建 Direct 流,如下所示
val topics = Set("mysql-foobar")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String,String](
topics,
kafkaParams)
)
我也将数据写入控制台
stream.foreachRDD ( rdd => {
rdd.foreachPartition(iterator => {
while (iterator.hasNext) {
val next = iterator.next()
println(next.value())
}
})
})
现在我想从这些 RDD 中创建数据框。是否有可能我已经从 stackoverflow 审查和测试了许多解决方案,但遇到了一些问题。 Stackoverflow 解决方案还有this 和this。 我的输出如下所示
{"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}
【问题讨论】:
-
查看结构化流式处理文档。不要使用 Dstream。 github.com/AbsaOSS/ABRiS
-
导入此 za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies 的 sbt 库依赖项是什么?
-
我曾尝试使用结构化流并按照建议的步骤进行此操作,但我遇到了错误,请您检查一下我做错了什么。
-
我认为你复制了错误的部分github.com/AbsaOSS/ABRiS/blob/master/…
标签: scala apache-spark apache-kafka apache-spark-sql spark-streaming