1.生产者:在发送完消息后,收到回执确认。

主要是在SimpleProducer.java中修改了发送消息的2行代码,用到了回调函数,修改如下:

//发送消息
ProducerRecord<String, String> rec = new ProducerRecord<String, String>("test-topic","hello world from win7");
producer.send(rec,new Callback() {
    public void onCompletion(RecordMetadata metadata,Exception exception) {
        System.out.println("ack!!!");
    }
}); //在发送消息后,收到回执确认。

完整代码如下:

 1 package cn.test.mykafka;
 2 
 3 import java.util.Properties;
 4 
 5 import org.apache.kafka.clients.producer.Callback;
 6 import org.apache.kafka.clients.producer.KafkaProducer;
 7 import org.apache.kafka.clients.producer.Producer;
 8 import org.apache.kafka.clients.producer.ProducerRecord;
 9 import org.apache.kafka.clients.producer.RecordMetadata;
10 
11 /**
12  * 简单生产者:在发送完消息后,收到回执确认。
13  *
14  */
15 
16 public class SimpleProducer2 {
17 
18     public static void main(String[] args) {
19         
20          //创建配置信息
21          Properties props = new Properties();
22          props.put("bootstrap.servers", "192.168.42.133:9092"); //指定broker的节点和端口
23          props.put("acks", "all");
24          props.put("retries", 0);
25          props.put("batch.size", 16384);
26          props.put("linger.ms", 1);
27          props.put("buffer.memory", 33554432);
28          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
29          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
30 
31          //创建一个生产者
32          Producer<String, String> producer = new KafkaProducer<>(props);
33          
34         //发送消息
35          ProducerRecord<String, String> rec = new ProducerRecord<String, String>("test-topic","hello world from win7");
36          producer.send(rec,new Callback() {
37              public void onCompletion(RecordMetadata metadata,Exception exception) {
38                  System.out.println("ack!!!");
39              }
40          }); //在发送消息后,收到回执确认。
41          
42          //for (int i = 0; i < 10; i++)
43          //   producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i))); //topic,key(非必填),value 
44             
45          System.out.println("over");
46          producer.close();
47     }
48 }
SimpleProducer2.java

相关文章:

  • 2021-11-24
  • 2022-12-23
  • 2022-12-23
  • 2021-07-18
  • 2021-12-29
  • 2022-12-23
  • 2021-09-03
  • 2021-05-02
猜你喜欢
  • 2021-12-22
  • 2021-06-27
  • 2022-01-04
  • 2021-09-11
  • 2022-03-10
  • 2022-12-23
  • 2021-12-04
相关资源
相似解决方案