[Posted]: 2021-06-11 16:29:43
[Problem description]:
Hi, I am reading from a Kafka topic and I want to process the data received from Kafka (tokenization, filtering out unnecessary data, removing stop words), and finally write the result back to another Kafka topic.
// read from kafka
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
.load()
val df = readStream.selectExpr("CAST(value AS STRING)")
df.show(false)
val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))
// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result
// write back to kafka
val writeStream = cleanedDataframe
.writeStream
.outputMode("append")
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("topic", "writing.val")
.start()
writeStream.awaitTermination()
I then get the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
I then edited my code as follows to read from Kafka and write to the console:
// read from kafka
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
.load()
// write to console
val df = readStream.selectExpr("CAST(value AS STRING)")
val query = df.writeStream
.outputMode("append")
.format("console")
.start().awaitTermination();
// then perform the data processing part as mentioned in the first half
With this second approach, data is continuously displayed in the console, but the data-processing part never runs. Could someone tell me how to read from a Kafka topic, then perform some operations on the received data (tokenization, removing stop words), and finally write the result back to a new Kafka topic?
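For what it's worth, the cleaning step itself (tokenize, drop stop words) can be written as plain Scala, independent of Spark. The sketch below is a hypothetical helper (the object name, stop-word list, and `clean` signature are my own assumptions, not from the original code); such a function could later be wrapped in a Spark UDF and applied with `.withColumn` as a lazy transformation before `writeStream.start()`:

```scala
object TextCleaner {
  // Illustrative stop-word list; a real pipeline would use a fuller set
  val stopWords: Set[String] = Set("the", "a", "an", "is", "to", "and", "of")

  // Lowercase, split on non-word characters, drop empty tokens and stop words
  def clean(text: String): Seq[String] =
    text.toLowerCase
      .split("\\W+")
      .filter(token => token.nonEmpty && !stopWords.contains(token))
      .toSeq
}
```

Note that everything between `readStream` and `writeStream.start()` must stay a lazy transformation (`select`, `withColumn`, UDF application, etc.); eager actions such as `show()` are not supported on a streaming DataFrame and trigger the `AnalysisException` above.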
EDIT:
The stack trace for the error points to df.show(false) in the code above.
Tags: scala apache-spark apache-kafka spark-structured-streaming