【问题标题】:default consumer group id in kafkakafka中的默认消费者组ID
【发布时间】:2019-01-17 19:21:27
【问题描述】:

我正在使用 Kafka 2.11 并且对它相当陌生。我正在尝试了解 kafka 消费者群体,我有 3 个使用同一主题的 spark 应用程序,并且每个应用程序都接收来自该主题的所有消息。由于我没有在应用程序中提到任何消费者组 ID,因此我假设 Kafka 正在为每个应用程序分配一些不同的消费者组 ID。 我需要使用以下命令为其中一个应用程序重置 kafka 偏移量。由于我不知道我的应用程序的使用者组名称,所以我有点卡在这里。我是否需要在应用程序中明确分配组 ID,然后在下面的命令中使用它?

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-datetime 2017-11-1907:52:43:00:000 --group <group_name> --topic <topic_name> --execute

如果这是真的,我怎样才能获得每个应用程序的消费者组 ID?我做不到

【问题讨论】:

    标签: apache-kafka kafka-consumer-api spark-structured-streaming


    【解决方案1】:

    消费者group.id 是强制性的。如果你不设置消费者group.id,你会得到异常。所以很明显你是在代码中的某个地方设置它,或者你正在使用的框架或库是在内部设置它。您应该始终自己设置group.id

    您可以使用以下命令获取消费者组ID:

    bin/kafka-consumer-groups.sh  --list --bootstrap-server <kafka-broker-ip>:9092
    

    【讨论】:

      【解决方案2】:

      如果你去Spark代码你可以找到KafkaSourceProvider这个类,它负责Kafka的源码阅读器,你可以看到生成了随机的group.id:

      private[kafka010] class KafkaSourceProvider extends DataSourceRegister
      
        override def createSource(
          sqlContext: SQLContext,
          metadataPath: String,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): Source = {
            validateStreamOptions(parameters)
            // Each running query should use its own group id. Otherwise, the query may be only assigned
            // partial data since Kafka will assign partitions to multiple consumers having the same group
            // id. Hence, we should generate a unique id for each query.
            val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
          ...
        }
      

      您可以使用spark-kafka-source 前缀搜索 group.id,但您无法找到特定组的 group.id。

      要查找所有消费者组 ID,您可以使用以下命令: ./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --list

      要检查消费者组的偏移量,您可以使用以下命令: ./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --group=GROUP_ID --describe

      【讨论】:

        【解决方案3】:

        由于我没有在应用程序中提及任何消费者组 ID,我假设 Kafka 正在为每个应用程序分配一些不同的消费者组 ID

        Kafka 代理不会将消费者组名称分配给与其连接的消费者。 当消费者连接并订阅一个主题时,它“加入”了一个组。 如果您在使用 Spark 应用程序时未指定任何使用者组,这意味着您用于从 Spark 应用程序连接到 Kafka 的库/框架在某种程度上是自己分配使用者组名称。

        【讨论】:

          猜你喜欢
          • 2017-08-24
          • 1970-01-01
          • 2017-06-19
          • 2020-09-19
          • 1970-01-01
          • 1970-01-01
          • 2021-01-16
          • 2017-01-04
          • 2016-12-03
          相关资源
          最近更新 更多