【问题标题】:Kafka producer creates topic but is not able to send messagesKafka 生产者创建主题但无法发送消息
【发布时间】:2016-04-21 12:26:51
【问题描述】:

我是 Scala 和 Kafka 的新手,遇到了一些麻烦。

我正在尝试将 scala kafka 生产者连接到安装在 cloudera express 服务器上的 kafka 服务器。 我已经在these instructions 的虚拟机中做过一次,没有任何问题。

当我运行生产者时,会创建所需的主题,但没有发送任何消息,或者我认为是这样。

下面是部分代码:

卡夫卡制作人

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaProducerManager {

    val props = new Properties()
    props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
    props.put("acks", "all")
    props.put("retries", "2")
    props.put("auto.commit.interval.ms", "1000")
    props.put("linger.ms", "1")
    props.put("block.on.buffer.full", "true")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("auto.create.topics.enable", "true")

    val producer = new KafkaProducer[String, String](props)

   def startCounter() {
       println("Start Producer Counter")
       for (i <- 1 to 100) {
           producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i))
           println("Producer - Send: " + i)
       }

       println("Closing producer")
       producer.close()
   }
}

当我执行 run 方法时,我看到“Producer - Send: #”作为输出,我没有收到任何错误。 所以我假设这段代码已经将消息发送到了 Kafka。

在运行生产者之前,我在 kafka 服务器上启动了以下内容:

 kafka-console-consumer --zookeeper zk:2181 --topic test-counter

但在这里我发现什么都没有发生。

当我检查主题时,生产者应该创建的主题在列表中。

kafka-topics -zookeeper zk:2181 --list

我对消费者也有类似的问题:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

class KafkaConsumerManager {

    val props = new Properties()
    props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
    props.put("group.id", "testGroup")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("linger.ms", "1")
    props.put("session.timeout.ms", "3000")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS)

    val consumer = new KafkaConsumer[String, String](props)

    def start() {
        println("Start Consumer")
        consumer.subscribe(Arrays.asList("test-counter"))

        while (true) {
            val records = consumer.poll(100)
            val iterator = records.iterator()

            while (iterator.hasNext) {
                val record = iterator.next()
                printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value())
            }
        }
    }
}

当我通过 kafka-console-producer 在服务器上创建消息时,我看到它们出现在服务器上的 kafka-console-consumer 中,但没有出现在我编写的消费者中。

kafka-console-producer --broker-list ks:9092 --topic test-counter

KafkaServer.ZOOKEEPER_ADDRESS 与 kafka-console-consumer 的参数 zk:2181 相同,KafkaServer.KAFKA_ADDRESS 与 kafka-console-producer 的参数 ks:9092 相同。

【问题讨论】:

    标签: scala cloudera-cdh kafka-consumer-api kafka-producer-api


    【解决方案1】:

    我试了一下代码,发现:

    • 应该在消费者中指定 keyvalue 反序列化器 属性:

       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      
    • session.timeout.ms 属性存在问题。来自here

      heartbeat.interval.ms - ... 必须设置为低于 session.timeout.ms ... 默认值:3000 >

      这意味着您应该增加您的session.timeout.ms 值或只是删除该行,因为默认值 属性为30000,大于默认值 heartbeat.interval.ms.

    执行更正后,代码可以正常工作。

    【讨论】:

    • 我马上试试这个
    • 通过此更改,问题仍然存在。但现在我可以看到这是由java.net.ConnectException: Connection timed out: no further information. 引起的,由于某种原因,生产者和消费者都无法连接到 ks:9092。
    • 我已经在另一台 kafka 服务器上尝试过这段代码,它工作正常。我们在 cloudera 上的 kafkaserver 和配置似乎有些问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-03
    • 2020-08-31
    • 2019-05-20
    • 1970-01-01
    相关资源
    最近更新 更多