【发布时间】:2020-01-08 13:55:02
【问题描述】:
我们有如下来源,我们使用的是 spring cloud stream rabbit binder 3.0.1.RELEASE。
@Component
public class Handlers {
private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();
public void emitData(String str){
sourceGenerator.onNext(str);
}
@Bean
public Supplier<Flux<String>> generate() {
return () -> sourceGenerator;
}
@Bean
public Function<String, String> process() {
return str -> str.toUpperCase();
}
}
application.yml
spring:
profiles: dev
cloud:
stream:
function:
definition: generate;process
bindings:
generate-out-0: source1
process-in-0: source1
process-out-0: processed
bindingServiceProperties:
defaultBinder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
在调用emitData 方法时,我们没有看到 RabbitMQ 队列中的数据。
我们还观察到消费者绑定正在发挥作用。我们通过 RabbitMQ Admin 将消息直接发送到消费者链接队列进行检查。但是供应商绑定不起作用。
此外,我们观察到没有Flux 的Supplier 在相同的application.yml 配置下也能正常工作。
我们这里是否缺少任何配置?
即使使用 TestChannelBinderConfiguration 的测试用例也可以正常工作,如下所示。
@Slf4j
@TestPropertySource(
properties = {"spring.cloud.function.definition = generate|process"}
)
public class HandlersTest extends AbstractTest {
@Autowired
private OutputDestination outputDestination;
@Test
public void testGeneratorAndProcessor() {
final String testStr = "test";
handlers.emitData(testStr);
Object eventObj;
final Message<byte[]> message = outputDestination.receive(1000);
assertNotNull(message, "processing timeout");
eventObj = message.getPayload();
assertEquals(new String((byte[]) eventObj), testStr.toUpperCase());
}
}
【问题讨论】:
-
如何调用
emitData?具体来说,您如何获得对Handlers的引用? -
处理程序被用作来自其他类的普通自动装配 bean 来调用 `emitdata' 方法。
标签: spring-cloud-stream spring-rabbit