【发布时间】:2020-09-09 11:40:26
【问题描述】:
在实现 GraphQL 订阅的过程中,我想出了一个穷人的 pub/sub 解决方案,其自定义主题如下所示:
// The map is a custom class, just to handle multiple clients subscribed to the same data
// and we just create one FluxSink per client which is internally stored in a Set
private static final ConcurrentMultiMap<Long, FluxSink<String>> subscribers =
new ConcurrentMultiMap<>();
public Publisher<String> subscribeToMessages(final Long id) {
return Flux.create(
newSubscriber ->
subscribers.add(
id,
newSubscriber.onDispose(() -> subscribers.remove(id, newSubscriber))),
OverflowStrategy.LATEST);
}
public void publish(final Long id, final String message) {
Optional.of(id)
.map(subscribers::get)
.ifPresent(
subscribers ->
subscribers.forEach(
subscriber -> subscriber.next(message)));
}
继续前进,当我偶然发现返回 Flux 的 RTopicReactive::getMessages 方法时,我想用一个使用 RedissonReactiveClient 的 Redis 支持的解决方案替换这个实现。基本上之前的代码现在看起来如下:
public Publisher<String> subscribeToMessages(final Long id) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
return topic.getMessages(String.class);
}
public void publish(final Long id, final String message) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
topic.publish(message);
}
不幸的是,这并没有按预期工作。据我所知,更新已正确发布(因为我连接到 redis 实例并 SUBSCRIBEd 连接到它一次以检查数据),甚至在让我的 GraphQL 客户端订阅服务器之后 Mono 返回由 @ 987654328@ 表示有一个订阅者,我的 GraphQL 客户端。但是,它没有更新其数据,并且浏览器中的网络选项卡没有显示来自服务器的已发布数据的 websocket 消息。因此我假设subscribeToMessages方法中通过Flux暴露的数据格式不正确。
在尝试使用主题的getMessages 方法时有什么需要注意的吗?
【问题讨论】:
标签: java redis publish-subscribe reactive redisson