碰到的问题
(2) 线程安全问题:由于当前是单机单节点部署,先在代码中加锁即可解决,多节点的分布式锁后续再实现
(2) 消费者以流的形式写入 HDFS 时,何时关闭流是一个大问题:流关闭前数据对其他 reader 不可见。这里引入了 fsDataOutputStream.hsync() 来在不关流的情况下落盘
1. hsync 保证正在写入的 HDFS 数据能被新打开的 reader 读到,并保证数据已被 DataNode 持久化到磁盘
生产者
package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Minimal Kafka producer demo: sends 49 key/value string messages to a fixed
 * topic, one every 100 ms, printing each message to stdout.
 */
public class ProducerDemo {

    // Target topic
    private static final String TOPIC = "tangsonghuai";

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        // acks=0: fire-and-forget, no broker acknowledgment — highest throughput
        // but messages may be lost; acceptable for a demo.
        props.put("acks", "0");
        props.put("retries", "0");
        // NOTE: the original code set "group.id" here, but that is a
        // consumer-only config and has no effect on a producer — removed.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources: close() flushes buffered records and releases
        // network resources. Without it, async send() buffers could be
        // silently dropped when the JVM exits.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send business messages. In a real scenario the payload would come
            // from a file, an in-memory database, or a socket.
            for (int i = 1; i < 50; i++) {
                Thread.sleep(100);
                producer.send(new ProducerRecord<>(TOPIC, "key:" + i, "value:" + i));
                System.out.println("key:" + i + " " + "value:" + i);
            }
        }
    }
}