【发布时间】:2020-01-14 17:00:27
【问题描述】:
我有两个 SQS 队列:一个用于低优先级消息,另一个用于高优先级消息。该逻辑意味着不要接触低优先级队列上的消息,除非高优先级队列为空。
SuspendLowPriorityRoutePolicy suspendLowPriorityRoutePolicy = new SuspendLowPriorityRoutePolicy(LOW_PRIORITY_ROUTE_ID, camelContext);
from(UriBuilder.buildSqsUri(sqsProperties)).routeId(LOW_PRIORITY_ROUTE_ID)
.log(LoggingLevel.INFO, log, "NON-PRIORITY: ${body}");
from(UriBuilder.buildPrioritySqsUri(sqsProperties)).routeId(HIGH_PRIORITY_ROUTE_ID)
.routePolicy(suspendLowPriorityRoutePolicy)
.log(LoggingLevel.INFO, log, "PRIORITY: ${body}");
这两个消费者都将其concurrentConsumers 属性设置为 1,这意味着它们一次将处理一条消息。
现在,我设置了这两个路由来同时使用队列中的消息。我想要的是进入高优先级路由的消息触发低优先级路由的停止。为了尝试获得此功能,我尝试使用一个路由策略,当在高优先级路由上启动新交换时停止低优先级队列:
(来自SuspendLowPriorityRoutePolicy的片段)
@Override
public void onExchangeBegin(Route route, Exchange exchange) {
Route lowPriorityRoute = context.getRoute(lowPriorityRouteId);
ServiceStatus routeStatus = context.getRouteStatus(lowPriorityRouteId);
if (!routeStatus.isStopped()) {
try {
lock.lock();
log.info("High priority request came in, stopping consumer");
stopConsumer(lowPriorityRoute.getConsumer());
} catch (Exception e) {
log.error("Exception stopping consumer " + e);
handleException(e);
} finally {
lock.unlock();
}
}
}
但是,我不确定如何重新启动低优先级的消费者。 Camel 提供的其他钩子RoutePolicy 允许覆盖onExchangeDone,但此时逻辑应该是仅在高优先级队列为空时重新启动低优先级消费者。我认为没有办法检查队列是否为空,我们可以检查交换完成挂钩上的ApproximateNumberOfMessages 属性,但这可能不准确。
另一个想法是有一个预定的后台轮询器,它检查CamelContext 的 inFlightRequestsRepository 并仅在高优先级路由没有飞行请求时重新启动低优先级队列。
【问题讨论】: