【Question title】: Kafka broker connection fails when creating a new topic on a cluster of 3 brokers
【Posted】: 2022-01-17 13:51:29
【Question】:

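I am trying to set up a Kafka cluster with 3 brokers on Docker.

The problem: whenever I perform an operation (creating, listing, or deleting topics), one broker always fails to connect and its Docker container restarts. This does not happen on a cluster of 2 brokers or on a single broker.

My steps to reproduce:

  • Run docker-compose up
  • Open a shell in one of the Kafka containers and create a topic: kafka-topics --bootstrap-server ":9092" --create --topic topic-name --partitions 3 --replication-factor 3
  • After that, one random broker is disconnected and removed from the cluster. Sometimes the command above instead returns an error saying the replication factor cannot be larger than 2 (because one broker has already been removed from the cluster).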
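
The replication-factor error in the last step follows from a simple constraint: every replica of a partition has to live on a different broker, so the factor cannot exceed the number of brokers currently registered. A minimal sketch of that check, assuming a hypothetical helper named `validate_replication_factor` (it is not part of any Kafka client or CLI):

```python
def validate_replication_factor(replication_factor: int, live_brokers: int) -> None:
    """Raise ValueError if a topic cannot be fully replicated.

    Hypothetical helper that mirrors the rule "one replica per broker";
    it only illustrates why the error appears once a broker drops out.
    """
    if replication_factor < 1:
        raise ValueError("replication factor must be at least 1")
    if replication_factor > live_brokers:
        raise ValueError(
            f"replication factor {replication_factor} is larger than "
            f"the {live_brokers} available broker(s)"
        )


# With all three brokers registered, --replication-factor 3 is accepted.
validate_replication_factor(3, live_brokers=3)

# After one broker has left the cluster, the same request is rejected.
try:
    validate_replication_factor(3, live_brokers=2)
except ValueError as err:
    print(err)
```

So the error message is a symptom of the broker loss, not a separate problem.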

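I am new to Kafka. I suspect I am just making some silly mistake, but I don't know what it is. I have searched the documentation but haven't found it yet.

Here is my docker-compose file: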

version: "3.9"

networks:
  kafka-cluster:
    driver: bridge

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      # ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      # ZOOKEEPER_TICK_TIME: 2000
      # ZOOKEEPER_SERVERS: "zookeeper:22888:23888"
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
    ports:
      - 2181:2181
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9093
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka1:9092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafka2:
    image: confluentinc/cp-kafka:latest
    container_name: kafka2
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9094
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka2:9092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafka3:
    image: confluentinc/cp-kafka:latest
    container_name: kafka3
    depends_on:
      - zookeeper
    ports:
      - "9095:9095"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9095
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka3:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    ports:
      - "9000:9000"
    environment:
      - KAFKA_BROKERCONNECT=kafka1:9092,kafka2:9092,kafka3:9092
      - JVM_OPTS="-Xms32M -Xmx64M"
      - SERVER_SERVLET_CONTEXTPATH="/"
    depends_on:
      - kafka1
    networks:
      - kafka-cluster

Here are the error logs from the other 2 brokers:

[2022-01-17 04:32:40,078] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1002, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test-topic-3-1=PartitionData(fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), test-topic-2-1=PartitionData(fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=28449961, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to kafka1:9092 (id: 1001 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:104)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:218)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:321)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:136)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2022-01-17 04:32:42,088] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Connection to node 1001 (kafka1/192.168.48.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-01-17 04:32:42,088] INFO [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error sending fetch request (sessionId=28449961, epoch=INITIAL) to node 1001: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to kafka1:9092 (id: 1001 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:104)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:218)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:321)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:136)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
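
To see at a glance which broker the replica fetchers cannot reach, the WARN lines can be scanned for the node id and address. A rough sketch that relies only on the "could not be established" message format visible in the log above; the function name `count_unreachable_nodes` is made up for illustration:

```python
import re
from collections import Counter

# Matches the NetworkClient warning format seen in the log above, e.g.
# "Connection to node 1001 (kafka1/192.168.48.3:9092) could not be established."
UNREACHABLE = re.compile(
    r"Connection to node (?P<node>-?\d+) \((?P<addr>[^)]+)\) could not be established"
)


def count_unreachable_nodes(log_lines):
    """Count how often each (node id, address) pair is reported unreachable."""
    hits = Counter()
    for line in log_lines:
        match = UNREACHABLE.search(line)
        if match:
            hits[(int(match["node"]), match["addr"])] += 1
    return hits


sample = [
    "[2022-01-17 04:32:42,088] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, "
    "fetcherId=0] Connection to node 1001 (kafka1/192.168.48.3:9092) could not be "
    "established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)",
]
print(count_unreachable_nodes(sample))
```

In the excerpt above every failure points at node 1001 (kafka1), which suggests that broker itself went down rather than the network between the other two.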

【Comments】:

  • New to Kafka? Then why do you need a cluster of brokers on one machine?

Tags: docker apache-kafka docker-compose


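【Solution 1】:

Assuming you don't need connections from the host (since you are running the Kafka CLI commands directly inside a container), you can simplify your Compose file considerably:

  1. Remove the host ports.
  2. Remove the non-CLIENT listeners and stick with the defaults.
  3. Remove the Compose network (for debugging), since one is created automatically.

All in all, you end up with something like this: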

x-kafka-setup: &kafka-setup
  KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
  ALLOW_PLAINTEXT_LISTENER: 'yes'

version: "3.8"
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.7
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka1:
    image: &broker-image docker.io/bitnami/kafka:3
    environment:
      KAFKA_BROKER_ID: 1
      <<: *kafka-setup
    depends_on:
      - zookeeper
  kafka2:
    image: *broker-image
    environment:
      KAFKA_BROKER_ID: 2
      <<: *kafka-setup
    depends_on:
      - zookeeper
  kafka3:
    image: *broker-image
    environment:
      KAFKA_BROKER_ID: 3
      <<: *kafka-setup
    depends_on:
      - zookeeper

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: kafka1:9092,kafka2:9092,kafka3:9092
      JVM_OPTS: "-Xms32M -Xmx64M"
      SERVER_SERVLET_CONTEXTPATH: /
    depends_on:
      - kafka1
      - kafka2
      - kafka3
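
The `x-kafka-setup` anchor and the `<<: *kafka-setup` merge key give every broker the same shared settings plus its own id. As a rough illustration of the environment each broker ends up with, here plain Python dictionaries stand in for the YAML merge; `broker_environment` is a hypothetical helper, not something Compose provides:

```python
# Shared settings, copied from the x-kafka-setup block above.
KAFKA_SETUP = {
    "KAFKA_CFG_ZOOKEEPER_CONNECT": "zookeeper:2181",
    "ALLOW_PLAINTEXT_LISTENER": "yes",
}


def broker_environment(broker_id: int) -> dict:
    """Return the merged environment for one broker.

    Keys written directly in a service take precedence over merged ones,
    so the per-broker id is applied after the shared settings.
    """
    return {**KAFKA_SETUP, "KAFKA_BROKER_ID": broker_id}


for broker_id in (1, 2, 3):
    print(f"kafka{broker_id}:", broker_environment(broker_id))
```

The only value that differs between the three services is the broker id, which is what keeps the file short.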

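【Comments】:

  • I'm thinking 3 brokers take up so much memory/CPU that Docker kills the container. Anyway, thanks for the simplified config, I can learn something from it. Also, I took your suggestion above and started with a 1-node cluster.
  • You may be running out of memory. In that case, I think running docker ps would report exit code 137.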
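
For context on the number in that comment: exit codes above 128 conventionally mean "terminated by signal (code - 128)", so 137 corresponds to signal 9, SIGKILL, which is what an out-of-memory kill delivers. A small sketch of that decoding, with `describe_exit_code` as an illustrative name and assuming a POSIX system where the signal names are defined:

```python
import signal


def describe_exit_code(code: int) -> str:
    """Explain an exit code using the 128 + signal-number convention."""
    if code == 0:
        return "exited normally"
    if code > 128:
        signum = code - 128
        try:
            name = signal.Signals(signum).name
        except ValueError:
            # Signal number not defined on this platform.
            name = f"signal {signum}"
        return f"terminated by {name}"
    return f"exited with error status {code}"


print(describe_exit_code(137))
```

A SIGKILL alone does not prove memory pressure; the OOMKilled field in the container state shown by docker inspect is a more direct indicator.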