【发布时间】:2018-10-14 23:17:13
【问题描述】:
在我们的架构中,每个应用程序都有 2 个或更多容器 高可用性目的。 我们正在使用 activeMQ,我想实现以下行为。
- 将消息推送到队列。
- 此消息将仅由 *one *container(第一个 会读的)。
- 一旦消息处理成功,消费者将更新此消息 消息可以被确认和忽略
- 我尝试使用事务提交并使用 CLIENT_ACKNOWLEDGE 在这两种情况下,两个消费者都收到了消息并进行了处理。
我希望只有一个消费者根据可用性处理每条消息。 我们的实现是用 Java 实现的。
请分享实现方式。
这是我的代码示例
final Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
// Create a session.
final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final Destination consumerDestination = consumerSession.createQueue(queueName);
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
// Begin to wait for messages.
Queue queue = consumerSession.createQueue(queueName);
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
//do your things here
ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
if (message == null)
continue;
//handle message
System.out.println("Message received in : " + message);
try {
String text = message.getText();
JSONObject messageJson = new JSONObject(text);
consumer.receive(1000);
String responseString = handleMessage(messageJson);
message.acknowledge();
谢谢 摩西
【问题讨论】:
-
假设您实际上使用的是队列(而不是主题),并且还假设您正在适当地确认消息(并且没有将其取消回队列以便其他消费者可以使用)然后一切应该按预期工作。你能证实这两个假设吗?您可以在创建会话和确认消息的位置分享您的客户端代码吗?
-
谢谢@JustinBertram 我用我的代码更新了问题