【发布时间】:2020-12-25 12:59:45
【问题描述】:
我正在编写一个结构化的流式 Spark 应用程序,我正在从 Kafka 队列中读取数据并处理收到的消息。我想要的最终结果是一个DataSet[MyMessage](其中MyMessage 是一个自定义对象),我想将它排入另一个Kafka 主题。问题是,来自消费者 Kafka 队列的每条输入消息都可以产生多个 MyMessage 对象,因此转换不是 1:1、1:Many。
所以我在做
val messagesDataSet: DataSet[List[MyMessage]] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.option("failOnDataLoss", false)
.option("startingOffsets", "offset1")
.load()
.select($"value")
.mapPartitions{r => createMessages(r)}
val createMessages(row: Iterator[Row]): List[MyMessage] = {
// ...
}
显然,messagesDataSet 是 DataSet[List[MyMessage]]。有没有办法让我只得到一个DataSet[MyMessage]?
或者有没有办法获取DataSet[List[MyMessage]],然后将每个MyMessage 对象写入另一个Kafka 主题? (毕竟这是我的最终目标)
【问题讨论】:
标签: scala apache-spark apache-spark-sql apache-spark-dataset