【Question Title】: Kafka Mongo on Kubernetes (minikube) (Mongo as a source to Kafka NOT working) with bitnami/kafka
【Posted】: 2022-01-21 05:14:14
【Question Description】:

Which chart: bitnami/kafka (https://github.com/bitnami/charts/tree/master/bitnami/kafka)

Describe the bug: I am following the tutorial Build a Scalable, Fault-Tolerant Messaging Cluster on Kubernetes with Apache Kafka and MongoDB.

To resolve the extraDeploy problem, I followed extraDeploy doesn't render with example in documentation #5649. That issue is resolved, and my working configuration is as follows:

  1. Dockerfile
FROM bitnami/kafka:latest

RUN mkdir -p /opt/bitnami/kafka/plugins && \
    cd /opt/bitnami/kafka/plugins && \
    curl --remote-name --location --silent https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.6.1/mongo-kafka-connect-1.6.1-all.jar
  2. values.yml
extraDeploy:
  - |
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      replicas: 1
      selector:
        matchLabels: {{- include "common.labels.matchLabels" . | nindent 6 }}
          app.kubernetes.io/component: connector
      template:
        metadata:
          labels: {{- include "common.labels.standard" . | nindent 8 }}
            app.kubernetes.io/component: connector
        spec:
          containers:
            - name: connect
              image: kafka-connect-bitnami:5.22
              imagePullPolicy: Never
              command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-sink.properties"]
              ports:
                - name: connector
                  containerPort: 8083
              volumeMounts:
                - name: configuration
                  mountPath: /config
          volumes:
            - name: configuration
              configMap:
                name: {{ include "kafka.fullname" . }}-connect
  - |
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    data:
      connect-standalone.properties: |-
        bootstrap.servers={{ include "kafka.fullname" . }}.{{ .Release.Namespace }}.svc.{{ .Values.clusterDomain }}:{{ .Values.service.port }}
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true
        value.converter.schemas.enable=true
        offset.storage.file.filename=/tmp/connect.offsets
        offset.flush.interval.ms=20000
        plugin.path=/opt/bitnami/kafka/plugins
      mongodb-source.properties: |-
        connection.uri=mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb
        name=mongo-source-connector
        topics=source-topic
        connector.class=com.mongodb.kafka.connect.MongoSourceConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=source
        batch.size=0
        change.stream.full.document=updateLookup
        pipeline=[]
        collation=
      mongodb-sink.properties: |-
        connection.uri=mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb
        name=mongo-sink-connector
        topics=sink-topic
        connector.class=com.mongodb.kafka.connect.MongoSinkConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=sink
  - |
    apiVersion: v1
    kind: Service    
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      ports:
        - protocol: TCP
          port: 8083
          targetPort: connector
      selector: {{- include "common.labels.matchLabels" . | nindent 4 }}
        app.kubernetes.io/component: connector
  3. I deployed MongoDB with replicas using the tutorial [Horizontally Scale Your MongoDB Deployment with Bitnami, Kubernetes and Helm](https://engineering.bitnami.com/articles/horizontally-scale-your-mongodb-deployment-with-bitnami-kubernetes-and-helm.html).
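One prerequisite implied by the Deployment above: since it uses `imagePullPolicy: Never` with the local tag `kafka-connect-bitnami:5.22`, the image must exist inside minikube's own Docker daemon before the pods start. A sketch of that build step (standard minikube usage; the tag comes from the Deployment, the Dockerfile is the one shown in step 1):

```shell
# Point the local docker CLI at minikube's internal Docker daemon,
# then build the custom Connect image there so that
# imagePullPolicy: Never can find it without a registry.
eval "$(minikube docker-env)"
docker build -t kafka-connect-bitnami:5.22 .
```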

Then I run:

helm install kafka bitnami/kafka -f values.yml
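Since the extraDeploy rendering was the tricky part (the #5649 issue above), it can be worth rendering the chart locally to confirm the three extra objects expand as intended. A minimal check, using only standard Helm and the file names from this thread:

```shell
# Render the chart templates locally (no cluster changes) and list the
# object kinds plus any *-connect names contributed by extraDeploy:
# the Deployment, ConfigMap, and Service should all appear.
helm template kafka bitnami/kafka -f values.yml | grep -n "kind:\|-connect"
```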

For the mongodb-sink-connector, I have the following setup up and running:

NAME                                READY   STATUS    RESTARTS      AGE
pod/kafka-0                         1/1     Running   1 (35h ago)   35h
pod/kafka-client                    1/1     Running   0             43h
pod/kafka-connect-669487944-gb4p7   1/1     Running   0             57m
pod/kafka-zookeeper-0               1/1     Running   0             35h
pod/mongodb-arbiter-0               1/1     Running   0             42h
pod/mongodb-client                  1/1     Running   0             41h
pod/mongodb-primary-0               2/2     Running   0             42h
pod/mongodb-secondary-0             2/2     Running   0             42h

NAME                               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
service/kafka                      ClusterIP   10.98.236.199   <none>        9092/TCP                     35h
service/kafka-connect              ClusterIP   10.105.58.215   <none>        8083/TCP                     35h
service/kafka-headless             ClusterIP   None            <none>        9092/TCP,9093/TCP            35h
service/kafka-zookeeper            ClusterIP   10.108.22.188   <none>        2181/TCP,2888/TCP,3888/TCP   35h
service/kafka-zookeeper-headless   ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP   35h
service/kubernetes                 ClusterIP   10.96.0.1       <none>        443/TCP                      14d
service/mongodb                    ClusterIP   10.96.105.153   <none>        27017/TCP,9216/TCP           42h
service/mongodb-headless           ClusterIP   None            <none>        27017/TCP                    42h

NAME                            READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/kafka-connect   1/1     1            1           35h

NAME                                       DESIRED   CURRENT   READY   AGE
replicaset.apps/kafka-connect-54cff6f879   0         0         0       35h
replicaset.apps/kafka-connect-59fcf7754c   0         0         0       24h
replicaset.apps/kafka-connect-64c5697f54   0         0         0       21h
replicaset.apps/kafka-connect-669487944    1         1         1       21h
replicaset.apps/kafka-connect-66c6dd4679   0         0         0       35h
replicaset.apps/kafka-connect-84ffbffd5c   0         0         0       23h

NAME                                 READY   AGE
statefulset.apps/kafka               1/1     35h
statefulset.apps/kafka-zookeeper     1/1     35h
statefulset.apps/mongodb-arbiter     1/1     42h
statefulset.apps/mongodb-primary     1/1     42h
statefulset.apps/mongodb-secondary   1/1     42h

I can successfully publish messages to MongoDB with the above configuration. Below are some messages logged on the kafka-connect pod after a successful receive.

[2022-01-19 11:52:25,476] INFO [mongo-sink-connector|task-0] [Consumer clientId=connector-consumer-mongo-sink-connector-0, groupId=connect-mongo-sink-connector] Setting offset for partition sink-topic-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka-0.kafka-headless.default.svc.cluster.local:9092 (id: 0 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:818)
[2022-01-19 11:52:25,519] INFO [mongo-sink-connector|task-0] Cluster created with settings {hosts=[mongodb.default.svc.cluster.local:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:5, serverValue:7744}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:4, serverValue:7743}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,523] INFO [mongo-sink-connector|task-0] Monitor thread successfully connected to server with description ServerDescription{address=mongodb.default.svc.cluster.local:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2650238, setName='rs0', canonicalAddress=mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, hosts=[mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, mongodb-secondary-0.mongodb-headless.default.svc.cluster.local:27017], passives=[], arbiters=[mongodb-arbiter-0.mongodb-headless.default.svc.cluster.local:27017], primary='mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000002, setVersion=3, topologyVersion=null, lastWriteDate=Wed Jan 19 11:52:24 UTC 2022, lastUpdateTimeNanos=188123424943131} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,566] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:6, serverValue:7745}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)


From the producer:

kubectl exec --tty -i kafka-client --namespace default -- kafka-console-producer.sh --broker-list kafka-0.kafka-headless.default.svc.cluster.local:9092 --topic sink-topic  

>{"foo":"bar.....12"}  
>{"foo":"bar.....1122"}
>

In MongoDB:

rs0:PRIMARY> use mydb;
rs0:PRIMARY> db.sink.find()
{ "_id" : ObjectId("61e7fb793bb99a00505efa14"), "foo" : "bar.....12" }
{ "_id" : ObjectId("61e7fb9f3bb99a00505efa16"), "foo" : "bar.....1122" }
rs0:PRIMARY> 

The problem? The issue is that this chart only works for the mongo-sink. I cannot use the mongo-source-connector to consume MongoDB as a source with the values.yml configuration above. Note that to use the mongo-source-connector, I make the following change in the values.yml file:

command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-source.properties"]
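One detail not raised in the thread, stated here as an assumption based on the MongoDB Kafka connector's documented topic naming rather than anything in the question: the source connector does not use the sink-style `topics` property; it publishes to a topic named `<topic.prefix>.<database>.<collection>`. With the config above (database=mydb, collection=source, no `topic.prefix`), that would be `mydb.source` rather than `source-topic`. A consumer pointed at that candidate topic:

```shell
# Hypothetical check: consume from the topic the source connector would
# write to under its default naming convention (mydb.source), instead of
# the "topics" value from mongodb-source.properties.
kubectl exec --tty -i kafka-client --namespace default -- \
  kafka-console-consumer.sh \
  --bootstrap-server kafka.default.svc.cluster.local:9092 \
  --topic mydb.source --from-beginning
```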

Expected behavior

(i) Kafka Connect works with Mongo as a SINK, i.e. kafka --> mongo (OK).

(ii) However, it does not work as a SOURCE, i.e. mongo --> kafka (NOT OK).

There are no errors in the pod, nor any error message indicating that Kafka cannot communicate with Mongo.

My questions:

  1. Which settings are needed to achieve (ii), in case I am missing any essential configuration?
  2. Is it possible to run the source-connector and the sink-connector simultaneously, so as to both receive updates into Kafka and send data out of Kafka? If so, what changes are necessary to achieve this?

Thanks in advance for your feedback.

【Question Discussion】:

  • Why change the command? Kafka Connect should run in distributed mode, and you issue HTTP POSTs to its API with the connector configurations. That way, you post two configs and they run in parallel. Second, I'd suggest using Strimzi, which provides a kind: KafkaConnect CRD object, rather than needing to mess with ConfigMaps.
  • I see... the tutorial uses connect-standalone... try a different image that includes confluent-hub, such as mine, to install the mongo connector.

Tags: mongodb kubernetes apache-kafka apache-kafka-connect bitnami


【Solution 1】:

The issue is that this chart only works for the mongo-sink

That is because it is the only connector file you provided.

You can give more than one:

command:
  - /bin/bash
  - -c
  # bash -c takes a single command string; listing the files as separate
  # array items would make them $0, $1, ... instead of script arguments.
  - >-
    /opt/bitnami/kafka/bin/connect-standalone.sh
    /config/connect-standalone.properties
    /config/mongodb-source.properties
    /config/mongodb-sink.properties

Note: this is not fault-tolerant and should not be used in production. Instead, your command should really use connect-distributed.sh, and your connector configurations can then be stored as JSON files and POSTed to the service's ingress port.
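In distributed mode, each connector becomes a JSON payload POSTed to the Connect REST API. A hedged sketch of what that could look like here; the property values are copied from the thread's mongodb-source.properties, and the service address assumes the kafka-connect Service on port 8083 shown earlier:

```shell
# Write the source-connector config as a REST-API JSON payload,
# equivalent to the standalone mongodb-source.properties file.
cat > /tmp/mongo-source.json <<'EOF'
{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb",
    "database": "mydb",
    "collection": "source",
    "tasks.max": "1"
  }
}
EOF

# From inside the cluster, register it with the Connect worker:
#   curl -X POST -H "Content-Type: application/json" \
#        --data @/tmp/mongo-source.json \
#        http://kafka-connect.default.svc.cluster.local:8083/connectors
grep -q MongoSourceConnector /tmp/mongo-source.json && echo "payload written"
```

The sink connector gets a second JSON file POSTed the same way, which is how two connectors run in parallel on one worker.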

【Discussion】:

  • Thanks for your reply. I tried running with both connectors, source and sink, but still only the sink works, as described. I insert data into mongo, but it does not show up in the consumer. One change I now observe: when I start the consumer, I see the following message: WARN [Consumer clientId=consumer-console-consumer-87690-1, groupId=console-consumer-87690] Error while fetching metadata with correlation id 2 : {mongo-source-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
  • Thanks for your reply. kubectl exec -it kafka-connect-765c5c57c5-djkch -- curl http://localhost:8083/connectors returns ["mongo-sink-connector","mongo-source-connector"]. Status on kafka-connect: k exec -it kafka-connect-765c5c57c5-djkch -- curl http://localhost:8083/connectors/mongo-source-connector/status returns {"name":"mongo-source-connector","connector":{"state":"RUNNING","worker_id":"172.17.0.12:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.12:8083"}],"type":"source"}
  • I observed that the topic mongo-source-topic was not created. I checked with kubectl exec --tty -i kafka-client --namespace default -- kafka-topics.sh --zookeeper kafka-zookeeper.default.svc.cluster.local:2181 --list. I then created the topic, verified it appears in the list, and started consuming: kubectl exec --tty -i kafka-client --namespace default -- kafka-console-consumer.sh --bootstrap-server kafka.default.svc.cluster.local:9092 --topic mongo-source-topic --from-beginning. Now I get no warnings, but when I insert data into mongo, still nothing shows up in the consumer.
  • The source connector status is RUNNING, so beyond that my only suggestion is to look at the logs. I cannot answer why data fails to send without replicating your environment.
  • Distributed mode will not magically solve this; I only recommend it as the "proper" way to run Kafka Connect at scale. I'd suggest the Strimzi KafkaConnect CRD. Or use a different container like mine and configure it to download the mongo connector accordingly.