【问题标题】:Message getting lost in Kafka + Spark Streaming消息在 Kafka + Spark Streaming 中丢失
【发布时间】:2017-10-30 23:29:52
【问题描述】:

我在使用 Kafka 的 Spark Streaming 中遇到数据丢失的问题,我的用例如下:

  • Spark 流 (DirectStream) 应用程序从 Kafka 主题和处理它。
  • 根据处理后的消息,应用程序将编写 处理消息到不同的 Kafka 主题,例如如果消息 被协调,然后写到协调的主题,否则不协调 主题。

现在,问题在于,在流式传输期间,我以某种方式丢失了一些消息,即所有传入消息都没有写入协调或不协调的主题。 例如,如果应用程序在一批中收到 30 条消息,那么有时它会将所有消息写入输出主题(这是预期的行为),但有时它只写入 27 条(丢失 3 条消息,此数字可以更改)。

以下是我使用的版本:

  • Spark 1.6.0
  • 卡夫卡 0.9

Kafka主题配置如下:

  • 经纪人数量:3
  • num 复制因子:3
  • 分区数:3

以下是我用于 kafka 的属性:

val props = new Properties() 
props.put("metadata.broker.list", properties.getProperty("metadataBrokerList")) 
props.put("auto.offset.reset", properties.getProperty("autoOffsetReset")) 
props.put("group.id", properties.getProperty("group.id")) 
props.put("serializer.class", "kafka.serializer.StringEncoder") 
props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized")) 
props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized")) 
props.put("acks", "all"); 
props.put("retries", "5"); 
props.put("request.required.acks", "-1")

以下是我将处理过的消息写入 Kafka 的一段代码:

val schemaRdd2 = finalHarmonizedDF.toJSON 
schemaRdd2.foreachPartition { partition => 
    val producerConfig = new ProducerConfig(props) 
    val producer = new Producer[String, String](producerConfig) 

    partition.foreach { row => 
        if (debug) println(row.mkString) 
        val keyedMessage = new KeyedMessage[String, String](props.getProperty("outTopicHarmonized"), 
        null, row.toString()) 
        producer.send(keyedMessage) 
    } 
    //hack, should be done with the flush 
    Thread.sleep(1000) 
    producer.close() 
}

我已明确添加 sleep(1000) 用于测试目的。 但这也不能解决问题:(

任何建议都将不胜感激。

【问题讨论】:

  • 你的问题解决了吗?如果你做到了,请分享你的答案

标签: apache-spark apache-kafka


【解决方案1】:

因为您不想丢失任何消息,所以您可能希望选择“恰好一次”传送语义,这样不会丢失数据。为了配置一次性交付语义,您必须使用 acks='all',您已经这样做了。

根据此资源[1],acks='all' 属性必须与 min.insync.replicas 属性结合使用。

[1]https://www.linkedin.com/pulse/kafka-producer-delivery-semantics-sylvester-daniel/

【讨论】:

    【解决方案2】:

    尝试将batchDuration 参数(在初始化StreamingContext 时)调整为大于每个rdd 的处理时间的数字。这解决了我的问题。

    【讨论】:

      猜你喜欢
      • 2018-06-25
      • 2015-02-04
      • 1970-01-01
      • 2015-09-13
      • 2016-08-17
      • 1970-01-01
      • 1970-01-01
      • 2017-02-23
      • 2021-01-05
      相关资源
      最近更新 更多