【问题标题】:How to reset offsets to arbitrary value in Kafka Consumer Group?如何在 Kafka Consumer Group 中将偏移量重置为任意值?
【发布时间】:2019-08-23 22:55:48
【问题描述】:

我想将所有分区的偏移量重置为特定值....我看到 kafka-consumer-groups.sh 提供了 --from-file 将偏移量重置为 CSV 文件中定义的值的选项

谁能分享这个csv文件的内容/格式和它的示例命令?

例如: ./kafka_2.12-2.1.1/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKER} --group ${GROUP_NAME} --topic ${TOPIC} --reset-offsets --from-file offsets.csv --execute

offsets.csv 的内容/格式是什么?

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    csv文件格式为(每一行包含一个分区的信息):

    topicName,partitionNumber,offset
    topicName,partitionNumber,offset
    

    示例 csv 内容 (reset-policy.csv)。

    someTopic1,0,1
    someTopic2,1,5
    

    根据 csv 文件重置偏移量的命令是:

    ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group gr1 --from-file reset-policy.csv --reset-offsets --execute

    【讨论】:

      【解决方案2】:

      @wardzinski 的answer 是请求的关键信息,但我可以添加以下有用的花絮:

      您可以使用kafka-consumer-groups--export 命令从现有信息创建CSV 文件,而无需通过--dry-run 更改任何内容。例如:

      bin/kafka-consumer-groups \
        --bootstrap-server $KAFKA \
        --export --group $GROUP_NAME --topic $TOPIC \
        --reset-offsets --to-current \
        --dry-run
      

      --to-current 的值可以更改为其他各种值,例如--to-datetime--by-period 等。

      该命令的输出是 --from-file 所需的 CSV 文件。

      一个非常有用的用例是将偏移量从一个消费者组复制到另一个消费者组,例如:

      bin/kafka-consumer-groups \
        --bootstrap-server $KAFKA \
        --export --group $FROM_GROUP_NAME --topic $TOPIC \
        --reset-offsets --to-current \
        --dry-run > offsets.txt
      
      bin/kafka-consumer-groups \
        --bootstrap-server $KAFKA \
        --execute --group $TO_GROUP_NAME \
        --reset-offsets --from-file offsets.txt
      

      【讨论】:

        【解决方案3】:

        如果您计划在 Java 应用程序中更改偏移量,您可以使用 AdminClient's API alterConsumerGroupOffsets

        这是一个使用 Kafka 2.8.0 测试的简单示例:

        String brokers = "localhost:9092";
        String consumerGroupName = "test1337";
        TopicPartition topicPartition = new TopicPartition("test", 0);
        Long offset = 4L;
        
        Map<TopicPartition, OffsetAndMetadata> toOffset = new HashMap<>();
        toOffset.put(topicPartition, new OffsetAndMetadata(offset));
        
        // Create AdminClient
        final Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        AdminClient adminClient = AdminClient.create(properties);
        
        try {
          // Check offsets before altering
          KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsBeforeResetFuture = adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata();
          System.out.println("Before: " + offsetsBeforeResetFuture.get().toString());
        
          // Alter offsets
          adminClient.alterConsumerGroupOffsets(consumerGroupName, toOffset).partitionResult(topicPartition).get();
        
          // Check offsets after altering
          KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsAfterResetFuture = adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata();
          System.out.println("After:  " + offsetsAfterResetFuture.get().toString());
        } catch (InterruptedException e) {
          e.printStackTrace();
        } catch (ExecutionException e) {
          e.printStackTrace();
        } finally {
          adminClient.close();
        }
        
        

        这将打印出来

        Before: {test-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
        After:  {test-0=OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''}}
        

        您可以扩展该示例以加载一个 csv 文件,其中包含有关消费者组、主题、分区和该分区的新偏移量的所有信息。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2018-01-22
          • 2018-10-14
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2020-12-28
          相关资源
          最近更新 更多