【问题标题】:default consumer group id in kafkakafka中的默认消费者组ID
【发布时间】:2019-01-17 19:21:27
【问题描述】:
我正在使用 Kafka 2.11 并且对它相当陌生。我正在尝试了解 kafka 消费者群体,我有 3 个使用同一主题的 spark 应用程序,并且每个应用程序都接收来自该主题的所有消息。由于我没有在应用程序中提到任何消费者组 ID,因此我假设 Kafka 正在为每个应用程序分配一些不同的消费者组 ID。
我需要使用以下命令为其中一个应用程序重置 kafka 偏移量。由于我不知道我的应用程序的使用者组名称,所以我有点卡在这里。我是否需要在应用程序中明确分配组 ID,然后在下面的命令中使用它?
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-datetime 2017-11-1907:52:43:00:000 --group <group_name> --topic <topic_name> --execute
如果这是真的,我怎样才能获得每个应用程序的消费者组 ID?我做不到
【问题讨论】:
标签:
apache-kafka
kafka-consumer-api
spark-structured-streaming
【解决方案1】:
消费者group.id 是强制性的。如果你不设置消费者group.id,你会得到异常。所以很明显你是在代码中的某个地方设置它,或者你正在使用的框架或库是在内部设置它。您应该始终自己设置group.id。
您可以使用以下命令获取消费者组ID:
bin/kafka-consumer-groups.sh --list --bootstrap-server <kafka-broker-ip>:9092
【解决方案2】:
如果你去Spark代码你可以找到KafkaSourceProvider这个类,它负责Kafka的源码阅读器,你可以看到生成了随机的group.id:
private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def createSource(
sqlContext: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
...
}
您可以使用spark-kafka-source 前缀搜索 group.id,但您无法找到特定组的 group.id。
要查找所有消费者组 ID,您可以使用以下命令:
./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --list
要检查消费者组的偏移量,您可以使用以下命令:
./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --group=GROUP_ID --describe
【解决方案3】:
由于我没有在应用程序中提及任何消费者组 ID,我假设 Kafka 正在为每个应用程序分配一些不同的消费者组 ID
Kafka 代理不会将消费者组名称分配给与其连接的消费者。
当消费者连接并订阅一个主题时,它“加入”了一个组。
如果您在使用 Spark 应用程序时未指定任何使用者组,这意味着您用于从 Spark 应用程序连接到 Kafka 的库/框架在某种程度上是自己分配使用者组名称。