【发布时间】:2021-06-29 07:31:19
【问题描述】:
vertx 应用程序在 Docker 容器中运行,在两个 EC2 实例上并且是集群的。
使用hazelcast-aws 插件实现集群,应用程序是这样启动的:
docker run --name ... -p ... \
--network ... \
-v ... \
-d ... \
-c 'exec java \
-Dvertx.eventBus.options.setClustered=true \
-Dvertx.eventBus.options.setClusterPort=15701 \
-jar ... -conf ... \
-cluster'
没有以编程方式设置任何与集群相关的内容。
客户端在第一个请求时打开一个套接字,并将其用于以后的类似请求。
每个请求将:
- 通过向事件总线发布消息来向服务器发起异步请求
- 在事件总线上注册一个消费者,它将处理上述结果, 并传递了对套接字连接的引用,它应该将结果发送到
由于 vertx 在集群和有两个实例时默认执行轮询,这意味着任何实例都会收到所有其他消息(来自上面的 1.),并使仅连接到一个实例的客户端接收到所有消息的一半预期的反应。
我想这是因为,即使注册的消费者有对套接字对象的引用,它也不能使用它,因为它是在不同的节点/网络服务器上创建的。
这是正确的吗?有没有办法将 100% 的消息发送到客户端,只连接到一个节点,而不引入 RabbitMQ 之类的东西?
这是 SockJS 处理程序代码:
SockJSHandler sockJSHandler = SockJSHandler.create(vertx, new SockJSHandlerOptions());
sockJSHandler.socketHandler(socket -> {
SecurityService securityService = (SecurityService) ServiceFactory.getService(SecurityService.class);
if (securityService.socketHeadersSecurity(socket)) {
socket.handler(socketMessage -> {
try {
LOGGER.trace("socketMessage: " + socketMessage);
Socket socket = Json.decodeValue(socketMessage.toString(), Socket.class);
Report report = socket.getReport();
if (report != null) {
Account accountRequest = socket.getAccount();
Account accountDatabase = accountRequest == null ? null
: ((AccountService) ServiceFactory.getService(AccountService.class)).getById(accountRequest.getId());
Response result = securityService.socketReportSecurity(accountRequest, accountDatabase, report) ?
((ReportService) ServiceFactory.getService(ReportService.class)).createOrUpdateReport(report, accountDatabase)
: new Response(Response.unauthorized);
if (Response.success.equals(result.getResponse())) {
//register a consumer
String consumerName = "report.result." + Timestamp.from(ClockFactory.getClock().instant());
vertx.eventBus().consumer(consumerName, message -> {
Response executionResult;
if ("success".equals(message.body())) {
try {
Path csvFile = Paths.get(config.getString(Config.reportPath.getConfigName(), Config.reportPath.getDefaultValue())
+ "/" + ((Report) result.getPayload()).getId() + ".csv");
executionResult = new Response(new JsonObject().put("csv", new String(Files.readAllBytes(csvFile))));
} catch (IOException ioEx) {
executionResult = new Response(new Validator("Failed to read file.", ioEx.getMessage(), null, null));
LOGGER.error("Failed to read file.", ioEx);
}
} else {
executionResult = new Response(new Validator("Report execution failed", (String)message.body(), null, null));
}
//send second message to client
socket.write(Json.encode(executionResult));
vertx.eventBus().consumer(consumerName).unregister();
});
//order report execution
vertx.eventBus().send("report.request", new JsonObject()
.put("reportId", ((Report) result.getPayload()).getId())
.put("consumerName", consumerName));
}
//send first message to client
socket.write(Json.encode(result));
} else {
LOGGER.info("Insufficient data sent over socket: " + socketMessage.toString());
socket.end();
}
} catch (DecodeException dEx) {
LOGGER.error("Error decoding message.", dEx);
socket.end();
}
});
} else {
LOGGER.info("Illegal socket connection attempt from: " + socket.remoteAddress());
socket.end();
}
});
mainRouter.route("/websocket/*").handler(sockJSHandler);
有趣的是,当在 localhost 上运行两个集群节点时,客户端可以获得 100% 的结果。
编辑: 这不是 SockJS,而是配置问题。
【问题讨论】: