[Posted]: 2022-01-21 05:14:14
[Problem description]:
Which chart: bitnami/kafka https://github.com/bitnami/charts/tree/master/bitnami/kafka
Describe the bug: I am following the tutorial Build a Scalable, Fault-Tolerant Messaging Cluster on Kubernetes with Apache Kafka and MongoDB.
To resolve an extraDeploy problem I followed "extraDeploy doesn't render with example in documentation #5649". That issue is solved, and my working configuration is as follows:
- Dockerfile
FROM bitnami/kafka:latest
RUN mkdir -p /opt/bitnami/kafka/plugins && \
    cd /opt/bitnami/kafka/plugins && \
    curl --remote-name --location --silent "https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.6.1/mongo-kafka-connect-1.6.1-all.jar"
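Before wiring the image into the chart it may be worth confirming that the connector jar actually landed under the plugin path. A quick sketch, assuming the image is built and tagged locally (the `kafka-connect-bitnami:5.22` tag is the one used in the values.yml below):

```shell
# Build the custom Connect image from the Dockerfile above
# (tag is an assumption; adjust to whatever you use).
docker build -t kafka-connect-bitnami:5.22 .
# The mongo-kafka-connect jar should show up under the plugin path.
docker run --rm kafka-connect-bitnami:5.22 ls /opt/bitnami/kafka/plugins
```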
- Values.yml
extraDeploy:
  - |
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      replicas: 1
      selector:
        matchLabels: {{- include "common.labels.matchLabels" . | nindent 6 }}
          app.kubernetes.io/component: connector
      template:
        metadata:
          labels: {{- include "common.labels.standard" . | nindent 8 }}
            app.kubernetes.io/component: connector
        spec:
          containers:
            - name: connect
              image: kafka-connect-bitnami:5.22
              imagePullPolicy: Never
              command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-sink.properties" ]
              ports:
                - name: connector
                  containerPort: 8083
              volumeMounts:
                - name: configuration
                  mountPath: /config
          volumes:
            - name: configuration
              configMap:
                name: {{ include "kafka.fullname" . }}-connect
  - |
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    data:
      connect-standalone.properties: |-
        bootstrap.servers={{ include "kafka.fullname" . }}.{{ .Release.Namespace }}.svc.{{ .Values.clusterDomain }}:{{ .Values.service.port }}
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true
        value.converter.schemas.enable=true
        offset.storage.file.filename=/tmp/connect.offsets
        offset.flush.interval.ms=20000
        plugin.path=/opt/bitnami/kafka/plugins
      mongodb-source.properties: |-
        connection.uri=mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb
        name=mongo-source-connector
        topics=source-topic
        connector.class=com.mongodb.kafka.connect.MongoSourceConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=source
        batch.size=0
        change.stream.full.document=updateLookup
        pipeline=[]
        collation=
      mongodb-sink.properties: |-
        connection.uri=mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb
        name=mongo-sink-connector
        topics=sink-topic
        connector.class=com.mongodb.kafka.connect.MongoSinkConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=sink
  - |
    apiVersion: v1
    kind: Service
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      ports:
        - protocol: TCP
          port: 8083
          targetPort: connector
      selector: {{- include "common.labels.matchLabels" . | nindent 4 }}
        app.kubernetes.io/component: connector
- I have deployed mongodb with replicas using the tutorial [Horizontally scale your MongoDB deployment with Bitnami, Kubernetes and Helm](https://engineering.bitnami.com/articles/horizontally-scale-your-mongodb-deployment-with-bitnami-kubernetes-and-helm.html).
Then I run:
helm install kafka bitnami/kafka -f values.yml
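Note that when values.yml changes after the initial install (for example, switching the properties file passed to the container command), the extraDeploy manifests are only re-rendered on an upgrade of the release; a sketch of the command I would expect to use:

```shell
# Re-render and apply the chart with the updated values file.
helm upgrade kafka bitnami/kafka -f values.yml
```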
With the settings above, everything for the mongodb-sink-connector is up and running:
NAME                                READY   STATUS    RESTARTS      AGE
pod/kafka-0                         1/1     Running   1 (35h ago)   35h
pod/kafka-client                    1/1     Running   0             43h
pod/kafka-connect-669487944-gb4p7   1/1     Running   0             57m
pod/kafka-zookeeper-0               1/1     Running   0             35h
pod/mongodb-arbiter-0               1/1     Running   0             42h
pod/mongodb-client                  1/1     Running   0             41h
pod/mongodb-primary-0               2/2     Running   0             42h
pod/mongodb-secondary-0             2/2     Running   0             42h

NAME                               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
service/kafka                      ClusterIP   10.98.236.199   <none>        9092/TCP                     35h
service/kafka-connect              ClusterIP   10.105.58.215   <none>        8083/TCP                     35h
service/kafka-headless             ClusterIP   None            <none>        9092/TCP,9093/TCP            35h
service/kafka-zookeeper            ClusterIP   10.108.22.188   <none>        2181/TCP,2888/TCP,3888/TCP   35h
service/kafka-zookeeper-headless   ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP   35h
service/kubernetes                 ClusterIP   10.96.0.1       <none>        443/TCP                      14d
service/mongodb                    ClusterIP   10.96.105.153   <none>        27017/TCP,9216/TCP           42h
service/mongodb-headless           ClusterIP   None            <none>        27017/TCP                    42h

NAME                            READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/kafka-connect   1/1     1            1           35h

NAME                                       DESIRED   CURRENT   READY   AGE
replicaset.apps/kafka-connect-54cff6f879   0         0         0       35h
replicaset.apps/kafka-connect-59fcf7754c   0         0         0       24h
replicaset.apps/kafka-connect-64c5697f54   0         0         0       21h
replicaset.apps/kafka-connect-669487944    1         1         1       21h
replicaset.apps/kafka-connect-66c6dd4679   0         0         0       35h
replicaset.apps/kafka-connect-84ffbffd5c   0         0         0       23h

NAME                                 READY   AGE
statefulset.apps/kafka               1/1     35h
statefulset.apps/kafka-zookeeper     1/1     35h
statefulset.apps/mongodb-arbiter     1/1     42h
statefulset.apps/mongodb-primary     1/1     42h
statefulset.apps/mongodb-secondary   1/1     42h
With the above configuration I can successfully publish messages to mongodb. Here are some messages after a successful receive on the kafka-connect pod:
[2022-01-19 11:52:25,476] INFO [mongo-sink-connector|task-0] [Consumer clientId=connector-consumer-mongo-sink-connector-0, groupId=connect-mongo-sink-connector] Setting offset for partition sink-topic-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka-0.kafka-headless.default.svc.cluster.local:9092 (id: 0 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:818)
[2022-01-19 11:52:25,519] INFO [mongo-sink-connector|task-0] Cluster created with settings {hosts=[mongodb.default.svc.cluster.local:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:5, serverValue:7744}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:4, serverValue:7743}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,523] INFO [mongo-sink-connector|task-0] Monitor thread successfully connected to server with description ServerDescription{address=mongodb.default.svc.cluster.local:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2650238, setName='rs0', canonicalAddress=mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, hosts=[mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, mongodb-secondary-0.mongodb-headless.default.svc.cluster.local:27017], passives=[], arbiters=[mongodb-arbiter-0.mongodb-headless.default.svc.cluster.local:27017], primary='mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000002, setVersion=3, topologyVersion=null, lastWriteDate=Wed Jan 19 11:52:24 UTC 2022, lastUpdateTimeNanos=188123424943131} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,566] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:6, serverValue:7745}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
From the producer:
kubectl exec --tty -i kafka-client --namespace default -- kafka-console-producer.sh --broker-list kafka-0.kafka-headless.default.svc.cluster.local:9092 --topic sink-topic
>{"foo":"bar.....12"}
>{"foo":"bar.....1122"}
>
In MongoDB:
rs0:PRIMARY> use mydb;
rs0:PRIMARY> db.sink.find()
{ "_id" : ObjectId("61e7fb793bb99a00505efa14"), "foo" : "bar.....12" }
{ "_id" : ObjectId("61e7fb9f3bb99a00505efa16"), "foo" : "bar.....1122" }
rs0:PRIMARY>
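For the failing direction (mongo --> kafka) the source path can be checked independently of any sink: run a console consumer on the topic the source connector is expected to write to, then insert a document into the source collection and watch for output. A sketch along the lines of the producer command above (the topic name here is an assumption; the source connector may publish under a different name than `source-topic`):

```shell
# Watch the topic the source connector should publish to.
kubectl exec --tty -i kafka-client --namespace default -- \
  kafka-console-consumer.sh \
  --bootstrap-server kafka-0.kafka-headless.default.svc.cluster.local:9092 \
  --topic source-topic --from-beginning
```

In another shell, `db.source.insert({"foo":"bar"})` on the primary should then produce a change-stream record on the topic if the source connector is working.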
The problem
The problem is that this setup only works for the mongo-sink. I cannot use the mongo-source-connector to consume from mongodb as a source with the values.yml above. Note that to use the mongo-source-connector I make the following change in the values.yml file:
command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-source.properties"]
Expected behavior
(i) Kafka Connect works with Mongo as a SINK, meaning kafka --> mongo (OK).
(ii) However, it does not work as a SOURCE, meaning mongo --> kafka (not OK).
There are no errors in the pod, nor any message indicating that kafka cannot communicate with mongo.
My questions:
- Which settings am I missing, if any, to achieve (ii)?
- Is it possible to run the source-connector and the sink-connector simultaneously, so as to both receive updates in kafka and send data from kafka? If so, what changes are needed to achieve this?
Thanks in advance for your feedback.
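One detail worth checking, which would explain "no errors but no data": `topics=` is a sink-side setting that the MongoDB source connector does not read. In mongo-kafka-connect 1.x the source connector publishes to a topic named `<topic.prefix>.<database>.<collection>`, so with the mongodb-source.properties above the change events would land on a topic like `mydb.source` rather than `source-topic`. A hedged sketch of source properties with an explicit prefix (the `mongo` prefix value is an arbitrary choice for illustration):

```properties
# Sketch: source-connector topic naming for mongo-kafka-connect 1.x.
name=mongo-source-connector
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb
database=mydb
collection=source
# Records are published to <topic.prefix>.<database>.<collection>,
# i.e. mongo.mydb.source with this prefix; no topics= property is used.
topic.prefix=mongo
```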
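On the second question: `connect-standalone.sh` accepts any number of connector property files after the worker properties, so if the standalone approach is kept, both connectors can run in the same worker by listing both files. A sketch of the container command in values.yml (untested here):

```yaml
# One standalone worker running both the source and the sink connector.
command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-source.properties /config/mongodb-sink.properties" ]
```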
[Discussion]:
- Why change the command? Kafka Connect should run in distributed mode, where you issue an HTTP POST with the connector configuration to its REST API. That way you can post two configs and run them in parallel. Second, I would suggest Strimzi, which provides a `kind: KafkaConnect` CRD object instead of needing to fiddle with ConfigMaps.
- I see... the tutorial uses `connect-standalone`... try a different image that includes `confluent-hub`, such as mine, to install the mongo connector.
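The distributed-mode flow the first comment describes can be sketched with the standard Kafka Connect REST API (the worker listens on port 8083, which the chart's Service above already exposes); connector names and config values here mirror the properties files above:

```shell
# Register the sink connector against a distributed-mode worker (sketch, not tested here).
curl -X POST -H "Content-Type: application/json" \
  http://kafka-connect.default.svc.cluster.local:8083/connectors \
  -d '{
        "name": "mongo-sink-connector",
        "config": {
          "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
          "topics": "sink-topic",
          "connection.uri": "mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb",
          "database": "mydb",
          "collection": "sink"
        }
      }'
# A second POST with the source-connector config runs both connectors in parallel.
```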
Tags: mongodb kubernetes apache-kafka apache-kafka-connect bitnami