【问题标题】:Kafka producer.send() is Stopped by producer.close()Kafka producer.send() 被 producer.close() 停止
【发布时间】:2016-09-25 15:59:46
【问题描述】:

我正在尝试发送关于名为“test”的 kafka 主题的字数问题(在 spark-scala 中)的输出。请参阅下面的代码:

val Dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

val lines = Dstream.map(f => f._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.foreachRDD(
      rdd => rdd.foreach(
        f =>
          {
            val sendProps = new Properties()
            sendProps.put("metadata.broker.list", brokers)
            sendProps.put("serializer.class", "kafka.serializer.StringEncoder")
            sendProps.put("producer.type", "async")

            val config = new ProducerConfig(sendProps)
            val producer = new Producer[String, String](config)
            producer.send(new KeyedMessage[String, String]"test", f._1 + " " +f._2))
            producer.close();

          })) 

问题是输出中随机缺少一些单词。我还注意到,如果我删除了该声明

producer.close()

没有数据丢失。

这是否意味着 producer.close() 在实际将数据放入缓冲区之前中断 producer.send() ?如果是,我应该如何关闭生产者而不冒数据丢失的风险?

以上是我最初的问题,Vale 的回答解决了。

现在,当我再次更改 producer.type 属性时 - 数据会随机丢失。

sendProps.put("producer.type", "sync")

为了澄清 producer.send 正在运行我需要放入输出主题的所有单词。但是,有些单词会丢失,并且不会显示在输出 Kafka 主题中。

【问题讨论】:

    标签: scala apache-kafka kafka-producer-api


    【解决方案1】:

    这很奇怪。 close() 方法应该等待发送完成,这就是引入 close(time) 方法的原因:as you can see here.
    所以,我使用 Java 7。 rdd.foreach 是否在其中的每个分区上运行?还是它在每个元组上运行(就像我认为的那样)?
    如果是后者,你可以试试 rdd.foreachPartition (refer to this) 吗?因为您正在为您拍摄的每一行创建一个生产者,我担心这可能会导致问题(尽管理论上它不应该)。

    【讨论】:

    • 我实际上是从 kafka 流中读取数据。是的,正在为每个元组创建一个新的生产者。请告诉我你看到这个后要尝试什么。
    • 我知道您正在使用 DStreams。我还不能用scala写。如果你看我的参考这个链接,有一个“如何使用 foreachRDD”,它建议你这种结构:dstream.foreachRDD |-rdd.foreachPartition |-create new producer, get stuff from partition and send it
    • 谢谢淡水河谷。该解决方案运行良好。我已经敲了一天多的头。你救了我的灵魂。
    • @CodeSanta 有用吗?编辑:很高兴知道!这实际上是我的第一个答案 =)
    • 虽然程序现在在运行并选择已经存在于该主题中的数据时工作正常;当我在作业运行时尝试将数据写入 kafka 主题(从我接受输入的地方)时,我仍然面临问题。知道为什么会这样吗?
    猜你喜欢
    • 2023-04-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-05
    • 1970-01-01
    • 1970-01-01
    • 2019-02-11
    • 1970-01-01
    相关资源
    最近更新 更多