【问题标题】:Spark Dataframe to KafkaSpark Dataframe 到 Kafka
【发布时间】:2018-06-19 04:49:46
【问题描述】:

我正在尝试将 Spark Dataframe 流式传输到 Kafka 消费者。我做不到,请您给我建议。

我能够从 Kafka 生产者那里挑选数据到 Spark,并且我已经执行了一些操作,在处理数据之后,我有兴趣将其流式传输回 Kafka(消费者)。

【问题讨论】:

  • “不能做”,你到底遇到了什么问题?如果可能,应将最低可行代码作为问题的一部分。
  • val ds = df1.writeStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "kafka_topic_13")。 start() DF1 是 Spark 数据帧,我希望将此数据帧流式传输到 kafka。错误提示数据帧 (df1) 不是流数据帧/数据集。
  • 您需要遍历数据帧的每个分区。对于每个分区,然后循环遍历所有元素并通过 Kafka Producer 发送它们。我没有代码,但我敢肯定你不是第一个问这个的人
  • 是的,我在循环内写,因为我没有足够的空间在这里我无法显示整个代码:(,我正在尝试其他方式,正在处理它..
  • @GuruprasadSwaminathan 您可以随时编辑问题并提供所需的所有信息。

标签: apache-spark apache-kafka


【解决方案1】:

这是一个流式生产到 kafka 的示例,但批处理版本几乎相同

从源流式传输到 kafka:

 val ds = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic1")
      .start()

将静态数据帧(不是从源流式传输)写入 kafka

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

请记住

  1. 每一行都是一条消息。
  2. 数据帧必须是流数据帧。如果您有静态数据框,请使用静态版本。

查看基本文档:https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

听起来您有一个静态数据帧,它不是从源流式传输的。

【讨论】:

  • 非常感谢:) 从 2.1.1 升级到 2.3.0 后,它现在可以工作了。
  • 我升级了系统我还更新了我的 build.sbt 以指出 2.3.0 。
猜你喜欢
  • 2019-05-16
  • 1970-01-01
  • 2016-05-12
  • 2021-06-04
  • 1970-01-01
  • 2017-10-18
  • 2018-06-05
  • 2015-09-13
  • 2017-02-03
相关资源
最近更新 更多