【问题标题】:Bug in my Kafka Producer class我的 Kafka Producer 课程中的错误
【发布时间】:2017-06-18 17:07:07
【问题描述】:

我对 Kafka 非常陌生,并开始研究生产者-消费者模型。
我为 Kafka 生产者编写了一个程序,该程序将从 MySQL 读取一个表,并且我正在命令提示符下读取这些消息。
问题是我只能看到表格最后一列的数据。

MySQL表结构及数据:

+----------+---------+----------------+
| dept     | empName | salary(double) |
+----------+---------+----------------+
| Engg     | Fred    |   2000         |
| Engg     | Bob     |   3000         |
| Engg     | Joe     |   1000         |
| Arts     | Jack    |   5000         |
| Commerce | Jill    |   2400         |
| Arts     | James   |   3000         |
| Commerce | Rob     |   8700         |
+----------+---------+----------------+

Kafka Producer 类:

package com.kafka.producer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

@SuppressWarnings("deprecation")
public class KafkaMySqlProducer {
    public static void main(String[] args) {

        Connection con;
        PreparedStatement pstmnt;
        ResultSet rs;

        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("serializer.class","kafka.serializer.StringEncoder");
        props.put("metadata.broker.list","localhost:9092");
        ProducerConfig config = new ProducerConfig(props);
        Producer producer = new Producer(config);

        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb","root","cloudera");
            pstmnt = con.prepareStatement("select * from department");
            rs = pstmnt.executeQuery();
            while(rs.next()) {
                String name = rs.getString(1);
                String department = rs.getString(2);
                String salary = " " + rs.getDouble(3);
                producer.send(new KeyedMessage("dbrec", name, department, salary));
            }
            con.close();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

我的消费者的输出:

[cloudera@quickstart kafka_2.11-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbrec --from-beginning
[2017-06-18 10:23:22,428] WARN Error while fetching metadata with correlation id 2 : {dbrec=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-06-18 10:23:22,628] WARN The following subscribed topics are not assigned to any members in the group console-consumer-33197 : [dbrec]  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

 2000.0
 3000.0
 1000.0
 5000.0
 2400.0
 3000.0
 8700.0

我只是从表格的最后一列获取数据。
谁能告诉我我在这里犯了什么错误?

【问题讨论】:

  • 如果您正在使用 Kafka 开始一个新项目,最好开始使用新的生产者和消费者类。我看到您正在使用较旧的生产者连接到 Zookeeper 来获取有关代理的信息,而不是直接到其中之一。

标签: java apache-kafka kafka-producer-api


【解决方案1】:

KeyedMessage 带有 4 个参数的构造函数的最后一个参数是实际消息。

该类不只是接受所有值的列表。

具体来说,你提供的参数是(topic, key, partKey, message)。你只从topic消费message

【讨论】:

  • 好的.. 像这样更正它。 producer.send(new KeyedMessage("dbrec", 姓名 + " " + 部门 + " " + 薪水));现在正在工作。
  • 我还看到这两行的弃用消息。 ProducerConfig config = new ProducerConfig(props);生产者生产者=新生产者(配置);你能告诉我这些方法的新选项是什么,以便我可以实现它们。
  • 不确定,但 JavaDoc 可能会为您回答这个问题
  • 好的。将检查 JavaDoc
猜你喜欢
  • 1970-01-01
  • 2021-11-25
  • 1970-01-01
  • 1970-01-01
  • 2019-02-15
  • 2021-05-10
  • 2021-01-04
  • 1970-01-01
  • 2021-03-11
相关资源
最近更新 更多