【发布时间】:2016-09-25 15:59:46
【问题描述】:
我正在尝试发送关于名为“test”的 kafka 主题的字数问题(在 spark-scala 中)的输出。请参阅下面的代码:
val Dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val lines = Dstream.map(f => f._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD(
rdd => rdd.foreach(
f =>
{
val sendProps = new Properties()
sendProps.put("metadata.broker.list", brokers)
sendProps.put("serializer.class", "kafka.serializer.StringEncoder")
sendProps.put("producer.type", "async")
val config = new ProducerConfig(sendProps)
val producer = new Producer[String, String](config)
producer.send(new KeyedMessage[String, String]"test", f._1 + " " +f._2))
producer.close();
}))
问题是输出中随机缺少一些单词。我还注意到,如果我删除了该声明
producer.close()
没有数据丢失。
这是否意味着 producer.close() 在实际将数据放入缓冲区之前中断 producer.send() ?如果是,我应该如何关闭生产者而不冒数据丢失的风险?
以上是我最初的问题,Vale 的回答解决了。
现在,当我再次更改 producer.type 属性时 - 数据会随机丢失。
sendProps.put("producer.type", "sync")
为了澄清 producer.send 正在运行我需要放入输出主题的所有单词。但是,有些单词会丢失,并且不会显示在输出 Kafka 主题中。
【问题讨论】:
标签: scala apache-kafka kafka-producer-api