【问题标题】:SparkStreaming: DirectStream RDD to dataframe [duplicate]Spark Streaming:DirectStream RDD到数据帧[重复]
【发布时间】:2019-02-21 08:58:24
【问题描述】:

我正在研究 spark 流上下文,它在 avro 序列化中从 kafka 主题获取数据,如下所示。

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://localhost:8081",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "group.id" -> "1"
)

使用 Kafka utils 我正在创建 Direct 流,如下所示

val topics = Set("mysql-foobar")


val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String,String](
    topics,
    kafkaParams)
)

我也将数据写入控制台

stream.foreachRDD ( rdd => {
  rdd.foreachPartition(iterator => {
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  })
})

现在我想从这些 RDD 中创建数据框。是否有可能我已经从 stackoverflow 审查和测试了许多解决方案,但遇到了一些问题。 Stackoverflow 解决方案还有thisthis。 我的输出如下所示

{"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}

【问题讨论】:

标签: scala apache-spark apache-kafka apache-spark-sql spark-streaming


【解决方案1】:

由于您使用的是 Confluent 序列化程序,并且它们目前不提供与 Spark 的简单集成,因此您可以在 Github 上查看 AbsaOSS 提供的一个相对较新的库,以帮助解决此问题。

但基本上,你使用 Spark Structured Streaming 来获取 DataFrames,不要尝试使用 Dstream to RDD to Dataframe...

你可以找到examples of what you're looking for here

另请参阅Integrating Spark Structured Streaming with the Kafka Schema Registry 上的其他示例

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-31
    • 2019-01-16
    相关资源
    最近更新 更多