【问题标题】:Handling REST request with Message Queue使用消息队列处理 REST 请求
【发布时间】:2017-11-04 08:35:12
【问题描述】:

我有两个应用程序,如下所述:

  1. Spring 启动应用程序 - 充当休息端点,将请求发布到消息队列。 (阿帕奇脉冲星)
  2. Heron (Storm) 拓扑 - 处理从消息队列 (PULSAR) 接收的消息并具有所有处理逻辑。

我的要求,我需要通过 Spring Boot 应用程序服务不同的用户查询,该应用程序将该查询发送到消息队列,并在 spout 处使用。一旦 spout 和 bolts 处理了请求,就会再次从 bolt 发布消息。来自 Bolt 的响应在 Spring 启动(消费者)时处理并回复用户请求。典型如下图:

为了服务于同一个请求,我现在在内存中缓存延迟结果对象(我为发送到拓扑的每条消息设置了一个 reqID,并且我还在内存中维护了一个键、值对),当消息到达时我解析请求 id 并将结果设置为 deferedResult(我知道这是一个糟糕的设计,应该如何解决这个问题?)。

在这种情况下,从拓扑接收到的消息的顺序不是连续的(因为每个正在处理的请求都需要自己的时间,并且生产者 Bolt 会像 on当它收到一个)。

我有点坚持这种设计,无法继续前进。

//Controller
public DeferredResult<ResponseEntity<?>> process(//someinput) {
    DeferredResult<ResponseEntity<?>> result = new DeferredResult<>(config.getTimeout());
    CompletableFuture<String> serviceResponse = service.processAsync(inputSource);
    serviceResponse.whenComplete((response, exception) -> {
        if (!ObjectUtils.isEmpty(exception))
            result.setErrorResult(//error);
        else
            result.setResult(//complete);
    });
    return result;
}

//In Service
public CompletableFuture processAsync(//input){
    producer.send(input);
    CompletableFuture result = new CompletableFuture();
    //consumer has a listener as shown below
    // **I want to avoid below line, how can I redesign this**
    map.put(id, result);
  return result;
}

//in same service, a listener is present for consumer for reading the messages
consumerListener(Message msg){
     int reqID = msg.getRequestID();
     map.get(reqID).complete(msg.getData);
}

如上所示,一旦我收到消息,我就会得到 completableFuture 对象并设置结果,它内部调用延迟结果 对象并将响应返回给用户。

【问题讨论】:

  • 我不确定为什么您的问题针对的是 REST,因为您正在寻找一些内部解决方案。接下来,您可能会考虑将消息队列的调用封装到 Spring 引导应用程序中自己的服务中,该服务由 Spring cache 管理。这里 spring 缓存将使用 AOP 来缓存基于缓存键的方法调用(默认情况下调用的方法名称 + 方法参数),并在后续调用中使用此信息来决定是返回先前缓存的值还是发送新消息到队列和检索它的响应
  • 嗨@RomanVottner,感谢您的回复,但我添加了代码以便更好地理解。你能看看吗。由于我必须为用户提供相同的请求,因此我缓存了 DeferedResult 对象并在我从其他应用程序获取相应消息时调用它。
  • 你以前看过RMQ RPC(Remote Procedure Call)吗?它为每个请求创建一个临时响应队列,以便消费者可以顺序消费消息并异步产生响应消息。请参考Remote procedure call (RPC)

标签: rest spring-boot apache-storm mq


【解决方案1】:

在这种情况下,从拓扑接收到的消息的顺序不是连续的(因为每个正在处理的请求都需要自己的时间,并且生产者 Bolt 会像 on当它收到一个)。

听起来您正在寻找Correlation Identifier 消息传递模式。概括地说,您计算/创建一个标识符,该标识符附加到发送到 pulsar 的消息上,并安排 Heron 将该标识符从它收到的请求复制到它发送的响应中。

因此,当您的 Spring Boot 组件在第 5 步使用来自 pulsar 的消息时,您将关联 id 匹配到正确的 http 请求,并返回结果。

据我所知,使用原始 requestId() 作为关联标识符应该没问题。

为了服务于同一个请求,我现在在内存中缓存延迟结果对象(我为发送到拓扑的每条消息设置了一个 reqID,并且我还在内存中维护了一个键、值对),当消息到达时我解析请求 id 并将结果设置为 deferedResult(我知道这是一个糟糕的设计,应该如何解决这个问题?)。

最终,您可能会在某种程度上这样做;也就是说,第 5 步的消费者将使用相关 id 来查找生产者存储的 something。试图通过四个不同的进程边界传递原始请求可能会以泪水告终。

更一般的形式是在地图中存储一个回调,而不是CompletableFuture;但在这种情况下,回调可能只是完成未来。

我想在设计中仔细检查一件事:您要确保第 5 步的消费者在消息到达之前看到它应该使用的未来。换句话说,应该在某处有一个happens-before内存屏障,以确保第5步的映射查找不会失败。

【讨论】:

    猜你喜欢
    • 2023-03-24
    • 1970-01-01
    • 2015-12-16
    • 2019-11-05
    • 2023-02-10
    • 1970-01-01
    • 2014-02-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多