总览
该示例项目演示了如何使用事件驱动的体系结构 , Spring Boot ,Spring Cloud Stream, Apache Kafka和Lombok构建实时流应用程序。
在本教程结束时,您将运行一个简单的基于Spring Boot的Greetings微服务
- 从REST API获取消息
- 将其写入Kafka主题
- 从主题中读取
- 将其输出到控制台
让我们开始吧!
顺便说一句,您可以在此处找到源代码。
什么是Spring Cloud Streaming?
Spring Cloud Stream是基于Spring Boot构建的框架,用于构建消息驱动的微服务。
什么是卡夫卡?
Kafka是最初由LinkedIn开发的流行的高性能和水平可伸缩的消息传递平台。
安装Kafka
从这里下载Kafka并将其解压缩:
> tar -xzf kafka_2.11-1.0.0.tgz > cd kafka_2.11-1.0.0
启动Zookeeper和Kafka
在Windows上:
> bin\windows\zookeeper-server-start.bat config\zookeeper.properties > bin\windows\kafka-server-start.bat config\server.properties
在Linux或Mac上:
> bin/zookeeper-server-start.sh config/zookeeper.properties > bin/kafka-server-start.sh config/server.properties
如果计算机从休眠状态唤醒后,Kafka没有运行并且无法启动,请删除<TMP_DIR>/kafka-logs文件夹,然后再次启动Kafka。
什么是Lombok?
Lombok是一个Java框架,可在代码中自动生成getter,setter,toString(),构建器,记录器等。
Maven依赖
转到https://start.spring.io创建一个Maven项目:
- 添加必要的依赖项:
Spring Cloud Stream,Kafka,Devtools(用于在开发过程中进行热重新部署,可选),Actuator(用于监视应用程序,可选),Lombok(确保在IDE中也安装了Lombok插件) - 单击生成项目按钮以zip文件形式下载项目
- 解压缩zip文件并将maven项目导入到您喜欢的IDE
注意pom.xml文件中的Maven依赖项:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- Also install the Lombok plugin in your IDE -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
…还有<dependencyManagement>部分:
<dependencyManagement>
<dependencies>
<dependency>
<!-- Import dependency management from Spring Boot -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
…和<repository>部分:
<repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>http://repo.spring.io/libs-milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository>
定义卡夫卡流
package com.kaviddiss.streamkafka.stream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface GreetingsStreams {
String INPUT = "greetings-in";
String OUTPUT = "greetings-out";
@Input(INPUT)
SubscribableChannel inboundGreetings();
@Output(OUTPUT)
MessageChannel outboundGreetings();
}
为了使我们的应用程序能够与Kafka进行通信,我们需要定义一个出站流以将消息写入Kafka主题,并定义一个入站流以读取来自Kafka主题的消息。
通过简单地创建一个接口为每个流定义单独的方法,Spring Cloud提供了一种方便的方法。
inboundGreetings()方法定义要从Kafka读取的入站流,而outboundGreetings()方法定义要写入Kafka的出站流。
在运行时,Spring将为GreetingsStreams接口创建一个基于Java代理的实现,该实现可以作为Spring Bean注入到代码中的任何位置,以访问我们的两个流。
配置Spring Cloud Stream
下一步是将Spring Cloud Stream配置为绑定到GreetingsStreams接口中的流。 这可以通过使用以下代码创建@Configuration类com.kaviddiss.streamkafka.config.StreamsConfig来完成:
package com.kaviddiss.streamkafka.config;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;
@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}
使用@EnableBinding批注(将GreatingsService接口传递到该批注)完成@EnableBinding的GreatingsService 。
Kafka的配置属性
默认情况下,配置属性存储在src/main/resources/application.properties文件中。
但是,我更喜欢使用YAML格式,因为它不太冗长,并且允许将公共属性和特定于环境的属性保留在同一文件中。
现在,让我们将application.properties重命名为application.yaml并将config片段下方粘贴到文件中:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
greetings-in:
destination: greetings
contentType: application/json
greetings-out:
destination: greetings
contentType: application/json
上面的配置属性配置要连接的Kafka服务器的地址,以及我们用于代码中的入站和出站流的Kafka主题。 他们俩都必须使用相同的Kafka主题!
contentType属性告诉Spring Cloud Stream在流中以String的形式发送/接收我们的消息对象。
创建消息对象
使用下面的代码创建一个简单的com.kaviddiss.streamkafka.model.Greetings类,该代码将表示我们从中读取并写入的greetings Kafka主题:
package com.kaviddiss.streamkafka.model;
// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/):
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@Getter @Setter @ToString @Builder
public class Greetings {
private long timestamp;
private String message;
}
注意,由于Lombok批注,该类如何没有任何getter和setter。 @ToString将使用类的字段生成toString()方法,而@Builder批注将允许我们使用流畅的生成器创建Greetings对象(请参见下文)。
创建服务层以写入Kafka
让我们创建的com.kaviddiss.streamkafka.service.GreetingsService下面的代码,将写一个类Greetings对象的greetings卡夫卡话题:
package com.kaviddiss.streamkafka.service;
import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
@Service
@Slf4j
public class GreetingsService {
private final GreetingsStreams greetingsStreams;
public GreetingsService(GreetingsStreams greetingsStreams) {
this.greetingsStreams = greetingsStreams;
}
public void sendGreeting(final Greetings greetings) {
log.info("Sending greetings {}", greetings);
MessageChannel messageChannel = greetingsStreams.outboundGreetings();
messageChannel.send(MessageBuilder
.withPayload(greetings)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
@Service批注会将此类配置为Spring Bean,并通过构造函数注入GreetingsService依赖项。
@Slf4j批注将生成一个SLF4J记录器字段,可用于记录日志。
在sendGreeting()方法中,我们使用注入的GreetingsStream对象发送由Greetings对象表示的消息。
创建REST API
现在,我们将创建一个REST api端点,该端点将触发使用GreetingsService Spring Bean向Kafka发送消息:
package com.kaviddiss.streamkafka.web;
import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.service.GreetingsService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class GreetingsController {
private final GreetingsService greetingsService;
public GreetingsController(GreetingsService greetingsService) {
this.greetingsService = greetingsService;
}
@GetMapping("/greetings")
@ResponseStatus(HttpStatus.ACCEPTED)
public void greetings(@RequestParam("message") String message) {
Greetings greetings = Greetings.builder()
.message(message)
.timestamp(System.currentTimeMillis())
.build();
greetingsService.sendGreeting(greetings);
}
}
@RestController注释告诉Spring这是一个Controller bean(MVC中的C)。 greetings()方法定义一个HTTP GET /greetings端点,该端点接受message请求参数,并将其传递给GreetingsService的sendGreeting()方法。
听问候卡夫卡主题
让我们创建一个com.kaviddiss.streamkafka.service.GreetingsListener类,该类将侦听greetings Kafka主题上的消息并将其记录在控制台上:
package com.kaviddiss.streamkafka.service;
import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class GreetingsListener {
@StreamListener(GreetingsStreams.INPUT)
public void handleGreetings(@Payload Greetings greetings) {
log.info("Received greetings: {}", greetings);
}
}
@Component批注类似于@Service @Component , @Service @RestController定义了一个Spring Bean。
GreetingsListener有一个方法, handleGreetings()将通过云春流与每一个新的调用Greetings的消息对象greetings卡夫卡的话题。 这要感谢为handleGreetings()方法配置的@StreamListener批注。
运行应用程序
最后一个难题是由Spring Initializer自动生成的com.kaviddiss.streamkafka.StreamKafkaApplication类:
package com.kaviddiss.streamkafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(StreamKafkaApplication.class, args);
}
}
无需在此处进行任何更改。 您可以在您的IDE中将此类作为Java应用程序运行,也可以使用Spring Boot maven插件从命令行运行该应用程序:
> mvn spring-boot:run
应用程序运行后,在浏览器中转到http:// localhost:8080 / greetings?message = hello并检查您的控制台。
摘要
我希望您喜欢本教程。 随时提出任何问题并留下您的反馈。
翻译自: https://www.javacodegeeks.com/2018/03/spring-cloud-stream-kafka.html