【问题标题】:How to send messages from Kafka producer on one PC to kafka broker on another PC?如何将消息从一台 PC 上的 Kafka 生产者发送到另一台 PC 上的 kafka 代理?
【发布时间】:2019-12-30 10:00:22
【问题描述】:

我正在尝试使用 wifi 接口从一台 PC 上的 Kafka Producer 向另一台 PC 上的 Kafka 代理发送消息,但消息未出现在指定主题中在卡夫卡经纪人。

我使用华硕无线路由器连接了两台电脑,并禁用了电脑和路由器上的所有防火墙。两台 PC 互相 ping 成功。当我转向有线连接时,它可以工作并且消息被摄取到 kafka 代理 pc 上的指定主题。

卡夫卡制作人:

public class CarDataProducer {
    public static void main(String[] args) {

        CarDataProducer fProducer= new CarDataProducer();
        Producer<String, CarData> producer= fProducer.initializeKafkaProducer();

        String topicName = "IN-DATA";

        CSVReaderCarData csvReader = new CSVReaderCarData();
        List<CarData> CarDataList = csvReader.readCarDataFromCSV("data/mllib/TrainTest_101.csv");

        //read from CSV file and send
        for (CarData val : CarDataList) {
            producer.send(new ProducerRecord<String, CarData>(topicName, val));
        }
    }

    public KafkaProducer<String, CarData> initializeKafkaProducer() {

        // Set the producer configuration properties.
        Properties props = ProducerProperties.getInstance();

        // Instantiate a producerSampleJDBC
        KafkaProducer<String, CarData> producer = new KafkaProducer<String, CarData>(props);

        return producer;
    }

public class ProducerProperties {
    private ProducerProperties() {
    }

    public static final Properties props = new Properties();
    static {
        props.put("bootstrap.servers", "192.168.1.124:9092");
        props.put("acks", "0");
        props.put("retries", 0);
        props.put("batch.size", 500);
        props.put("linger.ms", 500);
        props.put("buffer.memory", 500);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.iov.safety.vehicleproducer.CarDataSerializer");
    }

    public static Properties getInstance() {
        return props;
    }
}

使用服务器端的控制台检查通过 Kafka Consumer 接收的消息:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic IN-DATA

消息的接收应该是这样的:

 kafka-console-consumer.sh --bootstrap-server 192.168.1.124:9092 --topic IN-DATA
{"instSpeed":19.0,"time":15.0,"label":0.0}
{"instSpeed":64.0,"time":15.0,"label":1.0}
{"instSpeed":10.0,"time":16.0,"label":0.0}

服务器端的ifconfig


kafka 生产者端的 ipconfig


  • server.properties:

listeners=PLAINTEXT://:9092

  • netstat -ano|grep '9092'

tcp6 0 0 :::9092 :::*
监听关闭 (0.00/0/0) tcp6 0 0 127.0.0.1:53880
127.0.1.1:9092 已建立保活 (6659.53/0/0) tcp6 0 0 127.0.1.1:9092 127.0.0.1:53880 已建立 保活 (6659.53/0/0) tcp6 0 0 127.0.1.1:9092
127.0.0.1:53878 已建立保活 (6659.15/0/0) tcp6 0 0 127.0.0.1:53878 127.0.1.1:9092 已建立 保活(6659.15/0/0)


通过添加回调到 kafka 生产者的发送,我得到超时错误:

org.apache.kafka.common.errors.TimeoutException:IN-DATA-0 的 8 条记录到期:自上次追加以来已过去 30045 毫秒

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    我解决了。每个 Kafka 代理都必须公布其主机名/IP,才能从另一台电脑上的 Kafka Producer 访问。

     kafka-server-start.sh config/server.properties --override  advertised.listeners=PLAINTEXT://192.168.1.124:9092
    sha
    

    相反,我们可以按如下方式更新 config/server.properties:

    advertised.listeners=PLAINTEXT://your.host.name:9092 
    

    advertised.listeners=PLAINTEXT://192.168.1.124:9092
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-10-18
      • 1970-01-01
      • 1970-01-01
      • 2012-02-02
      • 2021-01-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多