【问题标题】:Spring Boot RSocket send a message within a Message MappingSpring Boot RSocket 在消息映射中发送消息
【发布时间】:2020-12-02 06:50:52
【问题描述】:

盯着benwilcock/spring-rsocket-demo 的教程代码,我正在尝试编写一个服务器,在响应客户端之前将消息复制到第二个服务器。

为了尝试调试我的问题,我只是尝试在服务器之间进行一次微不足道的乒乓交换。只有当第二台服务器响应 pong 消息时,第一台服务器才会回复客户端:

@MessageMapping("request-response")
Mono<Message> requestResponse(final Message request) {
    // register a mono that will be completed when replication to another server has happened
    String uuid = UUID.randomUUID().toString();
    Mono<Message> deferred = Mono.create(sink -> replicationNexus.registerRequest(uuid, sink));

    // FIXME attempt to send a nested request-response message that will complete the outer message later
    this.requesterMono.flatMap(requester -> requester.route("pong")
            .data(uuid)
            .retrieveMono(String.class))
            .subscribeOn(Schedulers.elastic())
            .subscribe( uuid2 -> replicationNexus.complete(uuid2, new Message(SERVER, RESPONSE)));

    // return the deferred work that will be completed by the pong response
    return deferred;
}

该逻辑正在尝试使用此 answer 创建与将重新连接的第二台服务器的连接:

    this.requesterMono = builder.rsocketConnector(connector -> connector
            .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
            .connectTcp("localhost", otherPort).cache();

为了完成这里的图片是琐碎的乒乓逻辑:

@MessageMapping("pong")
public Mono<String> pong(String m) {
    return Mono.just(m);
}

这里是保持外部客户端响应状态的逻辑,当其他服务器响应时完成:

public class ReplicationNexus<T> {
final Map<String, MonoSink<T>> requests = new ConcurrentHashMap<>();

public void registerRequest(String v, MonoSink<T> sink) {
    requests.put(v, sink);
}

public boolean complete(String uuid, T message) {
    Optional<MonoSink<T>> sink = Optional.of(requests.get(uuid));
    if( sink.isPresent() ){
        sink.get().success(message);
    }
    return sink.isPresent();
}
}

调试第二台服务器它从不运行 pong 方法。看来第一台服务器实际上并没有发送内部请求消息。

运行内部请求-响应交换以通过自动重新连接逻辑完成外部消息交换的正确方法是什么?

【问题讨论】:

标签: spring-boot rsocket


【解决方案1】:

不确定我是否遗漏了您问题的一些复杂性,但如果中间服务器只是像代理一样激活,我将从最简单的链接调用情况开始。我觉得我错过了这个问题的一些细微差别,所以让我们接下来解决这个问题。

  @MessageMapping("runCommand")
  suspend fun runCommandX(
    request: CommandRequest,
  ): Mono<String> {
    val uuid = UUID.randomUUID().toString()

    return requesterMono
      .flatMap { requester: RSocketRequester ->
        requester.route("pong")
          .data("TEST")
          .retrieveMono(String::class.java)
      }
      .doOnSubscribe {
        // register request with uuid
      }
      .doOnSuccess {
        // register completion
      }
      .doOnError {
        // register failure
      }
  }

通常,如果您可以避免在典型的 spring/reactive/rsocket 代码中调用 subscribe 自己。您希望框架为您执行此操作。

【讨论】:

  • 谢谢 yuri 我会尝试下一个。请问“!!”是什么意思像我不熟悉 kotlin 一样吗?
  • 强制进行非空检查 - 抛出 NPE 或继续。否则你需要 a?.b?.c?.d
  • 抱歉延迟回来。如果我在这里尝试你的方法github.com/simbo1905/spring-rsocket-demo/blob/… 如果在 flatMap 和 doOnSubscribe 中设置断点,那么它们不会被调用。似乎通过单声道平面图是懒惰的并且没有被调用?
  • 啊,是的,我没有让它急于求成。老实说,我几乎总是认为在订阅之前有副作用的 Mono 可能是一个错误。它打破了 Rx 语义恕我直言。
  • 好的,我有一个非常糟糕的解决方案,涉及卸载到应用程序线程来完成复制工作。我会将其作为路标发布给其他人,然后按照您的建议在一些专门的论坛上进行跟进。谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-04-05
  • 1970-01-01
  • 2018-09-05
  • 2020-11-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多