Posted: 2020-05-02 09:25:27
Question:
My current test configuration is as follows:
version: '3.7'
services:
  postgres:
    image: debezium/postgres
    restart: always
    ports:
      - "5432:5432"
  zookeeper:
    image: debezium/zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
  kafka:
    image: debezium/kafka
    restart: always
    ports:
      - "9092:9092"
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=250
  connect:
    image: debezium/connect
    restart: always
    ports:
      - "8083:8083"
    links:
      - zookeeper
      - postgres
      - kafka
    depends_on:
      - zookeeper
      - postgres
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_source_connect_statuses
I run it with docker-compose like this:
$ docker-compose up
I don't see any error messages, and everything seems to be running fine. If I run docker ps, I see that all services are up.
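Because docker ps only shows that the containers are up, a somewhat stronger check is to ask the Kafka Connect REST API (published on port 8083 in the compose file) which connector plugins it has loaded. This is a minimal sketch using only the Python standard library and the standard GET /connector-plugins endpoint; the helper names are hypothetical, and it assumes Connect is reachable at http://localhost:8083.

```python
import json
import urllib.request

# Assumption: Kafka Connect's REST port 8083 is published on localhost, as in the compose file.
CONNECT_URL = "http://localhost:8083"
POSTGRES_CONNECTOR = "io.debezium.connector.postgresql.PostgresConnector"


def fetch_plugins(base_url=CONNECT_URL, timeout=5):
    """Return the parsed JSON list from GET /connector-plugins."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins", timeout=timeout) as resp:
        return json.load(resp)


def has_plugin(plugins, class_name):
    """True if any entry of a /connector-plugins response has the given class name."""
    return any(plugin.get("class") == class_name for plugin in plugins)
```

For example, has_plugin(fetch_plugins(), POSTGRES_CONNECTOR) should return True if the debezium/connect image loaded its PostgreSQL connector.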
To check that Kafka works, I wrote a Kafka producer and a Kafka consumer in Python:
# producer. I run it in one console window
from kafka import KafkaProducer
from json import dumps
from time import sleep

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))
for e in range(1000):
    data = {'number': e}
    producer.send('numtest', value=data)  # send() is asynchronous
    sleep(5)
producer.flush()  # deliver any buffered messages before exiting
# consumer. I run it in another console window
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'numtest',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
    print(message)
And it works perfectly: I can see my producer publishing messages, and I can see them being consumed in the consumer window.
Now I want to get CDC working. First, inside the Postgres container, I set the password of the postgres role to postgres:
$ su postgres
$ psql
psql> \password postgres
Enter new password: postgres
Then I created a new database, test:
psql> CREATE DATABASE test;
I created a table:
psql> \c test;
test=# create table mytable (id serial, name varchar(128), primary key(id));
Finally, I created a connector for my Debezium CDC stack:
$ curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "test",
    "database.server.name": "postgres",
    "database.whitelist": "public.mytable",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "public.some_topic"
  }
}'
{"name":"test-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","plugin.name":"pgoutput","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"test","database.server.name":"postgres","database.whitelist":"public.mytable","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"public.some_topic","name":"test-connector"},"tasks":[],"type":"source"}
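The same registration can also be scripted from Python instead of curl. This minimal sketch uses only the standard library and sends the configuration above, unchanged, to POST /connectors/; build_connector_request and register_connector are hypothetical helper names, and the base URL http://localhost:8083 is an assumption matching the compose file.

```python
import json
import urllib.request

# Assumption: Kafka Connect's REST API is reachable on localhost:8083, as in the compose file.
CONNECT_URL = "http://localhost:8083"

# The connector configuration from the curl command above, unchanged.
CONNECTOR_CONFIG = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "test",
    "database.server.name": "postgres",
    "database.whitelist": "public.mytable",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "public.some_topic",
}


def build_connector_request(base_url, name, config):
    """Build (but do not send) the same POST /connectors/ request that curl sends."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors/",
        data=body,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def register_connector(base_url, name, config, timeout=10):
    """Send the request and return Kafka Connect's parsed JSON response."""
    request = build_connector_request(base_url, name, config)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.load(resp)
```

Calling register_connector(CONNECT_URL, "test-connector", CONNECTOR_CONFIG) should return the same JSON echo as the curl command. If a connector with that name already exists, Kafka Connect answers 409 Conflict, which urlopen raises as urllib.error.HTTPError.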
As you can see, the connector was created without any errors. Now I expect Debezium CDC to publish all changes to the Kafka topic public.some_topic. To check this, I created a new Kafka consumer:
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'public.some_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
    print(message)
The only difference from the first example is that I'm watching public.some_topic. Then I went to the database console and did an insert:
test=# insert into mytable (name) values ('Tom Cat');
INSERT 0 1
test=#
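So a new row was inserted, but nothing happens in the consumer window. In other words, Debezium does not publish the event to the Kafka topic public.some_topic. What is wrong, and how can I fix it?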
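To see where change events actually end up, it can help to list the topics that exist on the broker after the insert. This is a sketch using kafka-python, the client already used above. It assumes Debezium's default topic naming of <database.server.name>.<schema>.<table>, under which events for public.mytable with database.server.name set to postgres would go to a topic like postgres.public.mytable; the helper names are hypothetical.

```python
def expected_debezium_topic(server_name, schema, table):
    """Topic name under Debezium's default naming (assumption): <server>.<schema>.<table>."""
    return f"{server_name}.{schema}.{table}"


def matching_topics(topics, prefix):
    """Sorted list of topic names that start with prefix."""
    return sorted(topic for topic in topics if topic.startswith(prefix))


def list_topics(bootstrap="localhost:9092"):
    """Return the set of topic names known to the broker, via kafka-python."""
    # Imported here so the two helpers above work even without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers=[bootstrap])
    try:
        return consumer.topics()
    finally:
        consumer.close()
```

For example, matching_topics(list_topics(), "postgres.") shows whether any topic for the server name postgres was created, and that name can then be passed to the consumer instead of public.some_topic.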
Comments:
- 1) If you query the connector's status, is it still running? 2) Is there anything in the Kafka Connect worker log showing that the connector failed? 3) I'd use kafkacat to inspect the topics and produce/consume data :)
- @Robin Moffatt. If I run docker ps, I see that my connect service is running.
- @Robin Moffatt. I just checked the connector logs and found this line repeating:
  INFO || WorkerSourceTask{id=test-connector2-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
- Did you solve this? I tried running your docker-compose, but I see some errors:
  connect_1 | 2020-04-16 06:06:36,922 ERROR || WorkerSourceTask{id=test-connector-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
  connect_1 | io.debezium.jdbc.JdbcConnectionException: ERROR: syntax error
  connect_1 | at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:145)
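Following up on the first comment: a running connect container does not necessarily mean the connector's task is running, because a task can fail while the worker process stays up. The Kafka Connect REST endpoint GET /connectors/<name>/status reports the state of the connector and of each task, and a failed task carries a trace field with its stack trace. A minimal standard-library sketch, with hypothetical helper names and Connect assumed at http://localhost:8083:

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: REST port published as in the compose file


def fetch_status(base_url, name, timeout=5):
    """Return the parsed JSON from GET /connectors/<name>/status."""
    url = f"{base_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def summarize_status(status):
    """Return (connector state, list of task states, list of failed-task traces)."""
    connector_state = status.get("connector", {}).get("state", "UNKNOWN")
    tasks = status.get("tasks", [])
    task_states = [task.get("state", "UNKNOWN") for task in tasks]
    failures = [task.get("trace", "") for task in tasks if task.get("state") == "FAILED"]
    return connector_state, task_states, failures
```

summarize_status(fetch_status(CONNECT_URL, "test-connector")) returns the connector state, the task states and any failure traces; an error such as the initPublication syntax error quoted in the last comment would be expected to show up in a FAILED task's trace.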
Tags: postgresql apache-kafka apache-kafka-connect debezium