【发布时间】:2020-09-12 18:57:07
【问题描述】:
尝试使用 Debezium 将 MySql 数据库流式传输到 Kafka。
所以,在 Docker Container 中,我已经启动了 Zookeeper、Kafka、MySQL 数据库、MySQL 命令行和 Kafka Connect。
当我在 MySQL 命令行中运行任何 DML 命令时,我可以在我在 docker 中启动的观察程序窗口中看到更改事件。所以目前一切看起来都很好。请在下面找到相同的内容。
现在我正在尝试使用 Java 代码中的更改事件,只要我在 MySQL 命令行中执行任何 DML 命令,就可以在观察程序窗口中看到这些更改事件。请在下面找到消费者。
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
ArrayList<String> topics = new ArrayList<>();
topics.add("dbserver1.inventory.customers");
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1L);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message received: " + record.value());
}
consumer.commitAsync();
}
无法使用来自上述消费者的数据更改事件。如果有什么需要做的,请告诉我。
【问题讨论】:
-
您的 Java 消费者在哪里运行?你从中得到了什么错误?
-
Java Consumer 和 Docker Setup 都在我的本地机器上运行。我尝试如下更改 BOOTSTRAP_SERVERS_CONFIG 的属性,现在工作正常。能够消费数据的变化。 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "plaintext://localhost:9092");现在我的问题已经转向 - 如何在连接到 MySQL 并流式传输表时获取 Kafka 主题的初始内容,因为我只能使用数据更改?
-
@Kola,欢迎来到 SO。您可以发布答案,并针对初始内容问题提出单独的问题吗?谢谢。
标签: java docker apache-kafka kafka-consumer-api