【问题标题】:Kafka consumer doesn't consume messages from existing topicKafka 消费者不使用来自现有主题的消息
【发布时间】:2020-12-12 04:38:36
【问题描述】:

我在 docker 上安装了 confluent kafka。在主题中,我有 10 个分区。问题是我无法使用来自该主题的消息,但我可以在该主题中生成消息。我正在尝试使用 C# confluent.kafka driver 1.5.1 (latest) 和 librd.kafka 1.5.0 (latest) 从主题中消费。

我启动 kafka 的 docker-compose 文件如下

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    networks:
      - bridge_network
    ports:
      - "3001:3001"    
    environment:
      ZOOKEEPER_CLIENT_PORT: 3001
      ZOOKEEPER_TICK_TIME: 3000

  broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
 
  kafka_manager:
    image: sheepkiller/kafka-manager
    hostname: kafka_manager
    depends_on:
      - zookeeper
    ports:
      - '9000:9000'
    networks:
      - bridge_network
    environment:
      ZK_HOSTS: 'zookeeper:3001'
networks:
  bridge_network:
    driver: bridge
    driver_opts:
      com.docker.network.enable_ipv6: "false"

我在 C# 中的使用者配置如下:

            var consumer = new ConsumerBuilder<string, string>(new Dictionary<string, string>
            {
                { "bootstrap.servers", "PLAINTEXT://localhost:3002" },
                { "group.id", "some-test-group" },
                { "auto.offset.reset", "latest"},
                { "compression.codec", "gzip" },
                { "enable.auto.commit", "false" }
            }).Build();

            consumer.Subscribe("some-test-topic");

            while (true)
            {
                var cr = consumer.Consume(30_000);
                if (cr == null || cr.Message.Key == null || cr.Message.Value == null)
                {
                    System.Console.WriteLine("that's it");
                    break;
                }

                System.Console.WriteLine(cr.Message.Key + ": " + cr.Message.Value);
            }

我确定主题分区中有消息,因为我可以使用 kafka 工具 2.0 检查主题

我用于kafka工具的配置是

我很确定我错过了配置文件中的某些内容,但是在阅读了 2 天的文档并将我的头撞到墙上后,我仍然找不到问题。那么有人可以帮忙吗?

【问题讨论】:

  • 能否附上您的 kafka 消费者日志?

标签: c# apache-kafka confluent-kafka-dotnet


【解决方案1】:

问题在于代理和主题复制因素。我使用你的 docker-compose 文件来部署 kafka,我连接到查看日志并且有消息:

ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

为了解决这个问题,我必须为代理配置添加 `KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1'。所以我的代理服务配置如下所示:

broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

重新启动代理后,我能够生成/使用消息。

【讨论】:

    【解决方案2】:

    您需要将auto.offset.reset 设置为“最早”或在您的消费者运行时向该主题生成消息。

    【讨论】:

    • 我已将 auto.offset.reset 更改为最早但问题仍然存在
    • 如果你使用'latest',你只会消费消费者启动后产生的消息。
    猜你喜欢
    • 2021-07-09
    • 2018-07-21
    • 2022-10-23
    • 1970-01-01
    • 1970-01-01
    • 2019-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多