【问题标题】:Can't access kafka from outside kubernetes无法从 kubernetes 外部访问 kafka
【发布时间】:2021-01-12 06:31:01
【问题描述】:

我正在尝试从本地机器上的 kubernetes 外部访问 kafka。我正在使用 spring 应用程序来生成有关某个主题的事件。这是我的 kafka 部署文件:

kind: Deployment
metadata:
  name: kafka-broker0
  labels:
    app: kafka
spec:
  replicas: 2
  selector:
    matchLabels:
      app: kafka
      id: "0"
  template:
    metadata:
      labels:
        app: kafka
        id: "0"
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "30718"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: 192.168.1.240
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181
        - name: KAFKA_BROKER_ID
          value: "0"
        - name: KAFKA_CREATE_TOPICS
          value: LaunchScraper:1:1

和服务文件id:

kind: Service
metadata:
  name: kafka-services
  labels:
    name: kafka
spec:
  selector:
    app: kafka
    id: "0"
  ports:
    - protocol: TCP
      name: kafka-port
      port: 9092
  type: NodePort

我已经在 kubernetes 上创建了一个 zookeeper pod。我的 Spring Boot 应用程序显示此错误:

2020-09-25 23:56:29.123  WARN 44324 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (/192.168.1.240:9092) could not be established. Broker may not be available.

【问题讨论】:

    标签: kubernetes apache-kafka


    【解决方案1】:

    您似乎没有在服务中修复 nodePort。当您达到您在 KAFKA_ADVERTISED_PORT 中输入的值时。还将 KAFKA_ADVERTISED_HOST 设置为您的 K8s 节点主机名/DNS。

    在您的服务规范中,在ports 条目下添加nodePort: 30718。然后在您的客户端中,尝试使用节点的地址或主机名连接 30718 端口

    另外,如果您希望在生产环境中部署 Kafka,我建议您使用像 Strimzi https://Strimzi.io 这样的操作符

    【讨论】:

    • 谢谢,但是如何设置 KAFKA_ADVERTISED_HOST ?我对 kafka 有一些问题,而且我是 kubernetes 的新手。我在 Windows 上使用 kubectl
    • HOST,实际上是指 HOST_NAME。如果您是 Kubernetes 新手,如果您想快速入门,我绝对建议您使用 operator。 strimzi.io/docs/operators/master/quickstart.html
    【解决方案2】:

    在 Kubernetes 上部署 Kafka 实际上并不像我最初想象的那么简单,但经过多次试验和错误后它成功了。您在 Internet 上找到的许多示例都不适用于当前版本的 Kubernetes / Kafka。有效的是:

    1. 对 Kafka 使用 StatefulSet,而不是部署
    2. 如下所示设置 KAFKA_ADVERTISED_LISTENERS 和 KAFKA_LISTENERS
    3. NodePort 服务中用于外部访问的附加端口(在我的情况下为 32092,但是任意的),不要忘记通过 32092 从外部访问 Kafka,而不是 9092

    一个可行的示例配置是(作为部署和服务的替代品,可能不是最小的):

    apiVersion: v1
    kind: Service
    metadata:
      labels:
        service: kafka
      name: kafka
    spec:
      type: NodePort
      ports:
      - name: "9092"
        port: 9092
        protocol: TCP
        targetPort: 9092
      - name: "9093"
        port: 9093
        protocol: TCP
        targetPort: 9093
      - name: "32092"
        port: 32092
        protocol: TCP
        targetPort: 32092
        nodePort: 32092
      selector:
        service: kafka-instance
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      labels:
        service: kafka-instance
      name: kafka-instance
    spec:
      selector:
        matchLabels:
          service: kafka-instance
      serviceName: "kafka"
      replicas: 1
      template:
        metadata:
          labels:
            service: kafka-instance
        spec:
          containers:
          - env:
            - name: MY_HOST_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
            - name: MY_POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_ADVERTISED_LISTENERS
              value: INTERNAL://$(MY_POD_NAME).kafka.default.svc.cluster.local:9093,CLIENT://$(MY_POD_NAME).kafka.default.svc.cluster.local:9092,EXTERNAL://$(MY_HOST_IP):32092
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: INTERNAL
            - name: KAFKA_LISTENERS
              value: INTERNAL://:9093,CLIENT://:9092,EXTERNAL://:32092
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
            - name: KAFKA_PORT
              value: "9092"
            - name: KAFKA_RESTART_ATTEMPTS
              value: "10"
            - name: KAFKA_RESTART_DELAY
              value: "5"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zoo1:2181
            - name: KAFKA_ZOOKEEPER_SESSION_TIMEOUT
              value: "6000"
            - name: ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL
              value: "0"
            image: wurstmeister/kafka
            name: kafka-instance
            ports:
            - containerPort: 9092
    

    如果您还没有 zookeeper,只需添加它,它应该可以工作:

    apiVersion: v1
    kind: Service
    metadata:
      labels:
        service: zoo1
      name: zoo1
    spec:
      ports:
      - name: "2181"
        port: 2181
        targetPort: 2181
      selector:
        service: zoo1-instance
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      labels:
        service: zoo1-instance
      name: zoo1-instance
    spec:
      selector:
        matchLabels:
          service: zoo1-instance
      serviceName: "zoo1"
      replicas: 1
      template:
        metadata:
          labels:
            service: zoo1-instance
        spec:
          containers:
          - image: wurstmeister/zookeeper
            name: zoo1-instance
            ports:
            - containerPort: 2181
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-02-09
      • 2020-07-02
      • 1970-01-01
      • 2021-12-28
      • 2022-09-24
      • 2017-12-24
      • 2021-12-06
      相关资源
      最近更新 更多