【问题标题】:Published messages in a reactive Redis topic aren't sent to the client反应式 Redis 主题中发布的消息不会发送到客户端
【发布时间】:2020-09-09 11:40:26
【问题描述】:

在实现 GraphQL 订阅的过程中,我想出了一个穷人的 pub/sub 解决方案,其自定义主题如下所示:

// The map is a custom class, just to handle multiple clients subscribed to the same data
// and we just create one FluxSink per client which is internally stored in a Set
private static final ConcurrentMultiMap<Long, FluxSink<String>> subscribers =
      new ConcurrentMultiMap<>();

public Publisher<String> subscribeToMessages(final Long id) {
  return Flux.create(
      newSubscriber ->
          subscribers.add(
              id,
              newSubscriber.onDispose(() -> subscribers.remove(id, newSubscriber))),
      OverflowStrategy.LATEST);
}

public void publish(final Long id, final String message) {
  Optional.of(id)
      .map(subscribers::get)
      .ifPresent(
          subscribers ->
              subscribers.forEach(
                  subscriber -> subscriber.next(message)));
}

继续前进,当我偶然发现返回 FluxRTopicReactive::getMessages 方法时,我想用一个使用 RedissonReactiveClient 的 Redis 支持的解决方案替换这个实现。基本上之前的代码现在看起来如下:

public Publisher<String> subscribeToMessages(final Long id) {
  val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
  return topic.getMessages(String.class);
}

public void publish(final Long id, final String message) {
  val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
  topic.publish(message);
}

不幸的是,这并没有按预期工作。据我所知,更新已正确发布(因为我连接到 redis 实例并 SUBSCRIBEd 连接到它一次以检查数据),甚至在让我的 GraphQL 客户端订阅服务器之后 Mono 返回由 @ 987654328@ 表示有一个订阅者,我的 GraphQL 客户端。但是,它没有更新其数据,并且浏览器中的网络选项卡没有显示来自服务器的已发布数据的 websocket 消息。因此我假设subscribeToMessages方法中通过Flux暴露的数据格式不正确。

在尝试使用主题的getMessages 方法时有什么需要注意的吗?

【问题讨论】:

    标签: java redis publish-subscribe reactive redisson


    【解决方案1】:

    您忘记在两个发布者上调用订阅。应该是这样的:

    public void publish(final Long id, final String message) {  
       val topic = redissonReactiveClient.getTopic("messages/" + id.toString());  
       topic.publish(message).doOnSuccess(res -> {  
    
           // ...   
    
       }).subscribe();
    }
    
    public void subscribeToMessages(final Long id) {
       val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
       return topic.getMessages(String.class).doOnNext(res -> {  
    
           // ...   
    
       }).subscribe();
    }
    

    【讨论】:

      猜你喜欢
      • 2016-06-26
      • 2013-04-24
      • 2020-12-15
      • 1970-01-01
      • 2013-05-19
      • 2014-07-16
      • 1970-01-01
      • 2019-12-31
      • 1970-01-01
      相关资源
      最近更新 更多