【发布时间】:2017-11-04 08:35:12
【问题描述】:
我有两个应用程序,如下所述:
- Spring 启动应用程序 - 充当休息端点,将请求发布到消息队列。 (阿帕奇脉冲星)
- Heron (Storm) 拓扑 - 处理从消息队列 (PULSAR) 接收的消息并具有所有处理逻辑。
我的要求,我需要通过 Spring Boot 应用程序服务不同的用户查询,该应用程序将该查询发送到消息队列,并在 spout 处使用。一旦 spout 和 bolts 处理了请求,就会再次从 bolt 发布消息。来自 Bolt 的响应在 Spring 启动(消费者)时处理并回复用户请求。典型如下图:
为了服务于同一个请求,我现在在内存中缓存延迟结果对象(我为发送到拓扑的每条消息设置了一个 reqID,并且我还在内存中维护了一个键、值对),当消息到达时我解析请求 id 并将结果设置为 deferedResult(我知道这是一个糟糕的设计,应该如何解决这个问题?)。
在这种情况下,从拓扑接收到的消息的顺序不是连续的(因为每个正在处理的请求都需要自己的时间,并且生产者 Bolt 会像 on当它收到一个)。
我有点坚持这种设计,无法继续前进。
//Controller
public DeferredResult<ResponseEntity<?>> process(//someinput) {
DeferredResult<ResponseEntity<?>> result = new DeferredResult<>(config.getTimeout());
CompletableFuture<String> serviceResponse = service.processAsync(inputSource);
serviceResponse.whenComplete((response, exception) -> {
if (!ObjectUtils.isEmpty(exception))
result.setErrorResult(//error);
else
result.setResult(//complete);
});
return result;
}
//In Service
public CompletableFuture processAsync(//input){
producer.send(input);
CompletableFuture result = new CompletableFuture();
//consumer has a listener as shown below
// **I want to avoid below line, how can I redesign this**
map.put(id, result);
return result;
}
//in same service, a listener is present for consumer for reading the messages
consumerListener(Message msg){
int reqID = msg.getRequestID();
map.get(reqID).complete(msg.getData);
}
如上所示,一旦我收到消息,我就会得到 completableFuture 对象并设置结果,它内部调用延迟结果 对象并将响应返回给用户。
【问题讨论】:
-
我不确定为什么您的问题针对的是 REST,因为您正在寻找一些内部解决方案。接下来,您可能会考虑将消息队列的调用封装到 Spring 引导应用程序中自己的服务中,该服务由 Spring cache 管理。这里 spring 缓存将使用 AOP 来缓存基于缓存键的方法调用(默认情况下调用的方法名称 + 方法参数),并在后续调用中使用此信息来决定是返回先前缓存的值还是发送新消息到队列和检索它的响应
-
嗨@RomanVottner,感谢您的回复,但我添加了代码以便更好地理解。你能看看吗。由于我必须为用户提供相同的请求,因此我缓存了 DeferedResult 对象并在我从其他应用程序获取相应消息时调用它。
-
你以前看过RMQ RPC(Remote Procedure Call)吗?它为每个请求创建一个临时响应队列,以便消费者可以顺序消费消息并异步产生响应消息。请参考Remote procedure call (RPC)。
标签: rest spring-boot apache-storm mq