【发布时间】:2018-05-03 20:04:17
【问题描述】:
我正在尝试实现一个 Spring Cloud Dataflow 流,它从数据库中读取记录,将它们传递给一个处理器,该处理器转换为 Avro 模式,然后将其传递给接收器应用程序使用。
我的数据从 SQL DB 流向我的源应用程序,并通过 Kafka 活页夹传递数据没有问题,因为我在将数据从处理器发送到接收器应用程序序列化/反序列化时遇到问题阿夫罗。
我创建了一个名为 ech.avsc 的 avro 模式,并使用处理器中的 avro-maven-plugin 为其生成了一个名为 EchRecord 的类。
我在处理器和接收器的 pom 中添加了以下依赖项
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
我已将处理器的属性设置为
spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
在 Sink 端,属性看起来像
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
处理器应用程序代码如下所示:
@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {
private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);
public static void main(String[] args) {
SpringApplication.run(EchProcessorApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public EchRecord transform(List<Map<String, Object>> record) {
return EchRecord.newBuilder()
.setCallId(11111).build();;
}
在 Sink 端,代码如下所示:
@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {
private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);
public static void main(String[] args) {
SpringApplication.run(AvroLoggerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void logHandler(Object data) {
LOGGER.info("data='{}'", data.toString());
LOGGER.info("class='{}'", data.getClass());
}
}
我有一个 Spring Schema Registry Server 正在运行并且两个应用程序都可以访问它,并且我可以在查询注册表时看到模式已交付给服务器。
我可以查看是否在接收器应用程序上启用调试日志记录,表明在收到的消息上正确设置了 contentType:contentType=application/vnd.echrecord.v1+avro
在 Sink 应用程序中,我设置了一个带有 @StreamListener 注释的方法来检索接收对象的消息并打印出数据和类类型,它似乎正在检索一个字节数组。
如何更改 Sink 应用程序的代码以将 Avro 消息反序列化为可以从中检索设置数据的内容?
【问题讨论】:
-
您能否提供一个小型示例应用程序(源和接收器),我们可以在其中重现该问题?无需使用任何数据库源,只需使用 Avro 序列化基本源和反序列化它的消费者。这样,更容易排除故障。
标签: avro spring-cloud-stream spring-cloud-dataflow