【Question title】: Spring JMSListener: merging the messages
【Posted】: 2017-01-19 21:49:02
【Question description】:

I have implemented Spring JMS with a listener that uses a concurrency of 5 threads:

<jms:listener-container container-type="default" concurrency="5-10" connection-factory="cachingConnectionFactory"   >
    <jms:listener destination="TEST.FOO" ref="messageListener" />
 </jms:listener-container>

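When I drop 5 messages onto the queue, the 5 listener threads pick them up and I can read the messages.

My question is how to merge all 5 messages. Is it possible to write some kind of builder that waits for a certain time, so that any messages received within that window can be merged together?

Code: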

    long startTime = 0;
    if (messageCount == 0) {
        startTime = System.currentTimeMillis();
    }
    messageCount++;
    if (messageCount < batchSize && (startTime > (System.currentTimeMillis() - maximumBatchWaitTime))) { // or some batch time limit
        line += stringMessage;
        reached = true; // this is a volatile variable; messageCount is also volatile
    }

    System.out.println(line);

    try {
        if (reached) {
            messageCount = 0;
            line = "";
            execService.createExecFile(line);
        }
    } catch (final Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

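Regards, Rj

【Question comments】:

  • Can you explain in detail what you mean by merging all 5 messages?
  • Append the string message from each thread on a new line.

Tags: spring-jms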


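【Solution 1】:

Your messageListener is a singleton, so all threads use the same instance. You can therefore synchronize the method calls and append each message, on a new line, to a field of that instance.

Update 1

A synchronized listener with a background flush thread is shown below. Alternatively, use org.springframework.jms.core.JmsTemplate.receive().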

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class DefaultMessageListener implements MessageListener {
    // All mutable state is guarded by the monitor of this instance.
    private final StringBuilder lines = new StringBuilder();
    private int messageCount;
    private long startTime;
    private int batchSize;             // to be set through configuration
    private long maximumBatchWaitTime; // milliseconds, to be set through configuration
    private ExecService execService;   // the asker's service; the type name is assumed

    // Background thread that flushes a partial batch once it has waited long enough.
    private final Thread flusher = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the flag and stop
                    return;
                }
                flushIfExpired();
            }
        }
    });

    {
        flusher.setDaemon(true); // do not keep the JVM alive for this thread
        flusher.start();
    }

    @Override
    public synchronized void onMessage(Message message) {
        if (messageCount == 0) {
            startTime = System.currentTimeMillis();
        }
        try {
            messageCount++;
            lines.append(((TextMessage) message).getText()).append(System.lineSeparator());
            // No time check is needed here; the flusher thread handles the timeout.
            if (messageCount >= batchSize) {
                createExecFile();
            }
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

    // Flush when the oldest buffered message has waited at least maximumBatchWaitTime.
    private synchronized void flushIfExpired() {
        if (messageCount > 0 && System.currentTimeMillis() - startTime >= maximumBatchWaitTime) {
            createExecFile();
        }
    }

    // Callers must hold the lock on this instance.
    private void createExecFile() {
        try {
            execService.createExecFile(lines.toString());
        } catch (final Exception e) {
            e.printStackTrace();
        }
        messageCount = 0;
        lines.setLength(0);
    }
}
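
The size-or-timeout logic of the listener above can also be pulled out of the JMS class, which makes it testable without a broker. The following is only a minimal sketch under assumptions: MessageBatcher, add, flushIfExpired, the injected clock, and the Consumer<String> sink (standing in for execService.createExecFile) are hypothetical names that do not appear in the original answer.

```java
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Collects lines and hands them to a sink when the batch is full or too old. */
final class MessageBatcher {
    private final int batchSize;
    private final long maxWaitMillis;
    private final LongSupplier clock;    // injectable so the timeout can be tested
    private final Consumer<String> sink; // hypothetical stand-in for execService.createExecFile
    private final StringBuilder buffer = new StringBuilder();
    private int count;
    private long firstArrival;

    MessageBatcher(int batchSize, long maxWaitMillis, LongSupplier clock, Consumer<String> sink) {
        this.batchSize = batchSize;
        this.maxWaitMillis = maxWaitMillis;
        this.clock = clock;
        this.sink = sink;
    }

    /** Called by each listener thread; flushes as soon as the batch is full. */
    synchronized void add(String text) {
        if (count == 0) {
            firstArrival = clock.getAsLong(); // the wait window starts with the first message
        }
        buffer.append(text).append('\n');
        count++;
        if (count >= batchSize) {
            flush();
        }
    }

    /** Called periodically by a timer; flushes a partial batch that has waited long enough. */
    synchronized void flushIfExpired() {
        if (count > 0 && clock.getAsLong() - firstArrival >= maxWaitMillis) {
            flush();
        }
    }

    // Callers hold the lock on this instance.
    private void flush() {
        sink.accept(buffer.toString());
        buffer.setLength(0);
        count = 0;
    }
}
```

In a listener, onMessage would call add(((TextMessage) message).getText()) with System::currentTimeMillis as the clock, and a ScheduledExecutorService could call flushIfExpired() every few hundred milliseconds in place of the hand-written sleep loop.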

Update 2

import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSession;
import org.springframework.jms.core.JmsTemplate;

public class DefaultMessageListener {
    private volatile String line = "";
    private volatile int messageCount;
    private long maximumBatchWaitTime;
    JmsTemplate jmsTemplate;
    private ExecService execService; // the asker's service; the type name is assumed

    public void getMessages() {
        try {
            // Configure these two settings once, on the jmsTemplate bean definition.
            jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
            jmsTemplate.setReceiveTimeout(maximumBatchWaitTime);

            Message message = jmsTemplate.receive();
            if (message == null) {
                return; // receive timed out: nothing to process or acknowledge
            }
            messageCount++;
            line += ((TextMessage) message).getText();
            System.out.println(line);
            createExecFile();
            // Acknowledge only after the message has been handed over.
            message.acknowledge();
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

    private void createExecFile() {
        try {
            execService.createExecFile(line);
        } catch (final Exception e) {
            e.printStackTrace();
        }
        messageCount = 0;
        line = "";
    }
}

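INDIVIDUAL_ACKNOWLEDGE: an ActiveMQ-specific mode in which acknowledge() confirms only the message it is called on. Messages left unacknowledged are redelivered when the consumer connects again. Example: http://alvinalexander.com/java/jwarehouse/activemq/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java.shtml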

Update 3

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSession;
import org.springframework.jms.core.JmsTemplate;

public class DefaultMessageListener {
    private volatile String line = "";
    private volatile int messageCount;
    private long maximumBatchWaitTime;
    JmsTemplate jmsTemplate;
    // Must refer to the session that jmsTemplate receives on; the original
    // answer does not show how it is obtained.
    private ActiveMQSession session;
    private ExecService execService; // the asker's service; the type name is assumed

    public void getMessages() throws JMSException {
        try {
            // Configure these two settings once, on the jmsTemplate bean definition.
            jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.CLIENT_ACKNOWLEDGE);
            jmsTemplate.setReceiveTimeout(maximumBatchWaitTime);

            Message message = jmsTemplate.receive();
            if (message != null) {
                messageCount++;
                line += ((TextMessage) message).getText();
            }
            System.out.println(line);
            if (messageCount > 0) {
                createExecFile();
            }
        } catch (final Exception e) {
            e.printStackTrace();
            // Ask the broker to redeliver everything that was not acknowledged.
            session.recover();
            messageCount = 0;
            line = "";
        }
    }

    // Failures propagate to getMessages() so that the session can be recovered.
    private void createExecFile() throws Exception {
        execService.createExecFile(line);
        session.acknowledge(); // confirm only after the file has been written
        messageCount = 0;
        line = "";
    }
}
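
The receive-based variants above fetch a single message per call. To merge several messages, the timed receive would have to be repeated until the batch is full or the wait time has run out. The following is a minimal sketch of that loop only, with BlockingQueue.poll standing in for a timed jmsTemplate.receive() (both return null on timeout). BatchPoller and drain are hypothetical names, and how acknowledgement maps onto such a loop depends on the session handling and is not covered here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BatchPoller {
    /**
     * Polls until batchSize messages are collected or maxWaitMillis has elapsed
     * since the call started, and returns the messages joined by newlines.
     */
    static String drain(BlockingQueue<String> source, int batchSize, long maxWaitMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        StringBuilder batch = new StringBuilder();
        for (int received = 0; received < batchSize; received++) {
            long remaining = Math.max(deadline - System.nanoTime(), 0L);
            String text;
            try {
                // Returns null when nothing arrives within the remaining time.
                text = source.poll(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the flag and stop early
                break;
            }
            if (text == null) {
                break; // wait time used up: return the partial batch
            }
            batch.append(text).append('\n');
        }
        return batch.toString();
    }
}
```

The deadline is computed once per call, so the total wait is bounded by maxWaitMillis no matter how many messages trickle in.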

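【Comments】:

  • Comments are not for extended discussion; this conversation has been moved to chat.
  • @Hassen Bennour, I also need to list the messages received from the queue. Could you advise whether the answer you gave under "UPDATE 1" would be the better choice? stackoverflow.com/questions/55107244/…
  • @AlagammalP Yes, you can test it, or use BatchMessageListenerContainer.
  • @Hassen Bennour, I will try the one under "UPDATE 1". I could not find a good example of BatchMessageListenerContainer that shows how it consumes multiple messages.
  • @AlagammalP Have a look here: stackoverflow.com/a/28973075/6506229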