【问题标题】:Elegant way to setup high-priority/low-priority route consumers of sqs with Camel RoutePolicy?使用 Camel RoutePolicy 设置 sqs 的高优先级/低优先级路由消费者的优雅方法?
【发布时间】:2020-01-14 17:00:27
【问题描述】:

我有两个 SQS 队列:一个用于低优先级消息,另一个用于高优先级消息。该逻辑意味着不要接触低优先级队列上的消息,除非高优先级队列为空。

SuspendLowPriorityRoutePolicy suspendLowPriorityRoutePolicy = new SuspendLowPriorityRoutePolicy(LOW_PRIORITY_ROUTE_ID, camelContext);

        from(UriBuilder.buildSqsUri(sqsProperties)).routeId(LOW_PRIORITY_ROUTE_ID)
                .log(LoggingLevel.INFO, log, "NON-PRIORITY: ${body}");

        from(UriBuilder.buildPrioritySqsUri(sqsProperties)).routeId(HIGH_PRIORITY_ROUTE_ID)
                .routePolicy(suspendLowPriorityRoutePolicy)
                .log(LoggingLevel.INFO, log, "PRIORITY: ${body}");

这两个消费者都将其concurrentConsumers 属性设置为 1,这意味着它们一次将处理一条消息。

现在,我设置了这两个路由来同时使用队列中的消息。我想要的是进入高优先级路由的消息触发低优先级路由的停止。为了尝试获得此功能,我尝试使用一个路由策略,当在高优先级路由上启动新交换时停止低优先级队列:

(来自SuspendLowPriorityRoutePolicy的片段)

 @Override
    public void onExchangeBegin(Route route, Exchange exchange) {
        Route lowPriorityRoute = context.getRoute(lowPriorityRouteId);
        ServiceStatus routeStatus = context.getRouteStatus(lowPriorityRouteId);
        if (!routeStatus.isStopped()) {
            try {
                lock.lock();
                log.info("High priority request came in, stopping consumer");
                stopConsumer(lowPriorityRoute.getConsumer());
            } catch (Exception e) {
                log.error("Exception stopping consumer " + e);
                handleException(e);
            } finally {
                lock.unlock();
            }
        }
    }

但是,我不确定如何重新启动低优先级的消费者。 Camel 提供的其他钩子RoutePolicy 允许覆盖onExchangeDone,但此时逻辑应该是仅在高优先级队列为空时重新启动低优先级消费者。我认为没有办法检查队列是否为空,我们可以检查交换完成挂钩上的ApproximateNumberOfMessages 属性,但这可能不准确。

另一个想法是有一个预定的后台轮询器,它检查CamelContext 的 inFlightRequestsRepository 并仅在高优先级路由没有飞行请求时重新启动低优先级队列。

【问题讨论】:

    标签: apache-camel amazon-sqs


    【解决方案1】:

    嗯,我能想到的最简单的方法是,使用比低优先级消息更多的消费者来使用高优先级消息。这样,高优先级的消息通常会很快被消耗掉,而低优先级的消息可能会堆积起来,因此必须等待。

    但是,低优先级消费者仍将处理消息,无论同时处理多少高优先级消息。


    更接近您的用例的是使用JMS priorities

    您可以以相同的方式使用两个队列,只需为每个队列的消息设置一个不同的 JMS 优先级。例如,低优先级消息的标准优先级为 4,高优先级消息的优先级为 8。

    然后您将两个消费者的消息转发到第三个队列。所以在这第三个队列中,高优先级和低优先级消息混合到达,但每个消息都设置了适当的优先级。

    然后您将消费者附加到第三个队列,并且由于消息优先级,应首先使用高优先级消息。只有当队列不包含高优先级消息时,才会使用低优先级消息。

    但是,有一些事情要记住:

    • 根据需要使用消息优先级重新排序队列中的消息。这会降低包含大量消息的队列的性能。
    • 可能必须在您的代理或队列上激活消息优先级。否则优先级被忽略。与您的经纪人一起搜索优先示例以进行检查。
    • 我不确定,但我认为JMS 连接上也有一些标志以尊重消费者的优先级
    • 您必须将消费者预取保持在较低水平。如果您的消费者预取 1000 条低优先级消息,它会在执行任何其他操作之前将它们全部处理。无论此时是否有高优先级消息到达队列。预取越大,在处理新到达的高优先级消息之前可以处理的低优先级消息就越多。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-10-27
      • 1970-01-01
      • 2015-02-11
      • 2023-04-02
      • 1970-01-01
      • 2021-05-14
      • 1970-01-01
      • 2020-12-21
      相关资源
      最近更新 更多