【发布时间】:2016-11-23 02:45:39
【问题描述】:
基于 Storm 文档,支持的 KafkaSpout 实现基于旧的消费者 API。我注意到外部包有另一个名为storm-kafka-client 的实现。
https://github.com/apache/storm/tree/master/external/storm-kafka-client
尚不清楚1.0.1 中的新客户端版本是否已准备好生产。有人有运行它的经验吗?
【问题讨论】:
基于 Storm 文档,支持的 KafkaSpout 实现基于旧的消费者 API。我注意到外部包有另一个名为storm-kafka-client 的实现。
https://github.com/apache/storm/tree/master/external/storm-kafka-client
尚不清楚1.0.1 中的新客户端版本是否已准备好生产。有人有运行它的经验吗?
【问题讨论】:
下面的代码对我有用!!!
public TopologyBuilder myTopology() {
TopologyBuilder builder = new TopologyBuilder();
try {
KafkaSpoutConfig<String, String> kafkaSpoutConfig = getKafkaSpoutConfig("KAFKA_IP:9092", KAFKA_TOPIC);
KafkaSpout kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
builder.setSpout("kafkaSpout", kafkaSpout, 2 * 2);
builder.setBolt("Bolt-1", new TestBolt(), parallelism).shuffleGrouping("kafkaSpout", KAFKA_TOPIC);
} catch (Exception ex) {
}
return builder;
}
配置喷口。
protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers ,String topic) {
ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"), topic);
Builder<String, String> builder = KafkaSpoutConfig.builder(bootstrapServers, new String[]{topic});
return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG, topic)
.setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
.setRetry(getRetryService())
.setRecordTranslator(trans)
.setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
.setMaxUncommittedOffsets(1000)
.build();
}
用于配置失败消息重发逻辑
protected KafkaSpoutRetryService getRetryService() {
return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
}
【讨论】:
Storm 1.1.0 可以使用以下 maven 依赖项
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
您可能会遇到更多依赖问题,您可以通过添加所需的 jar 来解决这些问题。
java代码中的依赖也会从org.backtype.storm.XXXXX变成org.apache.storm.XXXXX
【讨论】:
我在 Storm 邮件列表中发布了同样的问题。 新的 API 已准备好生产。我们应该使用 1.x 分支。 我打算用
进行测试<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.0.1</version>
</dependency>
将更新进度。
【讨论】: