【发布时间】:2020-07-12 18:25:14
【问题描述】:
我有 spark 结构化流式作业来从 kafka 主题中读取它。但是,在订阅主题时,该作业不会将数据写入控制台或使用 foreach 编写器将其转储到数据库。
我有类 DBWriter extends ForeachWriter<Row> 仍然永远不会调用此类的 open, process, close 方法。
如果您需要更多信息,请告诉我。
已按照Spark Kafka integration guide 的说明进行操作。仍然无法正常工作。
Spark 版本 2.3.1 卡夫卡 0.10.0
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
我的代码是:
spark.readStream().format("kafka").option.option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")
.option("subscribe", "TOPIC1")
.option("startingOffsets", "latest") // read data from the end of the stream
.load()
还有
Dataset<Row> selectDf = dataframe.select(dataframe.col("key")
.cast("string"),org.apache.spark.sql.functions.from_json(dataframe.col("value")
.cast("string"), schema).alias("data"));
selectDf.writeStream()
.trigger(Trigger.ProcessingTime(1000))
.foreach(new DBWriterSink())
.option("checkpointLocation","/tmp/chp_path/")
输入数据格式如下:
数据为json格式:
{"input_source_data":
{ "key1":"value1",
"key2": "value2"
}
}
【问题讨论】:
-
你能告诉我们数据是什么样子的吗?
-
@mike 我已经更新了问题,数据的外观如何,它是 json 格式。
-
@mike,我能够解决之前的错误,但是我的 DBWriter Sink 没有被调用,我看不到 kafka 接收到的任何数据。有一个将数据推送到 kafka topci 的测试 kafka 生产者。任何指针都会有所帮助。
标签: java apache-spark apache-kafka spark-structured-streaming