【问题标题】:How to create a Spark DataSet when the transformation is not 1:1, but 1:many当转换不是 1:1 而是 1:many 时如何创建 Spark DataSet
【发布时间】:2020-12-25 12:59:45
【问题描述】:

我正在编写一个结构化的流式 Spark 应用程序,我正在从 Kafka 队列中读取数据并处理收到的消息。我想要的最终结果是一个DataSet[MyMessage](其中MyMessage 是一个自定义对象),我想将它排入另一个Kafka 主题。问题是,来自消费者 Kafka 队列的每条输入消息都可以产生多个 MyMessage 对象,因此转换不是 1:1、1:Many。

所以我在做

val messagesDataSet: DataSet[List[MyMessage]] = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "server1")
      .option("subscribe", "topic1")
      .option("failOnDataLoss", false)
      .option("startingOffsets", "offset1")
      .load()
      .select($"value")
      .mapPartitions{r => createMessages(r)}

val createMessages(row: Iterator[Row]): List[MyMessage] = {
   // ...
}

显然,messagesDataSetDataSet[List[MyMessage]]。有没有办法让我只得到一个DataSet[MyMessage]

或者有没有办法获取DataSet[List[MyMessage]],然后将每个MyMessage 对象写入另一个Kafka 主题? (毕竟这是我的最终目标)

【问题讨论】:

    标签: scala apache-spark apache-spark-sql apache-spark-dataset


    【解决方案1】:

    试试

    messagesDataSet.flatMap(identity)
    

    【讨论】:

      【解决方案2】:

      您可以使用 mapPartitions 创建多个值(因此它的工作原理类似于 flatMap),但您必须返回 Iterator:

        def createMessages(row: Iterator[Row]): Iterator[MyMessage] = {
          row.map(/*...*/) //you need too return iterator here
        }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2014-06-21
        • 2012-11-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-05-09
        • 2018-05-05
        相关资源
        最近更新 更多