【问题标题】:Supplier binding is not working with spring cloud stream rabbit供应商绑定不适用于 spring cloud stream rabbit
【发布时间】:2020-01-08 13:55:02
【问题描述】:

我们有如下来源,我们使用的是 spring cloud stream rabbit binder 3.0.1.RELEASE。

@Component
public class Handlers {

  private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();

  public void emitData(String str){
    sourceGenerator.onNext(str);
  }

  @Bean
  public Supplier<Flux<String>> generate() {
    return () -> sourceGenerator;
  }

  @Bean
  public Function<String, String> process() {
    return str -> str.toUpperCase();
  }


}

application.yml

spring:
  profiles: dev
  cloud:
    stream:
      function:
        definition: generate;process
        bindings:
          generate-out-0: source1
          process-in-0: source1
          process-out-0: processed

        bindingServiceProperties:
          defaultBinder: local_rabbit

      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: / 

在调用emitData 方法时,我们没有看到 RabbitMQ 队列中的数据。 我们还观察到消费者绑定正在发挥作用。我们通过 RabbitMQ Admin 将消息直接发送到消费者链接队列进行检查。但是供应商绑定不起作用。

此外,我们观察到没有FluxSupplier 在相同的application.yml 配置下也能正常工作。 我们这里是否缺少任何配置?

即使使用 TestChannelBinderConfiguration 的测试用例也可以正常工作,如下所示。

@Slf4j
@TestPropertySource(
        properties = {"spring.cloud.function.definition = generate|process"}
)
public class HandlersTest extends AbstractTest {
  @Autowired
  private OutputDestination outputDestination;

  @Test
  public void testGeneratorAndProcessor() {
      final String testStr = "test"; 
      handlers.emitData(testStr);

      Object eventObj;
      final Message<byte[]> message = outputDestination.receive(1000);

      assertNotNull(message, "processing timeout");
      eventObj = message.getPayload();

      assertEquals(new String((byte[]) eventObj), testStr.toUpperCase());
  }
}

【问题讨论】:

  • 如何调用emitData?具体来说,您如何获得对Handlers 的引用?
  • 处理程序被用作来自其他类的普通自动装配 bean 来调用 `emitdata' 方法。

标签: spring-cloud-stream spring-rabbit


【解决方案1】:

当你说we are not seeing data in RabbitMQ queue. . .。你说的是哪个队列?使用 AMQP 时,消息被发送到exchanges,如果这种交换未绑定到任何queue,则消息被丢弃,因此我的问题。你真的把generate-out-0exchange 绑定到队列了吗?

无论如何,我只是对其进行了测试,一切都按预期工作。这是完整的代码。

@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = SpringApplication.run(SimpleStreamApplication.class);
        SimpleStreamApplication app = context.getBean(SimpleStreamApplication.class);
        app.emitData("Hello");
    }

    private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();

    public void emitData(String str) {
        sourceGenerator.onNext(str);
    }

    @Bean
    public Supplier<Flux<String>> generate() {
        return () -> sourceGenerator;
    }
}

【讨论】:

  • 我们有一个指向该供应商的处理器链接...刚刚添加了有问题的代码。我们没有在处理器中获取数据。当我们有没有 Flux 的供应商时,它工作得很好。 public Supplier&lt;String&gt; generate() { ... }
  • 我不确定我是否遵循。 processor link to this supplier. 是什么意思?无论如何,我无法重现您所描述的内容。事实上,我看到了相反的情况 - Rabbit 队列中的一条消息绑定到generate-out-0 交换。所以也许你可以在 github 上的某个地方发布一个完整的项目,我们可以看看
  • @OlegZhurakousky,我们期望上面的配置 `bindings: generate-out-0: source1 process-in-0: source1 process-out-0: processed ` 链接供应商和处理器source1 仅限队列。在这种情况下,我们没有generate-out-0。当我们在单元测试中指定spring.cloud.function.definition = generate|process 时,数据传输工作正常。但是,当我们将它作为启动应用程序运行并在 RabbitMQ Admin 处监视 source1 队列的状态时,该队列中没有任何内容。
  • 您之前从未提到过generate|process。现在(编辑后)我看到generate;process 不一样。所以,你这边似乎发生了很多事情,这让这篇文章的读者感到困惑。因此,正如我之前建议的那样,请将可重现的示例项目推送到 github 上的某个地方,否则我不确定这里还能说什么。我真的不明白你想回答什么问题。
  • 另外,如果你确实使用函数组合作为generate|process,为什么你有generate-out-0: source1, process-in-0: source1, process-out-0: processed而不是generateprocess-out-0:source1?请查看cloud.spring.io/spring-cloud-static/spring-cloud-stream/…部分的详细描述。
【解决方案2】:

虽然我很感谢您发布一个项目,但不幸的是您的故事仍在不断变化,我仍然不确定您想要完成什么。所以这是我的最后一个回复,但我会尽量详细和提供信息,所以这是我从你的项目中看到的。

  1. 您的配置有问题。函数的 definition 属性应为 spring.cloud.function.definition

。 . .

spring:
  cloud:
    function:
       definition: generate;process;sink

。 . .

  1. 由于您使用的是;,我假设您希望按照multiple binding 部分中的说明独立绑定所有 3 个函数(无函数组合)。

  2. spring.cloud.stream.function.bindings 是一个属性,允许您将生成的绑定名称映射到自定义绑定名称,如Function Binding Names 中所述。它与实际目的地的名称无关。为此,我们有 destination 属性,它也包含在引用的部分中(例如,--spring.cloud.stream.bindings.generate-out-0.destination=source1)。但是,如果不使用 destination 属性,则假定绑定名称和目标名称相同。 但是,消费者目的地也需要组名,如果没有提供,它会生成一个。因此,根据您的配置,您的 generate-out-0 供应商将绑定到 source1 exchange

另一方面process-in-in函数的输入绑定到source1.anonymous...队列

正如我之前所说,source1 exchangesource1.anonymous... queue 之间没有 RabbitMQ 绑定,因此发送到 source1 的消息交换 被简单地删除。通过创建此类绑定(例如,通过 Rabbit MQ 控制台),消息将到达消费者。

也就是说,这样的设计是非常低效的。为什么要在同一进程空间 (JVM) 中发送到同一目的地接收?当您可以简单地通过引用传递时,为什么还要滥用网络?因此,至少将 definition 更改为 spring.cloud.function.definition=generate|process|sink`。 更好的解决方案是在供应商本身中编写您的代码

public void emitData(String str) {
    String uppercased = str.toUpperCase();
    sourceGenerator.onNext(uppercased);
    System.out.println("Emitted: " + str);
}

并完成它。 无论如何,我强烈建议您仔细阅读我们的用户指南,特别是 Main Concepts 部分和 Programming Model 部分,因为我相信您误解了某些核心概念,我认为这会导致您的帖子和问题中的不一致。

【讨论】:

  • spring.cloud.function.definitionspring.cloud.stream.function.definition 之间的区别真的不明显。这些都在文档中出现在不同的地方,但从不在同一个地方......但是好的,我们改变了它。完整的回复在下面的单独答案中。
【解决方案3】:

我们对代码进行了一些更改。但问题仍然存在。供应商的助焊剂实施不起作用。非助焊剂供应商工作正常:


    @Bean
    public Supplier<Flux<String>> generate_flux() {
        return () -> sourceGenerator;
    }

    @Bean
    public Supplier<Message<?>> generate_non_flux() {
        return MessageBuilder
           .withPayload("Non flux emitter: " + LocalDateTime.now().toString())::build;
    }

完整来源是in the same place

我们还按照您的建议更改了application.yml,并进行了一些实验。感谢您对主题含义的解释。但我们也检查并可以说 RabbitMQ 自动将输出和消费者链接到相同的目的地和任何指定的组名。它适用于明确指定的组和随机生成的组。这不是关于并行处理,而是关于 RabbitMQ 链接它的能力。

generate_fluxgenerate_non_flux 都连接到同一个输出目标:

      bindings:
        generate_flux-out-0:
          destination: source
        generate_non_flux-out-0:
          destination: source

现在应用程序的输出是:

Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:51.721094
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:52.725961
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:53.727054
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:54.727898
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801

有带有NON FLUX 的已处理消息,但没有通量消息。

因此,非通量发射器可以正常工作,但我们不能根据请求使用它来发射。供应商的通量实施不起作用。从那以后我们就开始了,我们没有对任务的描述做任何更改。

谈到我们将代码拆分为供应商、处理器和接收器,我们谈论的是不同类型的机器。 supplier - 这是生成数据的遗留代码。 processor 是工作流的内存消耗部分,我们希望将其保存在一组单独的 VM 上,并能够在 Kubernetes 中对其进行扩展。 sink 在我们的例子中是一台特定的机器,它将数据存储到数据库中。同时,由于遗留代码,我们希望应用程序有通用代码,而不是将其拆分为单独的应用程序,如基于 Apache Beam 的应用程序。

【讨论】:

  • 您的供应商(助焊剂或其他)工作正常。我已经克隆了您的项目并运行了几次。您需要做的就是在 Rabbit 交换和队列之间创建一个绑定(我在之前的回复中已经多次提到它)。此外,你的故事还在不断变化。您现在正在谈论独立的源/处理器/接收器。如果这是您想要的,那么为什么要将它们放在单个应用程序上下文中?通过这样的拆分,你能获得什么价值?
  • 您确定基于助焊剂的供应商正在为您服务吗?您可以看到,在我们的输出中,标记Consumed: 仅用于非通量。如果我们评论非助焊剂供应商,则没有任何消耗。我们还检查了rabbitmq 显示sourcesource.processors 绑定此外,我们从未改变过故事。我们从不工作的基于助焊剂的供应商开始,只是为您提供了完整的示例工作流程。示例代码只是一个说明。
  • 关于single application context,不确定是否理解你。如果是关于单元测试,我们希望避免任何外部队列代理。我们知道正确的工作流程,我们希望在本地运行它。在任何情况下,我们都需要使用不同的代理进行测试、开发和生产。
  • 是的,我的供应商正在工作,我可以看到队列中的所有 5 条消息 - 我在交换机(供应商发送消息的位置)和队列(消费者接收消息的位置)之间设置了一个绑定。
  • 但这怎么可能呢?配置中的generate_flux-out-0 连接到与generate_non_flux-out-0 相同的目标。我们正在使用具有默认设置的 RabbitMQ,并在具有不同操作系统的两台不同 PC 上对其进行了测试。你可以看到我们只得到了Consumed: NON FLUX EMITTER,没有任何通量生成......我们还检查了交换和输出队列的 RabbitMQ 绑定。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-03-24
  • 2017-12-25
  • 1970-01-01
  • 2020-10-21
相关资源
最近更新 更多