【问题标题】:Kafka producer hangs on sendKafka 生产者挂起发送
【发布时间】:2018-04-19 02:11:17
【问题描述】:

逻辑是,从自定义源获取数据的流式作业必须同时写入 Kafka 和 HDFS。

我编写了一个(非常)基本的 Kafka 生产者来执行此操作,但是整个流式传输作业都挂在 send 方法上。

class KafkaProducer(val kafkaBootstrapServers: String, val kafkaTopic: String, val sslCertificatePath: String, val sslCertificatePassword: String) {

  val kafkaProps: Properties = new Properties()
  kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers)
  kafkaProps.put("acks", "1")
  kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put("ssl.truststore.location", sslCertificatePath)
  kafkaProps.put("ssl.truststore.password", sslCertificatePassword)

  val kafkaProducer: KafkaProducer[Long, Array[String]] = new KafkaProducer(kafkaProps)

  def sendKafkaMessage(message: Message): Unit = {
    message.data.foreach(list => {
      val producerRecord: ProducerRecord[Long, Array[String]] = new ProducerRecord[Long, Array[String]](kafkaTopic, message.timeStamp.getTime, list.toArray)
      kafkaProducer.send(producerRecord)
    })
  }
}

以及调用生产者的代码:

receiverStream.foreachRDD(rdd => {
      val messageRowRDD: RDD[Row] = rdd.mapPartitions(partition => {
        val parser: Parser = new Parser
        val kafkaProducer: KafkaProducer = new KafkaProducer(kafkaBootstrapServers, kafkaTopic, kafkaSslCertificatePath, kafkaSslCertificatePass)
        val newPartition = partition.map(message => {
          Logger.getLogger("importer").error("Writing Message to Kafka...")
          kafkaProducer.sendKafkaMessage(message)
          Logger.getLogger("importer").error("Finished writing Message to Kafka")
          Message.data.map(singleMessage => parser.parseMessage(Message.timeStamp.getTime, singleMessage))
        })
        newPartition.flatten
      })

      val df = sqlContext.createDataFrame(messageRowRDD, Schema.messageSchema)

      Logger.getLogger("importer").info("Entries-count: " + df.count())
      val row = Try(df.first)

      row match {
        case Success(s) => Persister.writeDataframeToDisk(df, outputFolder)
        case Failure(e) => Logger.getLogger("importer").warn("Resulting DataFrame is empty. Nothing can be written")
      }
    })

从日志中我可以看出每个执行者都达到了“发送到 kafka”的点,但没有进一步。所有的执行者都挂在上面,没有抛出异常。

Message 类是一个非常简单的案例类,有 2 个字段、一个时间戳和一个字符串数组。

【问题讨论】:

    标签: scala hadoop apache-spark apache-kafka kafka-producer-api


    【解决方案1】:

    这是由于 Kafka 中的 acks 字段造成的。

    Acks 设置为 1,发送速度更快。

    【讨论】:

    • 您能否详细说明上述答案...在哪里?以及如何详细解决。我面临着同样的问题,我的测试试图向主题发送消息并且它挂在发送等待确认!
    猜你喜欢
    • 1970-01-01
    • 2016-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-03
    • 1970-01-01
    • 1970-01-01
    • 2021-01-09
    相关资源
    最近更新 更多