【Question Title】: Split dataset based on column value
【Posted】: 2018-05-26 01:27:19
【Question Description】:

I have a Dataset&lt;Row&gt; that is the result of a Kafka readStream, as shown in the Java code snippet below.

m_oKafkaEvents = getSparkSession().readStream().format("kafka")  
  .option("kafka.bootstrap.servers", strKafkaAddress)  
  .option("subscribe", getInsightEvent().getTopic())  
  .option("maxOffsetsPerTrigger", "100000")  
  .option("startingOffsets", "latest")  
  .option("failOnDataLoss", false)  
  .load()  
  .select(functions.from_json(functions.col("value").cast("string"), oSchema).as("events"))  
  .select("events.*");  

m_oKafkaEvents  
{  
    {"EventTime":"1527005246864000000","InstanceID":"231","Model":"Opportunity_1","Milestone":"OrderProcessed"},  
    {"EventTime":"1527005246864000002","InstanceID":"232","Model":"Opportunity_2","Milestone":"OrderProcessed"},  
    {"EventTime":"1527005246864000001","InstanceID":"233","Model":"Opportunity_1","Milestone":"OrderProcessed"},  
    {"EventTime":"1527005246864000002","InstanceID":"234","Model":"Opportunity_2","Milestone":"OrderProcessed"}  
}  

I need to split this dataset based on the "Model" column, which would produce two datasets like the ones shown below:

 m_oKafkaEvents_for_Opportunity_1_topic 
   {  
       {"EventTime":"1527005246864000000","InstanceID":"231","Model":"Opportunity_1","Milestone":"OrderProcessed"},  
       {"EventTime":"1527005246864000001","InstanceID":"233","Model":"Opportunity_1","Milestone":"OrderProcessed"}   
   }  

   m_oKafkaEvents_for_Opportunity_2_topic  
   {  
      {"EventTime":"1527005246864000002","InstanceID":"232","Model":"Opportunity_2","Milestone":"OrderProcessed"},  
      {"EventTime":"1527005246864000002","InstanceID":"234","Model":"Opportunity_2","Milestone":"OrderProcessed"}  
   }  

These datasets would then be published to a Kafka sink. The topic name would be the Model value, i.e. Opportunity_1 and Opportunity_2.
Hence I need to get a handle on the "Model" column value and the corresponding list of events for it.
Since I am new to Spark, I am looking for help on how to achieve this in Java code.
Any help is appreciated.

【Question Comments】:

  • Corrected spelling, improved formatting

Tags: apache-spark apache-kafka


【Solution 1】:

The simplest solution looks like the following:

allEvents.selectExpr("CONCAT('m_oKafkaEvents_for_', Model, '_topic') AS topic",
                "to_json(struct(*)) AS value")   // the Kafka sink expects "topic" and "value" columns
        .write()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .save();

You can see an example here: https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#writing-the-output-of-batch-queries-to-kafka . However, after looking at Spark's code, it seems that we can only have one topic per write, i.e. it picks the topic from the first row it encounters:

def write(
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    kafkaParameters: ju.Map[String, Object],
    topic: Option[String] = None): Unit = {
  val schema = queryExecution.analyzed.output
  validateQuery(schema, kafkaParameters, topic)
  queryExecution.toRdd.foreachPartition { iter =>
    val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
    Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
      finallyBlock = writeTask.close())
  }
}

You can try this approach and report back here whether it works as described above. If it does not work, you have alternative solutions, such as:

  1. Cache the main DataFrame and create 2 other DataFrames from it, filtered on the Model attribute (see the caching sketch below)
  2. Use foreachPartition and a Kafka producer to send the messages without splitting the main dataset

The first solution is easy to implement and you can use all of Spark's tooling for it. On the other hand, at least in theory, splitting the dataset should be slightly slower than the second proposal. But try to measure both before picking one option or the other; maybe the difference will be tiny, and it is always better to use a clear, community-approved approach.
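Regarding the caching mentioned in option 1, the combined example further below does not show that step, so here is a minimal sketch of it, assuming the same batch Dataset&lt;Row&gt; named allEvents as in that example:

// Mark the parent Dataset for caching so that the two filtered writes reuse the
// parsed Kafka data instead of recomputing it for each filter
allEvents.cache();

Dataset<Row> opportunity1Events = allEvents.filter("Model = 'Opportunity_1'");
Dataset<Row> opportunity2Events = allEvents.filter("Model = 'Opportunity_2'");
// ... write both filtered Datasets to their Kafka topics as shown below ...

allEvents.unpersist();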

Below you can find some code showing both cases:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

SparkSession spark = SparkSession
        .builder()
        .appName("JavaStructuredNetworkWordCount")
        .getOrCreate();

// Note: write() and foreachPartition() below apply to batch Datasets; with a streaming
// source like this one you would go through writeStream (see the sketch after this snippet)
Dataset<Row> allEvents = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", "")
        .option("subscribe", "event")
        .option("maxOffsetsPerTrigger", "100000")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", false)
        .load()
        // pass your event schema here instead of null
        .select(functions.from_json(functions.col("value").cast("string"), null).as("events"))
        .select("events.*");

// First solution: filter the parent Dataset once per Model value and write each
// filtered Dataset to its own topic
Dataset<Row> opportunity1Events = allEvents.filter("Model = 'Opportunity_1'");
opportunity1Events.write().format("kafka").option("kafka.bootstrap.servers", "")
        .option("topic", "m_oKafkaEvents_for_Opportunity_1_topic").save();
Dataset<Row> opportunity2Events = allEvents.filter("Model = 'Opportunity_2'");
opportunity2Events.write().format("kafka").option("kafka.bootstrap.servers", "")
        .option("topic", "m_oKafkaEvents_for_Opportunity_2_topic").save();
// Note: the Kafka writer was added in 2.2.0 https://github.com/apache/spark/commit/b0a5cd89097c563e9949d8cfcf84d18b03b8d24c

// Another approach: iterate over the messages accumulated within each partition
// and route every row to its topic with a plain Kafka producer
allEvents.foreachPartition(new ForeachPartitionFunction<Row>() {
    // configure the producer with your real bootstrap servers and serializers
    private KafkaProducer<String, Row> localProducer = new KafkaProducer<>(new HashMap<>());

    private final Map<String, String> modelsToTopics = new HashMap<>();
    {
        modelsToTopics.put("Opportunity_1", "m_oKafkaEvents_for_Opportunity_1_topic");
        modelsToTopics.put("Opportunity_2", "m_oKafkaEvents_for_Opportunity_2_topic");
    }

    @Override
    public void call(Iterator<Row> rows) throws Exception {
        // If the row's Model is Opportunity_1 it goes to the Opportunity_1 topic,
        // otherwise it goes to the Opportunity_2 topic
        while (rows.hasNext()) {
            Row currentRow = rows.next();
            // you can reformat your row here or earlier, in Spark's map transformation
            localProducer.send(new ProducerRecord<>(modelsToTopics.get(currentRow.getAs("Model")),
                    "some_message_key", currentRow));
        }
        // KafkaProducer accumulates messages in an in-memory buffer and sends them when a threshold is reached.
        // Flush synchronously here to be sure that every buffered message was actually delivered.
        // You can also play with the features added in Kafka 0.11: the idempotent producer and the transactional producer.
        localProducer.flush();
    }
});
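If the per-row topic routing from the first snippet does work, it also applies to the streaming source from the question, which removes the need to split at all. Here is a minimal sketch of that variant, assuming Spark 2.2+ (where the Kafka sink takes the topic from a "topic" column in the data when no topic option is set) and reusing m_oKafkaEvents and strKafkaAddress from the question; the checkpoint path is only a placeholder:

// requires: import org.apache.spark.sql.streaming.StreamingQuery;
// Derive the target topic from the Model column and serialize each row as JSON;
// the Kafka sink expects "topic" and "value" columns.
StreamingQuery query = m_oKafkaEvents
        .selectExpr(
                "CONCAT('m_oKafkaEvents_for_', Model, '_topic') AS topic",
                "to_json(struct(*)) AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", strKafkaAddress)
        .option("checkpointLocation", "/tmp/kafka-split-checkpoint")  // placeholder path
        .start();

query.awaitTermination();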

【Discussion】:
