【问题标题】:Getting Bootstrap broker ip:9092 disconnected error from kafka spout从 kafka spout 获取 Bootstrap 代理 ip:9092 断开连接错误
【发布时间】:2018-12-06 18:06:58
【问题描述】:

版本:

"org.apache.storm" % "storm-kafka-client" % "1.2.1"
"org.apache.storm" % "storm-core" % "1.2.1" % "compile"
Kafka: 0.10.1.0 

我收到以下错误/警告,在 localCluster 中运行,来自我的 kafka spout:

2018-06-28 00:00:34,930 AppInfoParser [INFO] Kafka version : 0.10.1.0
2018-06-28 00:00:34,930 AppInfoParser [INFO] Kafka commitId : 3402a74efb23d1d4
2018-06-28 00:00:34,931 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip1:9092 disconnected
2018-06-28 00:00:35,092 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip2:9092 disconnected
2018-06-28 00:00:35,251 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip3:9092 disconnected
2018-06-28 00:00:35,524 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip4:9092 disconnected
2018-06-28 00:00:35,629 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip5:9092 disconnected
2018-06-28 00:00:35,822 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip6:9092 disconnected
2018-06-28 00:00:35,927 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip7:9092 disconnected

这里是Kafka Spout的代码:

  private def getKafkaSpoutConfig(source: TopologyConfig) = {
    System.clearProperty("java.security.auth.login.config")  //tried this after getting error, no impact
    KafkaSpoutConfig.builder("ip1:9092,ip2:9092,ip3:9092,.....,ip10:9092", "topicName")
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setOffsetCommitPeriodMs(100)        
    .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
    .setMaxUncommittedOffsets(1000000)
    .build()
  }
  def getKafkaSpout(source: TopologyConfig) = new KafkaSpout(getKafkaSpoutConfig(source: TopologyConfig))

在调试时,我看到以下是此错误的堆栈跟踪:

maybeHandleDisconnection:568, NetworkClient$DefaultMetadataUpdater (org.apache.kafka.clients)
processDisconnection:396, NetworkClient (org.apache.kafka.clients)
handleDisconnections:464, NetworkClient (org.apache.kafka.clients)
poll:270, NetworkClient (org.apache.kafka.clients)
poll:232, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
poll:195, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
getTopicMetadata:253, Fetcher (org.apache.kafka.clients.consumer.internals)
partitionsFor:1318, KafkaConsumer (org.apache.kafka.clients.consumer)
getFilteredTopicPartitions:57, NamedTopicFilter (org.apache.storm.kafka.spout)
refreshAssignment:54, ManualPartitionSubscription (org.apache.storm.kafka.spout)
subscribe:49, ManualPartitionSubscription (org.apache.storm.kafka.spout)
subscribeKafkaConsumer:657, KafkaSpout (org.apache.storm.kafka.spout)
activate:648, KafkaSpout (org.apache.storm.kafka.spout)
invoke:484, util$async_loop$fn__557 (org.apache.storm)
run:22, AFn (clojure.lang)
run:748, Thread (java.lang)

相同的代码适用于一个 Kafka 设置,但对于另一个相同版本的 kafka 设置,它开始出现上述错误。

编辑:

作为par cmets,我尝试连接到Kafka的9092端口,我能够做到:

➜  git:(myBranch) ✗ telnet ipn 9092
Trying ipn...
Connected to my-kafka-app-396433.
Escape character is '^]'.

【问题讨论】:

  • 您能否证明您可以从您的 Storm 机器访问 Kafka 的 9092 端口?
  • @cricket_007 我可以连接,请参阅已编辑的问题。我正在尝试从我的 Mac 通过本地模式运行这场风暴。
  • 能否启用 TRACE 日志记录,看看能否获得更多信息?
  • @cricket_007 我已经有 conf.setDebug(true) 用于 Storm 还是您还有什么意思?
  • 我不知道 Storm API,但我认为您在某个级别应用了 log4j 配置。

标签: java scala apache-kafka apache-storm


【解决方案1】:

这是由于 kafka 的某些版本不匹配而发生的。安装的 kafka 版本为 0.10.0.1,而代码正在使用 kafka-clients 版本选择和执行:0.10.1.0

发生这种情况是因为 storm-core 具有 kafka-clients 版本的依赖项:0.10.1.0,它可以被覆盖,我这样做了,但不知何故它没有在 sbt 中正确排除。经过一些排列后,它正在工作,最终的依赖项看起来像这样:

Seq("org.apache.storm" % "storm-core" % "1.2.1" % "compile" excludeAll(
  ExclusionRule(organization = "org.apache.logging.log4j"),
  ExclusionRule(organization = "org.apache.kafka", artifact = "kafka-clients"),
  ExclusionRule(organization = "ring-cors"),
  ExclusionRule(organization = "org.slf4j", artifact = "*"),
  ExclusionRule(organization = "log4j", artifact = "*"),
  ExclusionRule(organization = "javax", artifact = "*"),
  ExclusionRule(organization = "javax.servlet", artifact="*")
)),
"org.apache.kafka" % "kafka-clients" % "0.10.0.1" excludeAll(
  ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"),
  ExclusionRule(organization = "log4j"),
  ExclusionRule(organization = "javax.servlet", artifact="*")
),
"org.apache.storm" % "storm-kafka-client" % "1.2.1" excludeAll(
  ExclusionRule(organization = "org.apache.kafka", artifact = "*"),
  ExclusionRule(organization = "org.apache.logging.log4j"),
  ExclusionRule(organization = "ring-cors"),
  ExclusionRule(organization = "org.apache.logging.log4j", artifact = "*"),
  ExclusionRule(organization = "org.slf4j", artifact = "*"),
  ExclusionRule(organization = "log4j", artifact = "*")
),

【讨论】:

    猜你喜欢
    • 2018-02-16
    • 2020-07-18
    • 1970-01-01
    • 2021-06-23
    • 1970-01-01
    • 1970-01-01
    • 2020-05-04
    • 1970-01-01
    • 2015-09-03
    相关资源
    最近更新 更多