【问题标题】:WebSocket Slow - Java and JavaScriptWebSocket 慢 - Java 和 JavaScript
【发布时间】:2016-01-09 23:51:49
【问题描述】:

我正在处理我的世界插件的编码。 但是现在我有以下问题,我的websocket服务器响应非常非常慢。

这是我的 WebSocketClass(用于插件)

// 套接字服务器类

package me.mickerd.pcoc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;

import org.bukkit.Bukkit;
import org.java_websocket.WebSocket;
import org.java_websocket.WebSocketImpl;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;

public class WebsocketServer extends WebSocketServer {
public static WebsocketServer s;

public WebsocketServer(int port) throws UnknownHostException {
    super(new InetSocketAddress(port));
}

public WebsocketServer(InetSocketAddress address) {
    super(address);
}

@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
    WebsocketSessionManager.getSessionManager().openSession(conn.getRemoteSocketAddress().getAddress().getHostAddress());
    Bukkit.getLogger().info(conn.getRemoteSocketAddress().getAddress().getHostName() + " has connected to the Websocket server!");
}

@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
    WebsocketSessionManager.getSessionManager().endSession(conn.getRemoteSocketAddress().getAddress().getHostAddress());
    Bukkit.getLogger().info(conn + " has disconnected form the Websocket server");
}

@Override
public void onMessage(WebSocket conn, String message) {
    Bukkit.getLogger().info("Recieve Websocket packet - " + conn + ":" + message);
    if (message.split(":")[0].equalsIgnoreCase("name")) {
        WebsocketSessionManager.getSessionManager().addSessionUsername(conn.getRemoteSocketAddress().getAddress().getHostAddress(), message.split(":")[1]);
    }
}

public static void runServer() throws InterruptedException, IOException {
    WebSocketImpl.DEBUG = true;
    int port = 8887;
    s = new WebsocketServer(port);
    s.start();
    Bukkit.getLogger().info("Websocket server started on port: " + s.getPort());
}

@Override
public void onError(WebSocket conn, Exception ex) {
    ex.printStackTrace();
    if (conn != null) {
        // some errors like port binding failed may not be assignable to a specific websocket
    }
}

public void sendToAll(String data) {
    Collection<WebSocket> con = connections();
    synchronized (con) {
        for (WebSocket c : con) {
            c.send(data);
        }
    }
}

public void sendData(WebsocketSession session, String data) {
    Collection<WebSocket> con = connections();
    synchronized (con) {
        for (WebSocket c : con) {
            if (c.getRemoteSocketAddress().getAddress().getHostAddress().equalsIgnoreCase(session.getHost())) {
                Bukkit.getLogger().info("Send data packet: " + data);
                c.send(data);
            }
        }
    }
}
}

这是我的 Javascript 接收器:

var sound = null;
var name = window.location
document.session.name.value = name

var text = document.session.name.value

var ws = new WebSocket("ws://" + window.location.hostname + ":8887/");

ws.onopen = function () {
        ws.send("name:" + delineate(text));
document.getElementById("title").innerHTML = "Welcome on the music server. Please hold this window open!";

};

ws.onmessage = function (evt) {
function loadScript(url, callback)
{
    // Adding the script tag to the head as suggested before
    var head = document.getElementsByTagName('head')[0];
    var script = document.createElement('script');
    script.type = 'text/javascript';
    script.src = url;

    // Then bind the event to the callback function.
    // There are several events for cross browser compatibility.
    script.onreadystatechange = callback;
    script.onload = callback;

    // Fire the loading
    head.appendChild(script);
}
if(evt.data == "stop"){
  sound.fadeOut(0, 3700);
} else {
sound = new Howl({
  urls: ['music/' + evt.data + '.ogg']
}).play();
console.log("playing music");
};
}

ws.onclose = function () {
    alert("Closed!");
};

ws.onerror = function (err) {
    alert("Error: " + err);
};

function delineate(str) {
    theleft = str.indexOf("=") + 1;
    theright = str.lastIndexOf("&");
    return (str.substring(theleft, theright));
}

反应很慢,但服务器上的其他东西快得令人难以置信!

有人可以帮忙吗?

【问题讨论】:

  • 你的网络连接速度是多少?
  • 用 Wireshark 看看哪部分通信速度慢并反馈。

标签: javascript java bukkit


【解决方案1】:

您正在使用的 websocket 库发送数据阻塞,这意味着对 c.send( 的调用将阻塞,直到帧被发送。

有不同的方法可以解决这个问题,例如:

为每条消息使用单独的异步线程:

public void sendToAll(String data) {
    // Ferrybig - added bukkit async task
    Bukkit.getSchedular().runTaskAsynchronously(plugin, new Runnable(){
    @Override public void run(){
        Collection<WebSocket> con = connections();
        synchronized (con) {
            for (WebSocket c : con) {
                c.send(data);
            }
        }
    // Ferrybig - added bukkit async task
    }});
}

虽然这可以快速解决您的问题,但这并不是一个干净的解决方案,因为大量发送消息意味着为了发送消息而创建了大量线程,即不要经常发送,或者查看下一个解决方案:

使用专用线程发送消息:

使用专用线程发送消息是更好的解决方案,但它的代码量很大。

对于这个解决方案,我们需要做以下事情:

  • 使用变量存储需要发送给每个客户端的消息

    private final BlockingQueue<Pair<WebSocket,String>> messageQueue
                  = new LinkedBlockingDeque<>();
    

    我们使用一个阻塞队列来保存包含 Web 套接字和要发送的消息的 Pair 对象。虽然我们也可以使用 Map 中的 Map.Entry 类,但我选择使用 Pair,因为我们稍后可以稍微更改代码以使其根据优先级自动处理消息。我用于此答案的 Pair 类可以在 What is the equivalent of the C++ Pair in Java? 找到。

  • 使用专用线程发送消息

    我们现在有传入消息的列表,我们现在处理传入的消息。这可以通过在messageQueue.take() 上阻止任务来完成。以下是对此的快速实现:

    public class MessageProcessor extends BukkitRunnable {
    
        private BlockingQueue<Pair<WebSocket,String>> messageQueue;
    
        public MessageProcessor (BlockingQueue<Pair<WebSocket,String>> messageQueue) {
            this.messageQueue = messageQueue;
        }
    
        @Override 
        public void run() {
            try {
                Pair<WebSocket,String> next;
                while(true) {
                    next = messageQueue.take();
                    if(next.getFirst() == null) {
                        // Special condition, will be explained later
                        return; // Quit run method
                    }
                    // System.out.println("Send message to " + next.getFirst()); // DEBUG
                    next.getFirst().send(next.getSecond());
                }
            } catch(InterruptedException e) {
                Thread.currentThread().interrupt();
                // Someone wanted to quit our thread, so we are quiting
            } finally {
                messageQueue.clear();
            }
        }
    }  
    

    上面的类有两个特殊条件next.getFirst() == nullcatch(InterruptedException e),当我们禁用插件退出任务时会用到。

    • 启动 bukkit 时开始我们的专用任务

    我们需要在 bukkit 和我们的 Websocket 服务器启动时启动我们的任务,这样它才能开始处理消息和发送数据。这很容易在我们的onEnable() 中使用以下代码实现:

    new MessageProcessor (messageQueue).runTaskAsynchronously(this);

    • 停止专用任务

    我们需要确保在禁用插件时停止专用任务,以防止 bukkit 发送错误“This plugin is not properly shutting down its async tasks when it is being reloaded.”。这很容易做到,因为我们在上面为此做了一个特殊的条件。

    为此,我们将以下代码放入我们的onDisable()

    messageQueue.add(new Pair&lt;&gt;(null,null));

    • 重写我们的方法以使用 messageQueue

    我们在这个过程中的最后一步是重写sendToAll 方法以使用我们的队列。这真的很容易做到,只需要我们替换 1 行。

    public void sendToAll(String data) {
        Collection<WebSocket> con = connections();
        synchronized (con) {
            for (WebSocket c : con) {
                messageQueue.add(new Pair<>(c,data)); // Ferrybig: Use messageQueue
            }
        }
    }
    

    同样的小修改也可以对sendData 方法进行,但我没有做为读者的练习。

旁注

BlockingQueue 在设计时考虑了并发操作,不需要外部同步。

您可以选择使用BlockingQueue.offer() 而不是BlockingQueue.add(),因为后者在列表已满时抛出异常,但第一个返回 false。

LinkedBlockingDeque 的默认大小是Integer.MAX_VALUE,可以通过constructor 更改。

【讨论】:

    猜你喜欢
    • 2015-06-08
    • 2014-03-17
    • 2014-03-13
    • 1970-01-01
    • 2021-10-17
    • 1970-01-01
    • 1970-01-01
    • 2010-12-31
    • 1970-01-01
    相关资源
    最近更新 更多