已经搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息。现在来搭建kafka的开发环境,这里用的开发语言是Java,构建工具Maven
一、添加依赖
搭建开发环境需要引入kafka的jar包,一种方式是将kafka安装包中lib下的jar包加入到项目的classpath中,不过我们使用的是另一种方式:使用maven管理jar包依赖。
创建好maven项目后,在pom.xml中添加以下依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
如果添加依赖后发现有jar包的依赖找不到,可以上网下载需要的jar包后解压直接拷贝到maven本地仓库的相应文件夹下
二、配置程序
kafka配置工具类
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
public class KafkaUtilTest {
private static String SERVERS = "localhost:9092";
private static String GROUP = "test-consumer-group";
private static String TOPICS = "test";
public static String getTopicStr(){
return TOPICS;
}
public static List<String> getTopics(){
List<String> topics = new ArrayList<String>();
topics.add(TOPICS);
return topics;
}
public static KafkaProducer<String, String> getProduceInstance(){
Properties props = new Properties();
props.put("bootstrap.servers", SERVERS);//kafka的地址
props.put("acks", "1"); //消息的确认机制,默认值是0
// acks=0:如果设置为0,生产者不会等待kafka的响应。
// acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
// acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。
props.put("retries", 0); //配置为大于0的值的话,客户端会在消息发送失败时重新发送
props.put("batch.size", 1); //当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer<>(props);
}
public static KafkaConsumer<String, String> getConsumerInstance(){
Properties props = new Properties();
props.put("bootstrap.servers", SERVERS);//kafka的地址
props.put("group.id", GROUP);//组名 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//值序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("enable.auto.commit", "false");//是否自动提交,默认为true
return new KafkaConsumer<>(props);
}
}
Producer端代码
import java.util.Date;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerTest {
public static void main(String[] args) {
int messageNo = 1;
KafkaProducer<String, String> producer = KafkaUtilTest.getProduceInstance();
for(;;) {
String message = "你好,这是第"+messageNo+"条数据";
producer.send(new ProducerRecord<String, String>(KafkaUtilTest.getTopicStr(), String.valueOf(new Date().getTime()), message));
// 生产数据代码如下:producer.send(new ProducerRecord<String, String>(topic,key,value));
// topic: 消息队列的名称,可以先行在kafka服务中进行创建。如果kafka中并未创建该topic,那么便会自动创建!
// key:键值,也就是value对应的值,和Map类似。
// value:要发送的数据,数据格式为String类型的。
System.out.println("发送的信息:" + message);
//生产100条就退出
if(messageNo%100==0){
System.out.println("成功发送了"+messageNo+"条");
break;
}
messageNo++;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Consumer端代码
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerTest {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = KafkaUtilTest.getConsumerInstance();
//我们需要先订阅一个topic,也就是指定消费哪一个topic
consumer.subscribe(KafkaUtilTest.getTopics());
while(true){
// 批量提交数量
ConsumerRecords<String, String> records = consumer.poll(1000);//订阅之后,我们再从kafka中拉取数据
for (ConsumerRecord<String, String> record : records){//一般来说进行消费会使用监听,这里我们就用for(;;)来进行监听
System.out.println("接收的消息:"+ record.value() + " offset="+record.offset());
}
//手动提交offset记录
consumer.commitSync();
}
}
}
之后运行代码!当然要先保证kafka服务器已启动!
运行Producer端代码生产数据、发送数据
运行Consumer端代码接收Producer端发送的消息
到这里,简单的java调用kafka操作就完成了。
总的来说,简单的开发一个kafka的程序需要以下步骤:
- 成功搭建kafka服务器,并成功启动!
- 得到kafka服务信息,然后在代码中进行相应的配置。
- 配置完成之后,监听kafka中的消息队列是否有消息产生。
- 将产生的数据进行业务逻辑处理!
- Producer :消息生产者,向broker发消息的客户端。
- Consumer :消息消费者,向broker取消息的客户端
- Topic :一个队列,主题。
- Message:消息是kafka处理的对象,在kafka中,消息是被发布到broker的topic中。而consumer也是从相应的topic中拿数据。也就是说,message是按topic存储的
- Consumer Group :将topic消息的广播发给consumer的手段。一个topic可以有多个CG。
- Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
- Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
- Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
在此过程中遇到的疑问???
刚开始运行producer端代码发送100条消息,运行consumer端代码同样可以接收到100条消息。但后续测试过程中有遇到producer端发送了100条消息,consumer端却没完整接收到100条,而只是接收到了部分20多条、60条等,具体原因多方咨询查找仍尚未明确,后续若明确会更新文章。
参考文章:https://blog.csdn.net/qazwsxpcm/article/details/79186668