【问题标题】:Spring cloud stream IntegrationFlow with Rabbitmq messaging, The consumer giving ASCII numbers as message payloadSpring Cloud Stream IntegrationFlow with Rabbitmq 消息传递,消费者提供 ASCII 数字作为消息有效负载
【发布时间】:2018-10-09 14:48:56
【问题描述】:

我正在使用 Spring Cloud Stream 进行消息传递。在消费者部分,我使用了 IntegrationFlow 来监听队列。它正在收听并打印来自生产者端的消息。但是格式不同,这就是我现在面临的问题。生产者的内容类型是 application/json 和显示 ASCII 数字的 IntegrationFlow 消息有效负载。下面给出了为消费者编写的代码

 @EnableBinding(UserOperationConsume.class)
 public class ConsumerController {

   @Bean
   IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
     return IntegrationFlows
     .from(u.userRegistraionProduces())
     //.transform(Transformers.toJson()) // not working as expected
     //.transform(Transformers.fromJson(UserDTO.class))
     .handle(String.class, (payload, headers) -> {
     System.out.println(payload.toString()); // here the output is 123,34,105,100,34,58,49,44,34,110,97,109,101,34,58,34,86,105,115,104,110,117,34,44,34,101,109,97,105,108,34,58,34,118... 
     return null;
     }).get();
  }

 }

输入界面是,

 public interface UserOperationConsume {
  @Input
  public SubscribableChannel userRegistraionProduces();
 }

而消费者的yml配置是,

 server:
   port: 8181

 spring:
   application:
   name: nets-alert-service
 ---
 spring:
   cloud:
     config:
       name: notification-service
       uri: http://localhost:8888
 ---    
 spring:
   rabbitmq:
     host: localhost
     port: 5672
     username: guest
     password: guest

   ---
   spring:
     cloud:
       stream:
         bindings:
           userRegistraionProduces:
             destination: userOperations
         input:
           content-type: application/json

我尝试过 Sink.class 绑定,那时我从队列中得到了准确的消息。因此,如果此 IntegrationFlow 配置有任何错误,请告诉我。因为我是spring cloud stream和IntegrationFlow的新手。有没有办法把这个ascii转换成精确的字符串? 提前致谢

【问题讨论】:

    标签: spring-boot rabbitmq ascii spring-cloud spring-cloud-stream


    【解决方案1】:

    使用IntegrationFlows.from(channel) 不会提供任何转换提示,因此您只需获取原始的byte[] 有效负载(包含JSON)。不清楚你为什么要使用toJson() 转换器。

    您的.handle(String.class, (payload, headers) -> {... 导致使用简单的ArrayToStringConverter,这就是您看到每个字节值的原因。

    无论如何,您都没有正确使用该框架。使用...

    @StreamListener("userRegistraionProduces")
    public void listen(UserDTO dto) {
        System.out.println(dto);
    }
    

    ...框架会为您处理转换。或者……

    @StreamListener("userRegistraionProduces")
    public void listen(Message<UserDTO> dtoMessage) {
        System.out.println(dtoMessage);
    }
    

    如果您的制作人在标题中传达了其他信息。

    编辑

    如果您更喜欢自己进行转换,这可以正常工作...

    @Bean
    IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
        return IntegrationFlows.from(u.userRegistraionProduces())
                .transform(Transformers.fromJson(UserDTO.class))
                .handle((payload, headers) -> {
                    System.out.println(payload.toString());
                    return null;
                }).get();
    }
    

    ...因为 Json 转换器可以读取 byte[]

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-03-28
      • 2018-01-04
      • 2018-05-13
      • 2020-09-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多