【问题标题】:How to deploy storm-core topology with apache kafka integration for first time?如何首次使用 apache kafka 集成部署 Storm-core 拓扑?
【发布时间】:2018-10-01 22:10:27
【问题描述】:

我想获得有关 apachestorm 和 kafka 设置初始设置的帮助。

我能够向风暴集群提交拓扑,但在 storm ui 中出现以下错误。

Unable to get offset lags for kafka. Reason: java.lang.IllegalArgumentException: zk-node '/kafka-cluster-1/brokers/topics/myfirsttopic/aadb3eb4-2224-4c18-b8ad-6959a1c9f607' dose not exists. at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(KafkaOffsetLagUtil.java:387) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:268) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)

下面是我的代码sn-p。

// Kafka consumer client depends on Zookeeper when finding kafka nodes.
// Zookeeper Host List
String zkConnString = "localhost:2181";
String brokerZkPath = "/kafka-cluster-1/brokers";
String zkRoot       = "/kafka-cluster-1/brokers/topics";
String topicName    = "myfirsttopic";

/* ****************************************************************** */
/* Topology configuration variable                                    */
/* ****************************************************************** */
/* the number of tasks that should be assigned to execute this bolt   */
Integer boltParalismHint  = 1;
Integer spoutParalismHint = 1;

/* ****************************************************************** */
/* Build kafka consumer spout                                         */
/* ****************************************************************** */
// Build zookeeper instance
BrokerHosts hosts = new ZkHosts( zkConnString, brokerZkPath );

// Build configuration instance for Spout
SpoutConfig spoutConfig = new SpoutConfig( hosts, topicName, zkRoot + "/" + topicName , UUID.randomUUID().toString() );

spoutConfig.ignoreZkOffsets = true;

// Build Multischeme instance
spoutConfig.scheme = new SchemeAsMultiScheme( new StringScheme() );

// Build Kafka spout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

我引用了 document 并将 ignoreZkOffsets 设置为 true

如果你想强制 spout 忽略任何消费者状态 ZooKeeper中存储的信息,那么你应该设置参数 KafkaConfig.ignoreZkOffsets 为 true

然而,从日志来看,kafka spout 似乎正在从 Zookeeper 读取偏移量。

既然是初始设置,我怎样才能阻止来自 Zookeeper 的风暴读取偏移?

我使用以下版本。

  • apache 风暴 1.2.1
  • apache kafka kafka_2.12-1.1.0

【问题讨论】:

  • 您找到解决方案了吗?我面临同样的问题。 @Yu Watanabe

标签: apache-kafka apache-storm


【解决方案1】:

我没有做任何特别的事情,但是在以下情况下,错误似乎没有出现在 storm ui 中。

  1. Kafka 中创建主题
  2. 确保 brokerZkPath 存在于 Zookeeper 中(brokers 目录的路径。在我的例子中是 /kafka-cluster-1 /经纪人
  3. 确保 zkRootPath 存在于 Zookeepertopics 目录的路径。在我的情况下为 /kafka-cluster-1 /brokers/topics)
  4. 向storm提交拓扑

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-05-31
    • 2021-06-02
    • 1970-01-01
    • 2019-06-25
    • 2015-08-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多