已经搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息。现在来搭建kafka的开发环境,这里用的开发语言是Java,构建工具Maven

一、添加依赖

搭建开发环境需要引入kafka的jar包,一种方式是将kafka安装包中lib下的jar包加入到项目的classpath中,不过我们使用的是另一种方式:使用maven管理jar包依赖。

创建好maven项目后,在pom.xml中添加以下依赖:

        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>0.9.0.0</version>
        </dependency>

搭建kafka开发环境
如果添加依赖后发现有jar包的依赖找不到,可以上网下载需要的jar包后解压直接拷贝到maven本地仓库的相应文件夹下

二、配置程序

kafka配置工具类

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaUtilTest {
	
	private static String SERVERS = "localhost:9092";
	private static String GROUP = "test-consumer-group";
	private static String TOPICS = "test";
	
	public static String getTopicStr(){
		return TOPICS;
	}
	
	public static List<String> getTopics(){
		List<String> topics = new ArrayList<String>();
		topics.add(TOPICS);
		return topics;
	}
    
    public static KafkaProducer<String, String> getProduceInstance(){
    	Properties props = new Properties();
        props.put("bootstrap.servers", SERVERS);//kafka的地址
        props.put("acks", "1");  //消息的确认机制,默认值是0
//        		acks=0:如果设置为0,生产者不会等待kafka的响应。 
//        		acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。 
//        		acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。
        props.put("retries", 0);  //配置为大于0的值的话,客户端会在消息发送失败时重新发送
        props.put("batch.size", 1); //当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }
    
    public static KafkaConsumer<String, String> getConsumerInstance(){
    	Properties props = new Properties();
    	props.put("bootstrap.servers", SERVERS);//kafka的地址
    	props.put("group.id", GROUP);//组名 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了
    	props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    	props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//值序列化,默认org.apache.kafka.common.serialization.StringDeserializer
    	props.put("enable.auto.commit", "false");//是否自动提交,默认为true
    	return new KafkaConsumer<>(props);
    }

}

Producer端代码


import java.util.Date;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerTest {

	public static void main(String[] args) {
		int messageNo = 1;
		KafkaProducer<String, String> producer = KafkaUtilTest.getProduceInstance();
		for(;;) {
		    String message = "你好,这是第"+messageNo+"条数据";
		    producer.send(new ProducerRecord<String, String>(KafkaUtilTest.getTopicStr(),  String.valueOf(new Date().getTime()), message));
//			生产数据代码如下:producer.send(new ProducerRecord<String, String>(topic,key,value));
//			topic: 消息队列的名称,可以先行在kafka服务中进行创建。如果kafka中并未创建该topic,那么便会自动创建!
//			key:键值,也就是value对应的值,和Map类似。
//			value:要发送的数据,数据格式为String类型的。
			
		    System.out.println("发送的信息:" + message);
	            //生产100条就退出
	            if(messageNo%100==0){
	                System.out.println("成功发送了"+messageNo+"条");
	                break;
	            }
	            messageNo++;
		}
                try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

Consumer端代码

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		KafkaConsumer<String, String> consumer = KafkaUtilTest.getConsumerInstance();
		//我们需要先订阅一个topic,也就是指定消费哪一个topic
		consumer.subscribe(KafkaUtilTest.getTopics());
		while(true){
			// 批量提交数量
			ConsumerRecords<String, String> records = consumer.poll(1000);//订阅之后,我们再从kafka中拉取数据
			for (ConsumerRecord<String, String> record : records){//一般来说进行消费会使用监听,这里我们就用for(;;)来进行监听
				System.out.println("接收的消息:"+ record.value() + " offset="+record.offset());
			}
			//手动提交offset记录
			consumer.commitSync();
		}
	}
	
}

之后运行代码!当然要先保证kafka服务器已启动!

运行Producer端代码生产数据、发送数据

搭建kafka开发环境
运行Consumer端代码接收Producer端发送的消息

搭建kafka开发环境

到这里,简单的java调用kafka操作就完成了。

 

总的来说,简单的开发一个kafka的程序需要以下步骤:

  1. 成功搭建kafka服务器,并成功启动!
  2. 得到kafka服务信息,然后在代码中进行相应的配置。
  3. 配置完成之后,监听kafka中的消息队列是否有消息产生。
  4. 将产生的数据进行业务逻辑处理!

 

  • Producer :消息生产者,向broker发消息的客户端。
  • Consumer :消息消费者,向broker取消息的客户端
  • Topic :一个队列,主题。
  • Message:消息是kafka处理的对象,在kafka中,消息是被发布到broker的topic中。而consumer也是从相应的topic中拿数据。也就是说,message是按topic存储的
  • Consumer Group :将topic消息的广播发给consumer的手段。一个topic可以有多个CG。
  • Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
  • Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka

 

在此过程中遇到的疑问???

刚开始运行producer端代码发送100条消息,运行consumer端代码同样可以接收到100条消息。但后续测试过程中有遇到producer端发送了100条消息,consumer端却没完整接收到100条,而只是接收到了部分20多条、60条等,具体原因多方咨询查找仍尚未明确,后续若明确会更新文章。

 

参考文章:https://blog.csdn.net/qazwsxpcm/article/details/79186668

相关文章: