【发布时间】:2018-10-01 22:10:27
【问题描述】:
我想获得有关 apachestorm 和 kafka 设置初始设置的帮助。
我能够向风暴集群提交拓扑,但在 storm ui 中出现以下错误。
Unable to get offset lags for kafka. Reason: java.lang.IllegalArgumentException: zk-node '/kafka-cluster-1/brokers/topics/myfirsttopic/aadb3eb4-2224-4c18-b8ad-6959a1c9f607' dose not exists. at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(KafkaOffsetLagUtil.java:387) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:268) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)
下面是我的代码sn-p。
// Kafka consumer client depends on Zookeeper when finding kafka nodes.
// Zookeeper Host List
String zkConnString = "localhost:2181";
String brokerZkPath = "/kafka-cluster-1/brokers";
String zkRoot = "/kafka-cluster-1/brokers/topics";
String topicName = "myfirsttopic";
/* ****************************************************************** */
/* Topology configuration variable */
/* ****************************************************************** */
/* the number of tasks that should be assigned to execute this bolt */
Integer boltParalismHint = 1;
Integer spoutParalismHint = 1;
/* ****************************************************************** */
/* Build kafka consumer spout */
/* ****************************************************************** */
// Build zookeeper instance
BrokerHosts hosts = new ZkHosts( zkConnString, brokerZkPath );
// Build configuration instance for Spout
SpoutConfig spoutConfig = new SpoutConfig( hosts, topicName, zkRoot + "/" + topicName , UUID.randomUUID().toString() );
spoutConfig.ignoreZkOffsets = true;
// Build Multischeme instance
spoutConfig.scheme = new SchemeAsMultiScheme( new StringScheme() );
// Build Kafka spout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
我引用了 document 并将 ignoreZkOffsets 设置为 true。
如果你想强制 spout 忽略任何消费者状态 ZooKeeper中存储的信息,那么你应该设置参数 KafkaConfig.ignoreZkOffsets 为 true
然而,从日志来看,kafka spout 似乎正在从 Zookeeper 读取偏移量。
既然是初始设置,我怎样才能阻止来自 Zookeeper 的风暴读取偏移?
我使用以下版本。
- apache 风暴 1.2.1
- apache kafka kafka_2.12-1.1.0
【问题讨论】:
-
您找到解决方案了吗?我面临同样的问题。 @Yu Watanabe