【发布时间】:2019-12-09 07:45:57
【问题描述】:
我无法弄清楚如何测试使用 Avro 作为消息格式和(Confluent)模式注册表的 Spring Cloud Stream Kafka Streams 应用程序。
配置可能是这样的:
spring:
application:
name: shipping-service
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8081
kafka:
streams:
binder:
configuration:
application:
id: shipping-service
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
schema:
registry:
url: ${spring.cloud.stream.schema-registry-client.endpoint}
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
bindings:
input:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
order:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
output:
producer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
input:
destination: customer
order:
destination: order
output:
destination: order
server:
port: 8086
logging:
level:
org.springframework.kafka.config: debug
注意事项:
- 它正在使用本机序列化/反序列化。
- 测试框架:Junit 5
我想关于 Kafka Broker,我应该使用 EmbeddedKafkaBroker bean,但正如您所见,它还依赖于应该以某种方式模拟的 Schema Registry。怎么样?
【问题讨论】:
-
我希望有一种 Spring Cloud Stream 方式来模拟 Schema Registry。无论如何我都会试一试。
-
我看到的一个问题是,虽然应用程序在某些 bean 定义点 (
@Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String) 需要一个带有模式注册表 url 的配置属性,但这个库在运行时提供了它this.getSchemaRegistryUrl() -
如何加载引导服务器?你能类似地定义注册地址吗?
-
我想我可以设置
spring.cloud.stream.schema-registry-client.endpoint属性,就像这里的 boostrap 服务器一样:github.com/spring-cloud/spring-cloud-stream-samples/blob/master/…
标签: apache-kafka spring-cloud apache-kafka-streams spring-kafka spring-cloud-stream