【问题标题】:Can't change content_type from application/octet-stream to application/json in RabbitMQ sink无法在 RabbitMQ 接收器中将 content_type 从 application/octet-stream 更改为 application/json
【发布时间】:2019-04-10 12:31:48
【问题描述】:

我有一个简单的 RabbitMQ 源和接收器。我正在向具有以下属性的源队列发布消息:

content_type -> application/json

和一个 JSON 负载:

{
  "userId": 2,
  "customerId": 1,
}

RabbitMQ 接收器使用 application/octet-stream 而不是 JSON 获取消息。

我尝试使用以下属性启动应用程序:

spring.cloud.stream.default.contentType=application/json

但它没有帮助。

流定义:

stream_1=rabbitSource: rabbit --queues=queue1 --password=p --host=h --username=u | sink: rabbit --exchange=ex --routing-key=rk --converter-bean-name=jsonConverter --password=p --host=h --username=u

如何将内容类型设置为application/jsonreference guide 似乎没有答案。

发布版本:

  • spring-cloud-dataflow-server:2.0.1.RELEASE
  • spring-cloud-skipper-server:2.0.0.RELEASE

更新:

正如@SabbyAnandan 的回答中所建议的,我现在正在跑步:

dataflow:>stream create --name test123 --definition "rabbitSource: rabbit --queues=queue --password=p --host=rmq --username=u --spring.cloud.stream.bindings.output.contentType='application/json' | sink: rabbit --exchange=ex --routing-key=rk --converter-bean-name=jsonConverter --password=p --host=rmq --username=p"
Created new stream 'test123123'

dataflow:>stream deploy --name test123 --properties "app.rabbit.spring.cloud.stream.bindings.output.contentType='application/json'"
Deployment request has been sent for stream 'test123'

content_type 还是一样的。

【问题讨论】:

  • 嗨。在描述已发布应用程序的问题时,如果您还可以列出正在使用的版本,将会很有帮助。你能用版本编辑帖子吗?
  • @SabbyAnandan 当然,会编辑。这是我在 GitHub 上打开的问题的link
  • 请更新正在使用的应用版本。这是一个特定于应用程序的讨论,而不是 SCDF。内容类型协商在 Spring Cloud Stream 的最新版本和我们发布的应用程序的相应版本中发生了显着变化,所以让我们从版本开始。
  • @SabbyAnandan 我使用的是最新版本。如果我理解正确,我会从here 获取应用程序。

标签: java rabbitmq spring-cloud-stream spring-cloud-dataflow


【解决方案1】:

@Maroun,如前所述,这里有一些选项。

  1. 兔子接收器始终以byte[] 接收数据。您可以通过提供配置(--rabbit.converterBeanName=jsonConverter)来强制它使用 json 转换器。但默认情况下使用 Base64 编码器,生成的文本将采用 Base64 编码。然后在自定义应用程序中,它从接收器发布到的交换器接收数据,您需要提供一个自定义转换器,该转换器使用 Base64 解码器进行属性解码。
  2. 您的另一个选择是修补 Rabbit 水槽以满足您的需要。克隆Rabbit app starter repo,然后提供一个PassthroughConverter,它只是将传入的byte[] 传递到下游。您还需要以某种方式更改 Message 标头上的内容类型(更改为 application/json)。然后普通的Jackson 转换器将在自定义应用程序端工作,因为在这种情况下byte[] 不是 Base64 编码的。

【讨论】:

  • 非常感谢您的帮助和时间,非常感谢您的努力。
【解决方案2】:

在 Rabbit-source 的标头中设置的默认内容类型是:content-type: application/x-java-serialized-object

您可以通过传递来覆盖该行为:

1) --spring.cloud.stream.bindings.output.contentType='application/json' 作为 Rabbit 源的内联属性。

2) 在部署流时,您可以通过--properties="app.rabbit.spring.cloud.stream.bindings.output.contentType='application/json' 覆盖应用程序特定的属性。

部署后,您可以通过转到特定于应用程序的http://<APP-HOST>:<APP-PORT?/actuator/configprops 端点来确认该属性是否被覆盖。

要解决所有这些问题,您可以独立运行应用程序(即java -jar..)以确认行为,然后再通过 SCDF 运行它们。如果它独立工作,它也应该在 SCDF 中工作。

【讨论】:

  • 谢谢。如果我希望此行为成为默认行为,是否可以在 docker-compose 设置中将其添加为 dataflow-server 容器(即spring.cloud.stream.bindings.output.contentType=application/json)的环境变量?
  • 您是否在本地试用过这些应用程序?暂时忽略 SCDF、Docker 等。如果您在每个应用程序中独立覆盖该属性并将其作为java -jar 运行,它是否按预期工作?让我们确保这些应用能够满足您的需求。
  • 另一件事。由于您将 RabbitMQ 作为源和接收器,因此您将使用不同的标签来部署流。同样,您需要在部署时覆盖属性时使用标签。那将是:app. rabbitSource.spring.cloud.stream.bindings.output.contentType='application/json' 在您的情况下。更多详细信息请参阅文档here
  • 您是否尝试过分别通过java -jar rabbit-source.jar --pring.cloud.stream.bindings.output.contentType='application/json'java -jar rabbit-sink.jar --spring.cloud.stream.bindings.input.contentType='application/json' 分别为源应用程序和接收器应用程序启动这两个应用程序?当它们启动和运行时,您是否看到这些属性覆盖通过应用程序的执行器端点正确填充?
  • curl -c cookies -d "username=user&password=spring_auto_generated_pwd" localhost:8080/actuator/configprop 不返回任何内容。
猜你喜欢
  • 2011-10-18
  • 1970-01-01
  • 1970-01-01
  • 2014-06-08
  • 1970-01-01
  • 2010-12-28
  • 2014-11-25
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多