【问题标题】:Implementing Avro in Spring Cloud Dataflow在 Spring Cloud Dataflow 中实现 Avro
【发布时间】:2018-05-03 20:04:17
【问题描述】:

我正在尝试实现一个 Spring Cloud Dataflow 流,它从数据库中读取记录,将它们传递给一个处理器,该处理器转换为 Avro 模式,然后将其传递给接收器应用程序使用。

我的数据从 SQL DB 流向我的源应用程序,并通过 Kafka 活页夹传递数据没有问题,因为我在将数据从处理器发送到接收器应用程序序列化/反序列化时遇到问题阿夫罗。

我创建了一个名为 ech.avsc 的 avro 模式,并使用处理器中的 avro-maven-plugin 为其生成了一个名为 EchRecord 的类。

我在处理器和接收器的 pom 中添加了以下依赖项

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-schema</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>

我已将处理器的属性设置为

spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

在 Sink 端,属性看起来像 spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

处理器应用程序代码如下所示:

@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {

private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);

public static void main(String[] args) {
    SpringApplication.run(EchProcessorApplication.class, args);
}


@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public EchRecord transform(List<Map<String, Object>> record) {
    return EchRecord.newBuilder()
            .setCallId(11111).build();;
}

在 Sink 端,代码如下所示:

@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {



    private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(AvroLoggerApplication.class, args);
    }


    @StreamListener(Sink.INPUT)
    public void logHandler(Object data) {

        LOGGER.info("data='{}'", data.toString());
        LOGGER.info("class='{}'", data.getClass());


    }
}

我有一个 Spring Schema Registry Server 正在运行并且两个应用程序都可以访问它,并且我可以在查询注册表时看到模式已交付给服务器。

我可以查看是否在接收器应用程序上启用调试日志记录,表明在收到的消息上正确设置了 contentType:contentType=application/vnd.echrecord.v1+avro

在 Sink 应用程序中,我设置了一个带有 @StreamListener 注释的方法来检索接收对象的消息并打印出数据和类类型,它似乎正在检索一个字节数组。

如何更改 Sink 应用程序的代码以将 Avro 消息反序列化为可以从中检索设置数据的内容?

【问题讨论】:

  • 您能否提供一个小型示例应用程序(源和接收器),我们可以在其中重现该问题?无需使用任何数据库源,只需使用 Avro 序列化基本源和反序列化它的消费者。这样,更容易排除故障。

标签: avro spring-cloud-stream spring-cloud-dataflow


【解决方案1】:

这里有几件事可以尝试。在生产方面,由于您的类型已经是 Avro 类型(SpecificRecord 或 GenericRecord),因此您不需要 dynamicSchemaGeneration 标志,这适用于基于反射的编写器,主要用于测试,因为它会影响性能。

由于您的接收器可以看到您发布的正确类型,因此您现在需要将您的类型放在接收器上。因此,例如在接收器上添加类型并使用正确的类型注释方法:EchRecord 这将为您提供正确的类型。

您也可以将其设置为GenericRecord,以便能够像使用record.get(&lt;propertyname&gt;) 的对象容器一样访问它

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-10
    • 2017-10-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多