【问题标题】:Spark Structured Streaming foreach Sink custom writer is not able to read data from Kafka topicSpark Structured Streaming foreach Sink 自定义编写器无法从 Kafka 主题读取数据
【发布时间】:2020-07-12 18:25:14
【问题描述】:

我有 spark 结构化流式作业来从 kafka 主题中读取它。但是,在订阅主题时,该作业不会将数据写入控制台或使用 foreach 编写器将其转储到数据库。

我有类 DBWriter extends ForeachWriter<Row> 仍然永远不会调用此类的 open, process, close 方法。

如果您需要更多信息,请告诉我。

已按照Spark Kafka integration guide 的说明进行操作。仍然无法正常工作。

Spark 版本 2.3.1 卡夫卡 0.10.0

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.3.1</version>
</dependency>

我的代码是:

spark.readStream().format("kafka").option.option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")   
  .option("subscribe", "TOPIC1")    
  .option("startingOffsets", "latest") // read data from the end of the stream
  .load()

还有

Dataset<Row> selectDf = dataframe.select(dataframe.col("key")
  .cast("string"),org.apache.spark.sql.functions.from_json(dataframe.col("value")
  .cast("string"), schema).alias("data"));

selectDf.writeStream()
  .trigger(Trigger.ProcessingTime(1000))
  .foreach(new DBWriterSink())
  .option("checkpointLocation","/tmp/chp_path/")

输入数据格式如下:

数据为json格式:



    {"input_source_data": 
    { "key1":"value1", 
    "key2": "value2"
     } 
    }

【问题讨论】:

  • 你能告诉我们数据是什么样子的吗?
  • @mike 我已经更新了问题,数据的外观如何,它是 json 格式。
  • @mike,我能够解决之前的错误,但是我的 DBWriter Sink 没有被调用,我看不到 kafka 接收到的任何数据。有一个将数据推送到 kafka topci 的测试 kafka 生产者。任何指针都会有所帮助。

标签: java apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

实际问题是由于 kafka 配置设置不正确。 主题订阅不成功,握手失败。正确更正kafka属性后。 能够读取数据,它正在额外设置这些属性。删除后,它开始工作。 能够阅读消息并看到 ForEachWriter 也被调用。

properties.put("security.protocol", "SSL");

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-05
    • 1970-01-01
    • 2021-01-05
    • 2020-09-13
    • 2019-06-24
    • 2018-01-09
    • 1970-01-01
    • 2018-09-06
    相关资源
    最近更新 更多