【问题标题】:Send message only to certain client using websockets with Rsocket and Spring Webflux使用带有 Rsocket 和 Spring Webflux 的 websocket 仅向特定客户端发送消息
【发布时间】:2020-07-17 13:57:24
【问题描述】:

我正在尝试在我的一个 POC 项目中将 Rsocket 与 websocket 一起使用。在我的情况下,不需要用户登录。当我收到来自其他服务的消息时,我只想向某些客户端发送消息。基本上,我的流程是这样的。

                                  Service A                               Service B   
|--------|    websocket     |------------------|   Queue based comm   |---------------| 
|  Web   |----------------->| Rsocket server   |--------------------->| Another       | 
|        |<-----------------| using Websocket  |<---------------------| service       |
|--------|    websocket     |------------------|   Queue based comm   |---------------|

就我而言,我正在考虑为每个连接和每个请求使用一个唯一的 ID。将两个标识符合并为相关 id 并将消息发送到 Service B,当我从 Service B 收到消息时,确定它需要去哪个客户端并发送它。现在我知道我可能不需要 2 项服务来执行此操作,但我这样做是出于其他一些原因。虽然我对如何实现其他部分有一个粗略的想法。我是 Rsocket 概念的新手。是否可以使用 Spring Boot Webflux、Rsocket 和 websocket 通过某个 id 向唯一的某个客户端发送消息?

【问题讨论】:

    标签: websocket reactive-programming spring-webflux rsocket


    【解决方案1】:

    基本上,我认为您有两个选择。第一个是过滤来自Service B 的通量,第二个是使用@NikolaB 描述的RSocketRequesterMap

    第一个选项:

    data class News(val category: String, val news: String)
    data class PrivateNews(val destination: String, val news: News)
    
    class NewsProvider {
    
        private val duration: Long = 250
    
        private val externalNewsProcessor = DirectProcessor.create<News>().serialize()
        private val sink = externalNewsProcessor.sink()
    
        fun allNews(): Flux<News> {
            return Flux
                    .merge(
                            carNews(), bikeNews(), cosmeticsNews(),
                            externalNewsProcessor)
                    .delayElements(Duration.ofMillis(duration))
        }
    
        fun externalNews(): Flux<News> {
            return externalNewsProcessor;
        }
    
        fun addExternalNews(news: News) {
            sink.next(news);
        }
    
        fun carNews(): Flux<News> {
            return Flux
                    .just("new lambo!!", "amazing ferrari!", "great porsche", "very cool audi RS4 Avant", "Tesla i smarter than you")
                    .map { News("CAR", it) }
                    .delayElements(Duration.ofMillis(duration))
                    .log()
        }
    
        fun bikeNews(): Flux<News> {
            return Flux
                    .just("specialized enduro still the biggest dream", "giant anthem fast as hell", "gravel long distance test")
                    .map { News("BIKE", it) }
                    .delayElements(Duration.ofMillis(duration))
                    .log()
        }
    
        fun cosmeticsNews(): Flux<News> {
            return Flux
                    .just("nivea - no one wants to hear about that", "rexona anti-odor test")
                    .map { News("COSMETICS", it) }
                    .delayElements(Duration.ofMillis(duration))
                    .log()
        }
    
    }
    
    @RestController
    @RequestMapping("/sse")
    @CrossOrigin("*")
    class NewsRestController() {
        private val log = LoggerFactory.getLogger(NewsRestController::class.java)
    
        val newsProvider = NewsProvider()
    
        @GetMapping(value = ["/news/{category}"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
        fun allNewsByCategory(@PathVariable category: String): Flux<News> {
            log.info("hello, getting all news by category: {}!", category)
            return newsProvider
                    .allNews()
                    .filter { it.category == category }
        }
    }
    
    

    NewsProvider 类是您的Service B 的模拟,它应该返回Flux&lt;&gt;。每当您调用addExternalNews 时,它都会推送allNews 方法返回的News。在NewsRestController 类中,我们按类别过滤新闻。在localhost:8080/sse/news/CAR 上打开浏览器,仅查看汽车新闻。

    如果你想改用 RSocket,你可以使用这样的方法:

        @MessageMapping("news.{category}")
        fun allNewsByCategory(@DestinationVariable category: String): Flux<News> {
            log.info("RSocket, getting all news by category: {}!", category)
            return newsProvider
                    .allNews()
                    .filter { it.category == category }
        }
    

    第二个选项:

    让我们将RSocketRequester@ConnectMapping 一起存储在HashMap(我使用vavr.io)中。

    @Controller
    class RSocketConnectionController {
    
        private val log = LoggerFactory.getLogger(RSocketConnectionController::class.java)
    
        private var requesterMap: Map<String, RSocketRequester> = HashMap.empty()
    
        @Synchronized
        private fun getRequesterMap(): Map<String, RSocketRequester> {
            return requesterMap
        }
    
        @Synchronized
        private fun addRequester(rSocketRequester: RSocketRequester, clientId: String) {
            log.info("adding requester {}", clientId)
            requesterMap = requesterMap.put(clientId, rSocketRequester)
        }
    
        @Synchronized
        private fun removeRequester(clientId: String) {
            log.info("removing requester {}", clientId)
            requesterMap = requesterMap.remove(clientId)
        }
    
        @ConnectMapping("client-id")
        fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
            val clientIdFixed = clientId.replace("\"", "")  //check serialezer why the add " to strings
    //        rSocketRequester.rsocket().dispose()   //to reject connection
            rSocketRequester
                    .rsocket()
                    .onClose()
                    .subscribe(null, null, {
                        log.info("{} just disconnected", clientIdFixed)
                        removeRequester(clientIdFixed)
                    })
            addRequester(rSocketRequester, clientIdFixed)
        }
    
        @MessageMapping("private.news")
        fun privateNews(news: PrivateNews, rSocketRequesterParam: RSocketRequester) {
            getRequesterMap()
                    .filterKeys { key -> checkDestination(news, key) }
                    .values()
                    .forEach { requester -> sendMessage(requester, news) }
        }
    
        private fun sendMessage(requester: RSocketRequester, news: PrivateNews) {
            requester
                    .route("news.${news.news.category}")
                    .data(news.news)
                    .send()
                    .subscribe()
        }
    
        private fun checkDestination(news: PrivateNews, key: String): Boolean {
            val list = destinations(news)
            return list.contains(key)
        }
    
        private fun destinations(news: PrivateNews): List<String> {
            return news.destination
                    .split(",")
                    .map { it.trim() }
        }
    }
    

    请注意,我们必须在 rsocket-js 客户端中添加两件事:在 SETUP 帧中的有效负载,用于提供客户端 ID 并注册响应程序,以处理 RSocketRequester 发送的消息。

    const client = new RSocketClient({
    // send/receive JSON objects instead of strings/buffers
    serializers: {
      data: JsonSerializer,
      metadata: IdentitySerializer
    },
    setup: {
      //for connection mapping on server
      payload: {
        data: "provide-unique-client-id-here",
        metadata: String.fromCharCode("client-id".length) + "client-id"
      },
      // ms btw sending keepalive to server
      keepAlive: 60000,
    
      // ms timeout if no keepalive response
      lifetime: 180000,
    
      // format of `data`
      dataMimeType: "application/json",
    
      // format of `metadata`
      metadataMimeType: "message/x.rsocket.routing.v0"
    },
    responder: responder,
    transport
    });
    

    有关更多信息,请参阅此问题:How to handle message sent from server to client with RSocket?

    【讨论】:

    【解决方案2】:

    我还没有亲自将 RSocket 与 WebSocket 传输一起使用,但正如 RSocket 规范中所述,底层传输协议甚至不应该是重要的。

    一个 RSocket 组件同时是服务器和客户端。因此,当浏览器连接到您的 RSocket“服务器”时,您可以注入 RSocketRequester 实例,然后您可以使用该实例向“客户端”发送消息。

    然后您可以将这些实例添加到您的本地缓存中(例如,使用您选择的键将它们放入一些全局可用的ConcurrentHashMap - 您可以从中知道/能够计算哪些客户端应该将来自服务的消息B 被传播)。

    然后在您从服务 B 接收消息的代码中,只需从本地缓存中获取符合您的条件的所有 RSocketRequester 实例并将消息发送给它们。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-26
      • 2020-05-06
      • 2019-06-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多