【发布时间】:2017-10-30 23:29:52
【问题描述】:
我在使用 Kafka 的 Spark Streaming 中遇到数据丢失的问题,我的用例如下:
- Spark 流 (DirectStream) 应用程序从 Kafka 主题和处理它。
- 根据处理后的消息,应用程序将编写 处理消息到不同的 Kafka 主题,例如如果消息 被协调,然后写到协调的主题,否则不协调 主题。
现在,问题在于,在流式传输期间,我以某种方式丢失了一些消息,即所有传入消息都没有写入协调或不协调的主题。 例如,如果应用程序在一批中收到 30 条消息,那么有时它会将所有消息写入输出主题(这是预期的行为),但有时它只写入 27 条(丢失 3 条消息,此数字可以更改)。
以下是我使用的版本:
- Spark 1.6.0
- 卡夫卡 0.9
Kafka主题配置如下:
- 经纪人数量:3
- num 复制因子:3
- 分区数:3
以下是我用于 kafka 的属性:
val props = new Properties()
props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
props.put("group.id", properties.getProperty("group.id"))
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
props.put("acks", "all");
props.put("retries", "5");
props.put("request.required.acks", "-1")
以下是我将处理过的消息写入 Kafka 的一段代码:
val schemaRdd2 = finalHarmonizedDF.toJSON
schemaRdd2.foreachPartition { partition =>
val producerConfig = new ProducerConfig(props)
val producer = new Producer[String, String](producerConfig)
partition.foreach { row =>
if (debug) println(row.mkString)
val keyedMessage = new KeyedMessage[String, String](props.getProperty("outTopicHarmonized"),
null, row.toString())
producer.send(keyedMessage)
}
//hack, should be done with the flush
Thread.sleep(1000)
producer.close()
}
我已明确添加 sleep(1000) 用于测试目的。 但这也不能解决问题:(
任何建议都将不胜感激。
【问题讨论】:
-
你的问题解决了吗?如果你做到了,请分享你的答案