【问题标题】:Redisson async procesing messagesRedisson 异步处理消息
【发布时间】:2021-03-13 09:47:24
【问题描述】:

我正在尝试为我的项目应用 Redisson 功能作为消息代理,但我有一个问题。是否可以将 Redisson 推送到异步接收到的消息?我创建了一个小例子,从不同的 URL 发送了 4 条消息。我预计,Redisson 会异步处理它们,但它会一一进行。 这里是实现:

public class RedisListenerServiceImpl implements MessageListener<String> {

    private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void onMessage(CharSequence channel, String stringMsg) {

        log.info("Message received: {}", stringMsg);
        MessageDto msg;
        try {
            msg = objectMapper.readValue(stringMsg, MessageDto.class);
        } catch (final IOException e) {
            log.error("Unable to deserialize message: {}", e.getMessage(), e);
            return;
        }

        try {
            //Do my stuff
        } catch (Exception e) {
            log.error("Unable to get service from factory: {}", e.getMessage(), e);
        }

    }
}

还有配置:

@Configuration
public class RedisListenerConfig {

    @Autowired
    public RedisListenerConfig(RedissonClient redisClient,
                               MessageListener redisListenerService,
                               @Value("${redis.sub.key}") String redisSubKey) {

        RTopic subscribeTopic = redisClient.getTopic(redisSubKey);
        subscribeTopic.addListenerAsync(String.class, redisListenerService);
    }
}

【问题讨论】:

标签: java multithreading asynchronous redis redisson


【解决方案1】:

这是预期的行为。如果您希望在触发侦听器 onMessage() 方法时同时处理您的消息,只需使用线程池进行处理即可。

由于 Redisson 不知道您要使用多少个线程来使用触发的事件,因此它将实现细节留给您。

public class RedisListenerServiceImpl implements MessageListener<String> {

private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private final ExecutorService executorService = Executors.newFixedThreadPool(10);

@Override
public void onMessage(CharSequence channel, String stringMsg) {

    log.info("Message received: {}", stringMsg);
    MessageDto msg;
    try {
        msg = objectMapper.readValue(stringMsg, MessageDto.class);
        executorService.submit(()->{
        System.out.println("do something with message: "+msg);
    });
    } catch (final IOException e) {
        log.error("Unable to deserialize message: {}", e.getMessage(), e);
        return;
    }

    try {
        //Do my stuff
    } catch (Exception e) {
        log.error("Unable to get service from factory: {}", e.getMessage(), e);
    }

}

【讨论】:

  • 感谢您的解释。但是如果异步不意味着同时处理消息,那是什么意思呢?
  • @ДанилаКононихин 我认为这里的“异步”意味着非阻塞。有一个后台侦听器线程为您监控事件。这并不意味着有多个监听线程同时处理事件。
猜你喜欢
  • 2017-02-12
  • 1970-01-01
  • 2016-08-19
  • 1970-01-01
  • 2011-09-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多