【问题标题】:Spring Kafka difference between ContainerProperties(TopicPartitionOffset... topicPartitions) and ContainerProperties(String... topics)ContainerProperties(TopicPartitionOffset ... topicPartitions)和ContainerProperties(String ...主题)之间的Spring Kafka区别
【发布时间】:2021-11-03 00:52:01
【问题描述】:

我正在使用ContainerProperties 创建一个新的KafkaMessageListenerContainer

  • 使用ContainerProperties(String... topics),消费者组看起来不错:"state": "STABLE", "isSimpleConsumerGroup": false

  • 使用ContainerProperties(TopicPartitionOffset... topicPartitions),不会自动创建消费者组。最终在发送消息时创建,但 Consumer Group 看起来不太好:"state": "EMPTY", "isSimpleConsumerGroup": true

他们之间有什么区别,我错过了什么。我希望使用两个不同的 ContainerProperties 构造函数得到相同的结果。

ContainerProperties containerProps = new ContainerProperties(tpo.toArray(new TopicPartitionOffset[tpo.size()]));
containerProps.setGroupId(name);

// ContainerProperties containerProps = new ContainerProperties("poc-topic1",
// "poc-topic2", "poc-topic3");
// containerProps.setGroupId(name);

containerProps.setMessageListener(new TopicMessageListener(name));

DefaultKafkaConsumerFactory<String, Serializable> factory = new DefaultKafkaConsumerFactory<>(
                Utils.get().getConsumerProperties());
container = new KafkaMessageListenerContainer<>(factory, containerProps);
// container.setAutoStartup(true);
// container.setBeanName(name);
// container.checkGroupId();

container.start();

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    这是不正确的。主题订阅导致消费者组及其分区在组成员之间分布。

    当您进行显式分区分配时,根本不涉及消费者组。

    在 Apache Kafka 文档中查看更多信息:https://docs.confluent.io/platform/current/clients/consumer.html#consumer-groups

    以及相应的 JavaDocs:

    /**
     * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
     * and will replace the previous assignment (if there is one).
     * <p>
     * If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
     * <p>
     * Manual topic assignment through this method does not use the consumer's group management
     * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
     * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
     * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
     * <p>
     * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
     * assignment replaces the old one.
     *
     * @param partitions The list of partitions to assign this consumer
     * @throws IllegalArgumentException If partitions is null or contains null or empty topics
     * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
     *                               (without a subsequent call to {@link #unsubscribe()})
     */
    @Override
    public void assign(Collection<TopicPartition> partitions) {
    

    和:

    /**
     * Subscribe to the given list of topics to get dynamically assigned partitions.
     * <b>Topic subscriptions are not incremental. This list will replace the current
     * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
     * with manual partition assignment through {@link #assign(Collection)}.
     *
     * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
     *
     * <p>
     * This is a short-hand for {@link #subscribe(Collection, ConsumerRebalanceListener)}, which
     * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
     * {@link #subscribe(Collection, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
     * to be reset. You should also provide your own listener if you are doing your own offset
     * management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
     *
     * @param topics The list of topics to subscribe to
     * @throws IllegalArgumentException If topics is null or contains null or empty elements
     * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
     *                               configured at-least one partition assignment strategy
     */
    @Override
    public void subscribe(Collection<String> topics) {
    

    【讨论】:

    • 好的,谢谢我现在看到了不同之处,我的目标是创建一个新的容器,它将订阅一个主题并分配给一个特定的偏移量(我的主题只有一个分区)。如果不使用KafkaConsumerseek() 方法,这可能吗?
    • 那不是。只要您加入现有组,您只需从存储的偏移量开始。要找到特定的,你真的必须seek()
    猜你喜欢
    • 2019-04-29
    • 1970-01-01
    • 1970-01-01
    • 2019-11-29
    • 2013-06-25
    • 1970-01-01
    • 1970-01-01
    • 2012-03-04
    • 2020-07-25
    相关资源
    最近更新 更多