【发布时间】:2020-12-02 06:50:52
【问题描述】:
盯着benwilcock/spring-rsocket-demo 的教程代码,我正在尝试编写一个服务器,在响应客户端之前将消息复制到第二个服务器。
为了尝试调试我的问题,我只是尝试在服务器之间进行一次微不足道的乒乓交换。只有当第二台服务器响应 pong 消息时,第一台服务器才会回复客户端:
@MessageMapping("request-response")
Mono<Message> requestResponse(final Message request) {
// register a mono that will be completed when replication to another server has happened
String uuid = UUID.randomUUID().toString();
Mono<Message> deferred = Mono.create(sink -> replicationNexus.registerRequest(uuid, sink));
// FIXME attempt to send a nested request-response message that will complete the outer message later
this.requesterMono.flatMap(requester -> requester.route("pong")
.data(uuid)
.retrieveMono(String.class))
.subscribeOn(Schedulers.elastic())
.subscribe( uuid2 -> replicationNexus.complete(uuid2, new Message(SERVER, RESPONSE)));
// return the deferred work that will be completed by the pong response
return deferred;
}
该逻辑正在尝试使用此 answer 创建与将重新连接的第二台服务器的连接:
this.requesterMono = builder.rsocketConnector(connector -> connector
.reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
.connectTcp("localhost", otherPort).cache();
为了完成这里的图片是琐碎的乒乓逻辑:
@MessageMapping("pong")
public Mono<String> pong(String m) {
return Mono.just(m);
}
这里是保持外部客户端响应状态的逻辑,当其他服务器响应时完成:
public class ReplicationNexus<T> {
final Map<String, MonoSink<T>> requests = new ConcurrentHashMap<>();
public void registerRequest(String v, MonoSink<T> sink) {
requests.put(v, sink);
}
public boolean complete(String uuid, T message) {
Optional<MonoSink<T>> sink = Optional.of(requests.get(uuid));
if( sink.isPresent() ){
sink.get().success(message);
}
return sink.isPresent();
}
}
调试第二台服务器它从不运行 pong 方法。看来第一台服务器实际上并没有发送内部请求消息。
运行内部请求-响应交换以通过自动重新连接逻辑完成外部消息交换的正确方法是什么?
【问题讨论】:
-
您的问题可能足够复杂,直接在community.reactive.foundation 中提问可能会更快得到答案。
标签: spring-boot rsocket