【Question Title】: Storm-Kafka spout not creating node in zookeeper cluster
【Posted】: 2016-06-16 18:43:43
【Question】:

I am using storm-kafka with Storm 0.10 and Kafka 0.9.0.0. Whenever I run my topology on the cluster, it starts reading from the beginning, even though I have set the zkRoot and the consumer groupId in the properties file as -

kafka.zkHosts=myserver.myhost.com:2181
kafka.topic=onboarding-mail-topic
kafka.zkRoot=/kafka-storm
kafka.group.id=onboarding

Spout:

    BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
    String topicName = prop.getProperty("kafka.topic");
    String zkRoot = prop.getProperty("kafka.zkRoot");
    String groupId = prop.getProperty("kafka.group.id");

    // kafka spout config
    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

When I check zookeeper with ls /, it does not show kafka-storm:

[controller_epoch, controller, brokers, storm, zookeeper, kafka-manager, admin, isr_change_notification, consumers, config]

【Discussion】:

    Tags: apache-kafka apache-storm


    【Solution 1】:

    Finally, I figured it out. Reading from Kafka and writing the offsets back are controlled by different settings.

    If you run the topology on a Storm cluster, whether single-node or multi-node, make sure the following are set in the storm.yaml file

    storm.zookeeper.servers

    storm.zookeeper.port

    in addition to the zkHosts, zkRoot, and consumer group id properties.
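    For reference, this is a minimal storm.yaml sketch; the hostname is a placeholder, substitute your own Zookeeper node(s):

```yaml
# storm.yaml -- hostname below is a placeholder
storm.zookeeper.servers:
  - "myserver.myhost.com"
storm.zookeeper.port: 2181
```

    With these set, the spout can fall back to the topology's Zookeeper when no explicit zkServers/zkPort are given on the SpoutConfig.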

    Or, as a best practice, override these properties in the topology by setting the correct values when creating the KafkaSpout -

        BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
        String topicName = prop.getProperty("kafka.topic");
        String zkRoot = prop.getProperty("kafka.zkRoot");
        String groupId = prop.getProperty("kafka.group.id");
        String zkServers = prop.getProperty("kafka.zkServers");
        String zkPort = prop.getProperty("kafka.zkPort");

        // kafka spout config
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Zookeeper used for offset storage; split supports a comma-separated host list
        kafkaConfig.zkServers = Arrays.asList(zkServers.split(","));
        kafkaConfig.zkPort = Integer.valueOf(zkPort);

        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

    Or you can even put these values in the Config object. That is better, because you may want to store the offset information in a different Zookeeper cluster while your topology reads messages from entirely different brokers.
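    To illustrate, here is a minimal runnable sketch of building such a conf map. The class name and hostnames are hypothetical; it uses the plain string keys, which in Storm are exposed as constants on the Config class (e.g. Config.STORM_ZOOKEEPER_SERVERS), and the resulting map is what you would merge into the Config passed to StormSubmitter.submitTopology(...):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class TopoConfSketch {
    // Hypothetical helper: builds the topology-level Zookeeper settings.
    // In Storm these keys are the values of Config.STORM_ZOOKEEPER_SERVERS
    // and Config.STORM_ZOOKEEPER_PORT.
    static Map<String, Object> zkConf(String[] servers, int port) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("storm.zookeeper.servers", Arrays.asList(servers)); // placeholder hosts
        conf.put("storm.zookeeper.port", port);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = zkConf(new String[]{"zk1.example.com", "zk2.example.com"}, 2181);
        System.out.println(conf);
    }
}
```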

    KafkaSpout code snippet, for understanding -

    @Override
    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
        _collector = collector;

        Map stateConf = new HashMap(conf);
        List<String> zkServers = _spoutConfig.zkServers;
        if (zkServers == null) {
            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        }
        Integer zkPort = _spoutConfig.zkPort;
        if (zkPort == null) {
            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
        }
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
        _state = new ZkState(stateConf);

        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

        // using TransactionalState like this is a hack
        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
        if (_spoutConfig.hosts instanceof StaticHosts) {
            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        } else {
            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        }
    }


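    The key behavior in the snippet above is the null-fallback: values set on the SpoutConfig win, and only when they are unset does the spout fall back to the topology's own Zookeeper settings. A minimal standalone sketch of that resolution order (class and hostnames are hypothetical, not from the Storm codebase):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ZkConfigFallback {
    // Hypothetical helper mirroring the fallback in KafkaSpout.open():
    // prefer the value set on SpoutConfig; otherwise use the topology conf.
    static List<String> resolveZkServers(List<String> spoutZkServers, Map<String, Object> topoConf) {
        if (spoutZkServers != null) {
            return spoutZkServers;
        }
        @SuppressWarnings("unchecked")
        List<String> fromConf = (List<String>) topoConf.get("storm.zookeeper.servers");
        return fromConf;
    }

    public static void main(String[] args) {
        Map<String, Object> topoConf = new HashMap<>();
        topoConf.put("storm.zookeeper.servers", Arrays.asList("stormzk.example.com"));

        // SpoutConfig.zkServers unset -> the topology's Zookeeper is used
        System.out.println(resolveZkServers(null, topoConf)); // prints [stormzk.example.com]
        // SpoutConfig.zkServers set -> it wins over the topology conf
        System.out.println(resolveZkServers(Arrays.asList("kafkazk.example.com"), topoConf)); // prints [kafkazk.example.com]
    }
}
```

    This is why setting zkServers/zkPort on the SpoutConfig lets you keep offsets in a separate Zookeeper cluster from the one Storm itself uses.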
    【Discussion】:
