【问题标题】:Storm-Kafka-client from Storm 1.0.1来自 Storm 1.0.1 的 Storm-Kafka-client
【发布时间】:2016-11-23 02:45:39
【问题描述】:

基于 Storm 文档,支持的 KafkaSpout 实现基于旧的消费者 API。我注意到外部包有另一个名为storm-kafka-client 的实现。

https://github.com/apache/storm/tree/master/external/storm-kafka-client

尚不清楚1.0.1 中的新客户端版本是否已准备好生产。有人有运行它的经验吗?

【问题讨论】:

    标签: apache-kafka apache-storm


    【解决方案1】:

    下面的代码对我有用!!!

    public TopologyBuilder myTopology() {
    
        TopologyBuilder builder = new TopologyBuilder();
    
        try {       
            KafkaSpoutConfig<String, String> kafkaSpoutConfig = getKafkaSpoutConfig("KAFKA_IP:9092", KAFKA_TOPIC);          
            KafkaSpout kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
            builder.setSpout("kafkaSpout", kafkaSpout, 2 * 2);
            builder.setBolt("Bolt-1", new TestBolt(), parallelism).shuffleGrouping("kafkaSpout", KAFKA_TOPIC);
        } catch (Exception ex) {
    
        }
    
        return builder;
    }
    

    配置喷口。

    protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers ,String topic) {
        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
            (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
            new Fields("topic", "partition", "offset", "key", "value"), topic);
    
        Builder<String, String> builder = KafkaSpoutConfig.builder(bootstrapServers, new String[]{topic});
    
        return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG, topic)
            .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
            .setRetry(getRetryService())
            .setRecordTranslator(trans)
            .setOffsetCommitPeriodMs(10_000)
            .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
            .setMaxUncommittedOffsets(1000)
            .build();
    }
    

    用于配置失败消息重发逻辑

    protected KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
            TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
     }
    

    【讨论】:

      【解决方案2】:

      Storm 1.1.0 可以使用以下 maven 依赖项

        <dependency>
              <groupId>org.apache.storm</groupId>
              <artifactId>storm-core</artifactId>
               <version>1.1.0</version>
              <scope>provided</scope>
              <exclusions>
                  <exclusion>
                      <groupId>org.slf4j</groupId>
                      <artifactId>log4j-over-slf4j</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>
      
       <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>0.10.0.0</version>
          </dependency>
          <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-kafka</artifactId>
          <version>1.0.0</version>
      </dependency>
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka_2.10</artifactId>
               <version>0.9.0.0</version>
              <exclusions>
                  <exclusion>
                      <groupId>org.apache.zookeeper</groupId>
                      <artifactId>zookeeper</artifactId>
                  </exclusion>
                  <exclusion>
                      <groupId>log4j</groupId>
                      <artifactId>log4j</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>
      

      您可能会遇到更多依赖问题,您可以通过添加所需的 jar 来解决这些问题。

      java代码中的依赖也会从org.backtype.storm.XXXXX变成org.apache.storm.XXXXX

      【讨论】:

        【解决方案3】:

        我在 Storm 邮件列表中发布了同样的问题。 新的 API 已准备好生产。我们应该使用 1.x 分支。 我打算用

        进行测试
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
        
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.0.1</version>
        </dependency>
        

        将更新进度。

        【讨论】:

        • 嗨@YaRiK,我正在寻找有关如何使用storm-kafka-client模块的快速示例!如果您已经编写了一个示例代码,您能否分享任何示例代码?谢谢
        猜你喜欢
        • 2015-03-13
        • 2019-09-04
        • 2015-05-20
        • 2019-02-08
        • 2018-11-03
        • 2015-10-14
        • 2019-07-28
        • 1970-01-01
        • 2016-11-16
        相关资源
        最近更新 更多