【问题标题】:Kafka consumer in ScalaScala 中的 Kafka 消费者
【发布时间】:2018-10-21 00:02:37
【问题描述】:

我正在尝试使用 Scala 编写 Kafka 消费者代码。

我有一个 Kafka 集群(服务器 A),其中一些消息已经发布到主题(测试主题),我正在从另一台服务器(比如 B)使用它,该服务器建立了从服务器 B 到 A 的连接。(我我正在使用 spark-shell 进行编码)。

下面是我试图在服务器 B 中执行的 sn-p,但是在执行代码时出现空指针异常。

import java.util._
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition
val  props = new Properties()
props.put("group.id","group-test")
props.put("auto.commit.interval.ms","1000")
props.put("auto.offset.reset","earliest")
props.put("bootstrap.servers", "<kafka server ip>:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("partition.assignment.strategy","org.apache.kafka.clients.consumer.RangeAssignor")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe("test-topic")
 while(true){
   val records=consumer.poll(100)
   println(consumer)
   println(records)
   for (record<-records.asScala){
    println(record)
   }
 }



Error --
org.apache.kafka.clients.consumer.KafkaConsumer@29451e22
null
java.lang.NullPointerException
  at scala.collection.convert.Wrappers$JMapWrapperLike$$anon$2.<init>(Wrappers.scala:275)
  at scala.collection.convert.Wrappers$JMapWrapperLike$class.iterator(Wrappers.scala:274)
  at scala.collection.convert.Wrappers$JMapWrapper.iterator(Wrappers.scala:292)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  ... 59 elided

请帮助解决问题。

【问题讨论】:

    标签: scala nullpointerexception apache-kafka kafka-consumer-api


    【解决方案1】:

    consumer.poll(100) 返回 null。

    在循环之前增加超时并测试记录是否 != null。

    【讨论】:

    • 我以为 poll 只是返回了一个空迭代器,而不是 null?
    • @ZiadM 我已经像下面这样增加了超时,但它没有解决问题,val records=consumer.poll(5000) 记录:java.util.Map[String,org.apache.kafka. clients.consumer.ConsumerRecords[String,String]] = null
    • 你需要检查你的主题是否有数据,试试控制台消费者,检查它是否流数据
    • @ZiadM 我已经检查了我的主题,其中有一些测试数据 "sample 1" , "sample 2" ...我还在 /kafka-console-consumer.sh 中收到了新消息发布在生产者中。
    猜你喜欢
    • 1970-01-01
    • 2021-10-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-19
    • 2017-01-04
    • 1970-01-01
    相关资源
    最近更新 更多