碰到的问题

  (1)线程操作问题,因为单机节点,代码加锁就好了,后续再写

  (2) 消费者写hdfs的时候以流的形式写入,但是什么时候关闭流就是一个大问题了,这里引入了   fsDataOutputStream.hsync();

 

1 hsync  保证 hdfs在写数据的时候被新的reader读到,保证数据被datanode持久化

 

 

 

 

 

生产者

 1 package com.xuliugen.kafka.demo;
 2 
 3 import org.apache.kafka.clients.producer.KafkaProducer;
 4 import org.apache.kafka.clients.producer.ProducerRecord;
 5 
 6 import java.util.Properties;
 7 
 8 public class ProducerDemo {
 9 
10     // Topic
11     private static final String topic = "tangsonghuai";
12 
13     public static void main(String[] args) throws Exception {
14 
15         Properties props = new Properties();
16         props.put("bootstrap.servers", "192.168.15.140:9092");
17         props.put("acks", "0");
18         props.put("group.id", "1111");
19         props.put("retries", "0");
20         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
21         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
22 
23         //生产者实例
24         KafkaProducer producer = new KafkaProducer(props);
25 
26         int i = 1;
27 
28         // 发送业务消息
29         // 读取文件 读取内存数据库 读socket端口
30         while (i<50) {
31             Thread.sleep(100);
32             producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));
33             System.out.println("key:" + i + " " + "value:" + i);
34             i++;
35         }
36     }
37 }
View Code

相关文章:

  • 2021-07-01
  • 2021-05-25
  • 2021-09-11
  • 2021-07-03
  • 2022-12-23
  • 2021-06-27
  • 2022-12-23
猜你喜欢
  • 2021-07-23
  • 2021-12-14
  • 2021-08-24
  • 2021-11-19
  • 2021-06-30
  • 2021-11-20
  • 2021-08-13
相关资源
相似解决方案