【问题标题】:Running simple kafka instance on kubernetes在 kubernetes 上运行简单的 kafka 实例
【发布时间】:2020-12-19 12:38:30
【问题描述】:

我是 Kubernetes 新手,正在尝试在单节点 minikube 集群上部署单个副本 kafka 实例。

这里是zookeeper服务/部署yml

apiVersion: v1
kind: Service
metadata:
  name: zookeeper-cluster
  labels:
    component: zookeeper
spec:
  ports:
  - name: "2181"
    port: 2181
    targetPort: 2181
  selector:
    component: zookeeper
status:
  loadBalancer: {}

---

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: zookeeper
  name: zookeeper
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      component: zookeeper      
  template:
    metadata:
      labels:
        component: zookeeper
    spec:
      containers:
      - image: zookeeper:3.4.13
        name: zookeeper
        ports:
        - containerPort: 2181
        resources:
          limits:
            memory: "256Mi"
            cpu: "100m"
        volumeMounts:
        - mountPath: /conf
          name: zookeeper-claim0
        - mountPath: /data
          name: zookeeper-claim1
        - mountPath: /datalog
          name: zookeeper-claim2
      restartPolicy: Always
      volumes:
      - name: zookeeper-claim0
        persistentVolumeClaim:
          claimName: zookeeper-claim0
      - name: zookeeper-claim1
        persistentVolumeClaim:
          claimName: zookeeper-claim1
      - name: zookeeper-claim2
        persistentVolumeClaim:
          claimName: zookeeper-claim2
status: {}

这里是 Kafka 服务/部署 yml

apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster
  labels:
    component: kafka
spec:
  ports:
  - name: "9092"
    port: 9092
    targetPort: 9092
  selector:
    component: kafka
status:
  loadBalancer: {}

---

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      component: kafka    
  template:
    metadata:
      labels:
        component: kafka
    spec:
      containers:
      - args:
        - start-kafka.sh
        env:
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: LISTENER_BOB
        - name: KAFKA_ADVERTISED_LISTENERS
          value: LISTENER_BOB://:9092
        - name: KAFKA_LISTENERS
          value: LISTENER_BOB://:9092
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value:  LISTENER_BOB:PLAINTEXT
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_LOG_DIRS
          value: /kafka/kafka-logs
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-cluster:2181
        image: wurstmeister/kafka:2.12-2.4.1
        name: kafka
        ports:
        - containerPort: 9092
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
        volumeMounts:
        - mountPath: /kafka/kafka-logs
          name: kafka-claim0
      restartPolicy: Always
      volumes:
      - name: kafka-claim0
        persistentVolumeClaim:
          claimName: kafka-claim0
status: {}

当尝试从 kafka-cluster:9092 上的另一个应用程序访问 kafka 时,它也作为部署运行,它会引发 UnresolvedAddressException。其中 kafka-6799c65d58-f6tbt:9092 是 pod 名称

java.io.IOException: Can't resolve address: **kafka-6799c65d58-f6tbt:9092**
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
    at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:64)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1035)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:920)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:508)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: java.nio.channels.UnresolvedAddressException: null
    at java.base/sun.nio.ch.Net.checkAddress(Net.java:130)
    at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:675)
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
    ... 9 common frames omitted 

我在配置时是否犯了任何错误?或者有什么替代方法吗?

【问题讨论】:

  • 您尝试从哪里访问 Kafka 集群?
  • 我正在尝试从 kakfa-cluster:9092 上的同一 minikube 单节点集群中作为 kubernetes pod 运行的另一个应用程序访问它

标签: kubernetes apache-kafka kubernetes-pod


【解决方案1】:

我能想到几个问题。

  1. 服务类型没有明确设置,默认为ClusterIP类型,只能在集群内访问。 (K8s 内部 DNS 应该可以工作) .如果您想将其暴露给外界,您可以使用NodePortLoadBalancer 类型。

  2. 即使你的应用运行在同一个 k8s 集群和不同的 k8s 命名空间中,你也必须使用kafka-cluster.mynamespace 作为内部地址。

点击此处了解更多信息:https://kubernetes.io/docs/concepts/services-networking/

【讨论】:

  • 我正在尝试从在同一个 minikube 集群中的 k8s 中运行的另一个应用程序访问它,并且还没有指定命名空间,因此假设所有内容都在同一个 k8s 命名空间下。查看异常,似乎服务正确重定向到 pod 地址 kafka-6799c65d58-f6tbt:9092,但仍然得到未解决的地址异常
  • 我不确定它为什么会弹出一个 pod 名称。它不应该指向暴露 Kafka 的服务而不是 pod 本身吗?出了点问题。您是否可以通过curl 访问 Kafka 服务或在集群内 ping 入?
【解决方案2】:

看起来 kafka 代理正在将自己的主机名 (kafka-6799c65d58-f6tbt) 宣传为 FQDN,这与 pod 名称相同。 DNS 无法解析部署 pod 的名称。

如果您查看任何 kafka helm 图表,即this one,您将看到他们正在使用 statefulsets。 Statefulsets 允许解析 pod 的 IP 地址。看看here at k8s docs 这是如何工作的。

您也可以尝试将 KAFKA_ADVERTISED_LISTENERS 设置为:

- name: MY_POD_IP
  valueFrom:
    fieldRef:
      fieldPath: status.podIP
- name: KAFKA_ADVERTISED_LISTENERS
  value: "LISTENER_BOB://$(MY_POD_IP):9092/"

但是在更改副本数量时它不能很好地扩展。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-07-27
    • 2018-06-05
    • 2017-02-10
    • 2017-08-22
    • 1970-01-01
    • 2021-12-15
    • 1970-01-01
    • 2014-11-26
    相关资源
    最近更新 更多