1.生产者:在发送完消息后,收到回执确认。
主要是在SimpleProducer.java的基础上修改了发送消息的2行代码(新类命名为SimpleProducer2),用到了回调函数,修改如下:
//发送消息 ProducerRecord<String, String> rec = new ProducerRecord<String, String>("test-topic","hello world from win7"); producer.send(rec,new Callback() { public void onCompletion(RecordMetadata metadata,Exception exception) { System.out.println("ack!!!"); } }); //在发送消息后,收到回执确认。
完整代码如下:
package cn.test.mykafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Simple producer: sends a single message and is notified through a callback
 * once the send has completed.
 */
public class SimpleProducer2 {

    /**
     * Builds the producer configuration, sends one record to {@code test-topic}
     * and prints the outcome reported to the completion callback.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        // Build the configuration.
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.133:9092"); // broker host and port
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the producer; try-with-resources closes it even if send() throws.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {

            // Send the message.
            ProducerRecord<String, String> rec =
                    new ProducerRecord<String, String>("test-topic", "hello world from win7");
            producer.send(rec, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // The callback fires for failed sends too, so only report
                    // an ack when no exception was passed in.
                    if (exception != null) {
                        System.err.println("send failed: " + exception);
                        return;
                    }
                    System.out.println("ack!!!");
                }
            }); // Invoked after the message has been sent, when the acknowledgement arrives.

            // Alternative: send ten keyed records (topic, optional key, value):
            // for (int i = 0; i < 10; i++)
            //     producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i)));

            System.out.println("over");
        }
    }
}