【问题标题】:Unable to Connect from Java to Kafka running in Docker无法从 Java 连接到在 Docker 中运行的 Kafka
【发布时间】:2020-09-12 18:57:07
【问题描述】:

尝试使用 Debezium 将 MySql 数据库流式传输到 Kafka。

所以,在 Docker Container 中,我已经启动了 Zookeeper、Kafka、MySQL 数据库、MySQL 命令行和 Kafka Connect。

当我在 MySQL 命令行中运行任何 DML 命令时,我可以在我在 docker 中启动的观察程序窗口中看到更改事件。所以目前一切看起来都很好。请在下面找到相同的内容。

现在我正在尝试使用 Java 代码中的更改事件,只要我在 MySQL 命令行中执行任何 DML 命令,就可以在观察程序窗口中看到这些更改事件。请在下面找到消费者。

            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            Consumer<String, String> consumer = new KafkaConsumer<>(properties);
            ArrayList<String> topics = new ArrayList<>();
            topics.add("dbserver1.inventory.customers");
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1L);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Message received: " + record.value());
                }
                consumer.commitAsync();
            }

无法使用来自上述消费者的数据更改事件。如果有什么需要做的,请告诉我。

【问题讨论】:

  • 您的 Java 消费者在哪里运行?你从中得到了什么错误?
  • Java Consumer 和 Docker Setup 都在我的本地机器上运行。我尝试如下更改 BOOTSTRAP_SERVERS_CONFIG 的属性,现在工作正常。能够消费数据的变化。 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "plaintext://localhost:9092");现在我的问题已经转向 - 如何在连接到 MySQL 并流式传输表时获取 Kafka 主题的初始内容,因为我只能使用数据更改?
  • @Kola,欢迎来到 SO。您可以发布答案,并针对初始内容问题提出单独的问题吗?谢谢。

标签: java docker apache-kafka kafka-consumer-api


【解决方案1】:

有效和无效的区别在于您访问的侦听器端口。

所以要么:

  • 29092 是错误的,9092 是正确的
  • 29092 绑定到 Docker 网络内部的通告侦听器,9092 绑定到 localhost (learn more)

【讨论】:

  • 是的,没错。谢谢罗宾。
【解决方案2】:

我通过如下更改 BOOTSTRAP_SERVERS_CONFIG 的属性解决了这个问题,现在工作正常。能够消费数据的变化。

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

并使用数据更改

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(1000L);
  for (ConsumerRecord<String, String> record : records) {
    String payload = record.value();
    if(payload.contains("payload")) {
    JsonObject root = (JsonObject) new JsonParser().parse(payload);
    root = root.getAsJsonObject("payload").getAsJsonObject("after");
   }
  }
  consumer.commitAsync();
}

它对我有用。 :)

【讨论】:

  • 顺便说一句,如果你使用 Kafka Streams API 会更干净
猜你喜欢
  • 2018-06-07
  • 2020-01-14
  • 2022-11-18
  • 2020-04-22
  • 1970-01-01
  • 2021-02-15
  • 2020-07-31
  • 2022-12-11
  • 1970-01-01
相关资源
最近更新 更多