【问题标题】:spring-kafka - how to read one topic from the beginning, while reading another one from the end?spring-kafka - 如何从头开始阅读一个主题,同时从头开始阅读另一个主题?
【发布时间】:2017-03-14 02:47:29
【问题描述】:

我正在写一个 spring-kafka 应用程序,其中我需要阅读 2 个主题:test1 和 test2:

public class Receiver {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(Receiver.class);

    @KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "test1", partitions = { "0" }),
  @TopicPartition(topic = "test2", partitions = { "0" })})
    public void receiveMessage(String message) {
        LOGGER.info("received message='{}'", message);
    }
}

我的配置如下所示:

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kakfa cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");

        return props;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}

我需要能够仅读取“test1”中的最新消息,同时能够读取“test2”开头的所有消息。 我只对应用启动时的“test2”消息感兴趣,但只要应用正在运行,就需要不断读取“test1”消息。

有没有办法配置这样的功能?

【问题讨论】:

  • 您不能将ConsumerConfig.GROUP_ID_CONFIG 设置为两个值;第二个只会覆盖地图中的第一个。

标签: spring-boot spring-kafka


【解决方案1】:

我也一直在努力解决这个问题,并想提出一个更通用的解决方案。

虽然您的解决方案有效,但您需要对分区进行硬编码。 您还可以让使用@KafkaListener-annotation 的类实现ConsumerSeekAware 接口。

这为您提供了三种可用于寻找特定偏移量的方法。一旦分配了分区,就会调用一种方法。所以它可能看起来像这样。

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    assignments.keySet().stream()
        .filter(partition -> "MYTOPIC".equals(partition.topic()))
        .forEach(partition -> callback.seekToBeginning("MYTOPIC", partition.partition()));
}

这样,当您决定向主题添加更多分区时,您无需修改​​任何代码 :)

希望这对某人有所帮助。

【讨论】:

  • 我遇到的最佳答案。优雅且非常简单。这应该被标记为正确答案
【解决方案2】:

这是一种对我有用的方法:

@KafkaListener(id = "receiver-api",         
        topicPartitions =
        { @TopicPartition(topic = "schema.topic", 
                partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
                @TopicPartition(topic = "data.topic", partitions = { "0" })})
    public void receiveMessage(String message) {
        try {
                JSONObject incomingJsonObject = new JSONObject(message); 
                if(!incomingJsonObject.isNull("data")){
                    handleSchemaMessage(incomingJsonObject);
                }

                else {
                    handleDataMessage(incomingJsonObject);
                }

        } catch (Exception e) {
            e.printStackTrace();
        }

使用“partitionOffsets”注解(import org.springframework.kafka.annotation.PartitionOffset;)

是能够始终从头开始阅读特定主题,同时像往常一样“跟踪”其他主题的关键。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-07-12
    • 2018-09-24
    • 2011-02-26
    • 2019-05-02
    • 1970-01-01
    • 2017-04-11
    • 2017-12-18
    • 1970-01-01
    相关资源
    最近更新 更多