【Question】: Kafka Spark Stream throws exception: No current assignment for partition
【Posted】: 2017-06-28 15:15:20
【Description】:

Below is the Scala code I use to create the Spark Kafka stream:

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "server110:2181,server110:9092",
  "zookeeper" -> "server110:2181",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("ABTest")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

But after running for about 10 hours, it throws this exception:

2017-02-10 10:56:20,000 INFO [JobGenerator] internals.ConsumerCoordinator: Revoking previously assigned partitions [ABTest-0, ABTest-1] for group example
2017-02-10 10:56:20,000 INFO [JobGenerator] internals.AbstractCoordinator: (Re-)joining group example
2017-02-10 10:56:20,011 INFO [JobGenerator] internals.AbstractCoordinator: (Re-)joining group example
2017-02-10 10:56:40,057 INFO [JobGenerator] internals.AbstractCoordinator: Successfully joined group example with generation 5
2017-02-10 10:56:40,058 INFO [JobGenerator] internals.ConsumerCoordinator: Setting newly assigned partitions [ABTest-1] for group example
2017-02-10 10:56:40,080 ERROR [JobScheduler] scheduler.JobScheduler: Error generating jobs for time 1486695380000 ms
java.lang.IllegalStateException: No current assignment for partition ABTest-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1169)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Clearly, partition ABTest-0 has been revoked from this consumer, but the Spark Streaming consumer does not seem to notice and keeps trying to consume data from the revoked topic-partition, so the exception is thrown and the whole Spark job aborts. A Kafka rebalance event seems perfectly normal to me, so how should I modify my code so that Spark Streaming handles the partition-revoke event correctly?

【Question discussion】:

  • In my case, I used ZkClient: org.I0Itec.zkclient.ZkClient client = new ZkClient("servername:2181"); List topicsList = JavaConversions.asJavaList(ZkUtils.getAllTopics(client)); Thanks

Tags: apache-kafka spark-streaming rdd


【Solution 1】:

It took me a while to figure this out. It happens because of rebalancing. Use the Assign ConsumerStrategy instead of Subscribe.
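A minimal sketch of what that change might look like, assuming the spark-streaming-kafka-0-10 API; the topic name and two partitions come from the question, while the starting offsets shown here are placeholders you would normally restore from your own offset store:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Assign pins the consumer to explicit TopicPartitions, so it never joins a
// consumer group for partition assignment -- no rebalance, no revocation.
val topicPartitions = Seq(
  new TopicPartition("ABTest", 0),
  new TopicPartition("ABTest", 1)
)

// Placeholder starting offsets (e.g. restored from your own checkpoint or
// offset store). With the two-argument Assign overload the consumer instead
// starts according to auto.offset.reset.
val fromOffsets = Map(
  new TopicPartition("ABTest", 0) -> 0L,
  new TopicPartition("ABTest", 1) -> 0L
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Assign[String, String](topicPartitions, kafkaParams, fromOffsets)
)
```

The trade-off: because the group coordinator no longer manages these partitions, there is nothing to revoke, but you also lose automatic partition discovery, so adding partitions to the topic later means updating this list by hand.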

【Discussion】:

  • Assigning to a topic requires offset values. When using Assign[String, String](topics, kafkaParams) instead of Subscribe[String, String](topics, kafkaParams), it throws a compile error saying no overloaded method was found.
  • Great answer. Thanks.
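Regarding the compile error mentioned above: judging from the spark-streaming-kafka-0-10 signatures, it likely comes from passing topic names rather than partitions. Assign's first argument is a collection of TopicPartition, not an Array[String]; this is an educated guess at the commenter's problem, not something stated in the thread:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

// Not: Assign[String, String](topics, kafkaParams)  // topics is Array[String] -> no matching overload
// Assign expects TopicPartition objects:
val tps = Seq(new TopicPartition("ABTest", 0), new TopicPartition("ABTest", 1))
val strategy = Assign[String, String](tps, kafkaParams) // the offsets argument is optional
```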