【问题标题】:Listening to a jms queue and processing only 10 messages at a time监听一个 jms 队列,一次只处理 10 条消息
【发布时间】:2015-12-10 23:51:27
【问题描述】:

我有一个 javax.jms.Queue 队列并让我的侦听器监听这个队列。我得到消息(一个字符串)并执行一个进程,将字符串作为输入参数传递给该进程。

我只想同时运行该进程的 10 个实例。一旦这些都完成了,那么只有下一条消息应该被处理。

如何实现?因为它一次读取所有消息并运行该进程的多个实例,导致服务器挂起。

// using javax.jms.MessageListener
message = consumer.receive(5000); 
if (message != null) { 
    try { 
        handler.onMessage(message);  //handler is MessageListener instance
    }
}

【问题讨论】:

  • 请显示您的侦听器在哪里提取消息并将它们放入进程的代码。
  • 我正在使用 javax.jms.MessageListener。消息 = 消费者.receive(5000); if (message != null) { try { handler.onMessage(message); } handler 是 MessageListener 实例。
  • 您的进程是在线程上执行,还是在另一个 JVM/native/其他进程上执行?
  • 它实际上是在另一个JVM上运行的。实际上我运行了一个可执行文件,它接受字符串作为我从 jms 队列中获取的参数。现在我希望一次只运行该 exe 的 10 个实例,一旦这 10 个完成工作,就应该处理下一条消息,依此类推。
  • 您是否知道外部进程何时终止或所有十个进程何时终止(成功/不成功),以及该消息是如何传递/处理的?这是否必须以十个为一组进行,或者您是否可以在另一个进程终止时立即启动一个新进程,而不是等待所有十个进程终止?

标签: java multithreading jms


【解决方案1】:

我认为一个简单的while 检查就足够了。这是一些伪代码。

While (running processes are less than 10) {
    add one to the running processes list
    do something with the message
}

onMessage的代码中:

function declaration of on Message(Parameters) {
    do something
    subtract 1 from the running processes list
}

确保您用于计算正在运行的进程数量的变量声明为volatile

根据要求提供示例:

public static volatile int numOfProcesses = 0;

while (true) {
    if (numOfProcesses < 10) {
        // read a message and make a new process, etc
        // probably put your receive code here
        numOfProcesses++;
    }
}

无论您的流程代码写在哪里:

// do stuff, do stuff, do more stuff
// finished stuff
numOfProcesses--;

【讨论】:

  • 我是否可以限制我的消费者一次只消费几条消息。现在它只是一次消耗所有的消息。你知道有没有可能
  • @user3262365 我以前从来没有遇到过这个问题,所以我不能确定这是可能的,但替代方法是限制您发送的消息,而不是限制在队列。如果您使用同步变量并一次发送 10 条消息,您应该能够实现您想要的。
  • 你能举个例子详细说明一下吗?
【解决方案2】:

我假设您有办法接受来自外部进程的hasTerminated 消息。此控制器线程将使用 Semaphore 与 JMS 侦听器通信。 Semaphore 用 10 个许可初始化,每次外部进程调用 TerminationController#terminate(或者外部进程与您的侦听器进程通信)时,它都会向 Semaphore 添加一个许可,然后 JMSListener 必须首先获取在它可以调用messageConsumer.release() 之前获得许可,以确保一次不能超过十个进程处于活动状态。

// created in parent class
private final Semaphore semaphore = new Semaphore(10);

@Controller
public class TerminationController {
    private final semaphore;

    public TerminationController(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    // Called from external processes when they terminate
    public void terminate() {
        semaphore.release();
    }
}

public class JMSListener implements Runnable {
    private final MessageConsumer messageConsumer;
    private final Semaphore semaphore;

    public JMSListener(MessageConsumer messageConsumer, Semaphore semaphore) {
        this.messageConsumer = messageConsumer;
        this.semaphore = semaphore;
    }

    public void run() {
        while(true) {
            semaphore.acquire();
            Message message = messageConsumer.receive();
            // create process from message
        }
    }
}

【讨论】:

  • Hi@Zim-Zam O'Pootertoot 你能举个例子详细说明一下吗?
【解决方案3】:

尝试将此注释放在您的 mdb 侦听器上:

@ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10")

【讨论】:

  • 感谢您的快速回复。但是我没有任何 MDBListenere。我正在手动连接到队列。
  • 我正在使用 javax.jms.MessageListener,只要队列中有消息,我就会执行我的流程。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-09-29
  • 2016-04-24
  • 2013-09-12
  • 2013-12-16
  • 2019-07-01
  • 2017-08-04
  • 1970-01-01
相关资源
最近更新 更多