【发布时间】:2018-12-21 17:31:23
【问题描述】:
我正在尝试在不使用 @Kafkalistener 的情况下编写 kafka 消费者,以下是我用于配置侦听器的代码行:
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
// allows a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "org");
// automatically reset the offset to the earliest offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ContainerProperties containerProperties=new ContainerProperties("in.t");
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Consumer receiver() {
return new Consumer();
}
}
这里如何配置主题和监听方法,我的消费者类可以有多个方法。
另外,想知道在将 @kafkalistener 与 kafka 流一起使用时是否存在任何潜在问题。
PS:我不想使用@KafkaListener。
【问题讨论】:
-
为什么不使用 kafka 监听器? @用户
-
因为它不适用于流
标签: spring-boot apache-kafka kafka-consumer-api spring-kafka