【问题标题】:SockJS connections in a clustered Vert.x environment集群 Vert.x 环境中的 SockJS 连接
【发布时间】:2021-06-29 07:31:19
【问题描述】:

vertx 应用程序在 Docker 容器中运行,在两个 EC2 实例上并且是集群的。

使用hazelcast-aws 插件实现集群,应用程序是这样启动的:

docker run --name ... -p ... \
--network ... \
-v ... \
-d ... \
-c 'exec java \
-Dvertx.eventBus.options.setClustered=true \
-Dvertx.eventBus.options.setClusterPort=15701 \
-jar ... -conf ... \
-cluster'

没有以编程方式设置任何与集群相关的内容。

客户端在第一个请求时打开一个套接字,并将其用于以后的类似请求。
每个请求将:

  1. 通过向事件总线发布消息来向服务器发起异步请求
  2. 在事件总线上注册一个消费者,它将处理上述结果, 并传递了对套接字连接的引用,它应该将结果发送到

由于 vertx 在集群和有两个实例时默认执行轮询,这意味着任何实例都会收到所有其他消息(来自上面的 1.),并使仅连接到一个实例的客户端接收到所有消息的一半预期的反应。

我想这是因为,即使注册的消费者有对套接字对象的引用,它也不能使用它,因为它是在不同的节点/网络服务器上创建的。

这是正确的吗?有没有办法将 100% 的消息发送到客户端,只连接到一个节点,而不引入 RabbitMQ 之类的东西?

这是 SockJS 处理程序代码:

SockJSHandler sockJSHandler = SockJSHandler.create(vertx, new SockJSHandlerOptions());
sockJSHandler.socketHandler(socket -> {
    SecurityService securityService = (SecurityService) ServiceFactory.getService(SecurityService.class);
    if (securityService.socketHeadersSecurity(socket)) {
        socket.handler(socketMessage -> {
            try {
                LOGGER.trace("socketMessage: " + socketMessage);
                Socket socket = Json.decodeValue(socketMessage.toString(), Socket.class);
                Report report = socket.getReport();
                if (report != null) {
                    Account accountRequest = socket.getAccount();
                    Account accountDatabase = accountRequest == null ? null
                            : ((AccountService) ServiceFactory.getService(AccountService.class)).getById(accountRequest.getId());
                    Response result = securityService.socketReportSecurity(accountRequest, accountDatabase, report) ?
                            ((ReportService) ServiceFactory.getService(ReportService.class)).createOrUpdateReport(report, accountDatabase)
                            : new Response(Response.unauthorized);
                    if (Response.success.equals(result.getResponse())) {
                        //register a consumer
                        String consumerName = "report.result." + Timestamp.from(ClockFactory.getClock().instant());
                        vertx.eventBus().consumer(consumerName, message -> {
                            Response executionResult;
                            if ("success".equals(message.body())) {
                                try {
                                    Path csvFile = Paths.get(config.getString(Config.reportPath.getConfigName(), Config.reportPath.getDefaultValue())
                                            + "/" + ((Report) result.getPayload()).getId() + ".csv");
                                    executionResult = new Response(new JsonObject().put("csv", new String(Files.readAllBytes(csvFile))));
                                } catch (IOException ioEx) {
                                    executionResult = new Response(new Validator("Failed to read file.", ioEx.getMessage(), null, null));
                                    LOGGER.error("Failed to read file.", ioEx);
                                }
                            } else {
                                executionResult = new Response(new Validator("Report execution failed", (String)message.body(), null, null));
                            }
                            //send second message to client
                            socket.write(Json.encode(executionResult));
                            vertx.eventBus().consumer(consumerName).unregister();
                        });
                        //order report execution
                        vertx.eventBus().send("report.request", new JsonObject()
                                .put("reportId", ((Report) result.getPayload()).getId())
                                .put("consumerName", consumerName));
                    }
                    //send first message to client
                    socket.write(Json.encode(result));
                } else {
                    LOGGER.info("Insufficient data sent over socket: " + socketMessage.toString());
                    socket.end();
                }
            } catch (DecodeException dEx) {
                LOGGER.error("Error decoding message.", dEx);
                socket.end();
            }
        });
    } else {
        LOGGER.info("Illegal socket connection attempt from: " + socket.remoteAddress());
        socket.end();
    }
});
mainRouter.route("/websocket/*").handler(sockJSHandler);

有趣的是,当在 localhost 上运行两个集群节点时,客户端可以获得 100% 的结果。

编辑: 这不是 SockJS,而是配置问题。

【问题讨论】:

    标签: vert.x sockjs


    【解决方案1】:

    因为 vertx 在集群时默认会循环,并且有 两个实例,这意味着任何实例都会收到所有其他消息(来自 1.,如上),并使仅连接到一个实例的客户端收到所有预期响应的一半。

    这个假设只是部分正确。 Vert.x 进行循环,是的,但这意味着每个实例将获得一半的连接,而不是一半的消息。

    一旦建立连接,它的所有消息都会到达一个实例。

    所以这个:

    这是正确的吗?有没有办法让 100% 的消息发送到 客户端,只连接一个节点,不引入任何东西 喜欢 RabbitMQ?

    已经发生了。

    【讨论】:

    • 感谢您的回答,阿列克谢。原来,我有一个配置问题,所以我的问题不是一个好的线索,这个问题与 SockJS 无关。我会删除它,但人们可能会发现您的评论很有用。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-05
    • 1970-01-01
    • 1970-01-01
    • 2010-11-14
    • 1970-01-01
    相关资源
    最近更新 更多