【发布时间】:2019-12-30 10:00:22
【问题描述】:
我正在尝试使用 wifi 接口从一台 PC 上的 Kafka Producer 向另一台 PC 上的 Kafka 代理发送消息,但消息未出现在指定主题中在卡夫卡经纪人。
我使用华硕无线路由器连接了两台电脑,并禁用了电脑和路由器上的所有防火墙。两台 PC 互相 ping 成功。当我转向有线连接时,它可以工作并且消息被摄取到 kafka 代理 pc 上的指定主题。
卡夫卡制作人:
public class CarDataProducer {
public static void main(String[] args) {
CarDataProducer fProducer= new CarDataProducer();
Producer<String, CarData> producer= fProducer.initializeKafkaProducer();
String topicName = "IN-DATA";
CSVReaderCarData csvReader = new CSVReaderCarData();
List<CarData> CarDataList = csvReader.readCarDataFromCSV("data/mllib/TrainTest_101.csv");
//read from CSV file and send
for (CarData val : CarDataList) {
producer.send(new ProducerRecord<String, CarData>(topicName, val));
}
}
public KafkaProducer<String, CarData> initializeKafkaProducer() {
// Set the producer configuration properties.
Properties props = ProducerProperties.getInstance();
// Instantiate a producerSampleJDBC
KafkaProducer<String, CarData> producer = new KafkaProducer<String, CarData>(props);
return producer;
}
public class ProducerProperties {
private ProducerProperties() {
}
public static final Properties props = new Properties();
static {
props.put("bootstrap.servers", "192.168.1.124:9092");
props.put("acks", "0");
props.put("retries", 0);
props.put("batch.size", 500);
props.put("linger.ms", 500);
props.put("buffer.memory", 500);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.iov.safety.vehicleproducer.CarDataSerializer");
}
public static Properties getInstance() {
return props;
}
}
使用服务器端的控制台检查通过 Kafka Consumer 接收的消息:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic IN-DATA
消息的接收应该是这样的:
kafka-console-consumer.sh --bootstrap-server 192.168.1.124:9092 --topic IN-DATA
{"instSpeed":19.0,"time":15.0,"label":0.0}
{"instSpeed":64.0,"time":15.0,"label":1.0}
{"instSpeed":10.0,"time":16.0,"label":0.0}
服务器端的ifconfig
kafka 生产者端的 ipconfig
- server.properties:
listeners=PLAINTEXT://:9092
- netstat -ano|grep '9092'
tcp6 0 0 :::9092 :::*
监听关闭 (0.00/0/0) tcp6 0 0 127.0.0.1:53880
127.0.1.1:9092 已建立保活 (6659.53/0/0) tcp6 0 0 127.0.1.1:9092 127.0.0.1:53880 已建立 保活 (6659.53/0/0) tcp6 0 0 127.0.1.1:9092
127.0.0.1:53878 已建立保活 (6659.15/0/0) tcp6 0 0 127.0.0.1:53878 127.0.1.1:9092 已建立 保活(6659.15/0/0)
通过添加回调到 kafka 生产者的发送,我得到超时错误:
org.apache.kafka.common.errors.TimeoutException:IN-DATA-0 的 8 条记录到期:自上次追加以来已过去 30045 毫秒
【问题讨论】:
标签: apache-kafka