【问题标题】:NIO sockets - distributed systemNIO sockets - 分布式系统
【发布时间】:2020-08-05 18:45:52
【问题描述】:

我有三个进程,分别称为 LWA1、LWA2 和 LWA3。每个都有一个服务器,LWA1 的端口为 55555,LWA2 的端口为 55556,LWA3 的端口为 55557。 此外,每个进程都有一个客户端,以便连接到其他进程。

每个进程都应该能够对其他进程进行写入和读取。所以:

  • LWA1 应该向/从 LWA2 和 LWA3 写入和读取
  • LWA2 应该对 LWA1 和 LWA3 进行读写操作
  • LWA3 应该对 LWA1 和 LWA2 进行读写操作

目前,每个进程执行两次写入,但只收到一条消息。每个进程的输出如下(选项卡式打印属于客户端,非选项卡式打印属于服务器)。

LWA1:

Setting up server with port: 55555
Server configured.

    Opening sockets to port 55556 and port 55557
    Sending lamport request: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA3', id=3}
Key accepted

LWA2:

Setting up server with port: 55556
Server configured.

    Opening sockets to port 55557 and port 55555
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
    Sending lamport request: LamportRequest{clock=0, process='LWA2', id=2}

LWA3:

Setting up server with port: 55557
Server configured.

    Opening sockets to port 55555 and port 55556
Key accepted
Key accepted
    Sending lamport request: LamportRequest{clock=0, process='LWA3', id=3}
Reading data from server
I read: LamportRequest{clock=0, process='LWA2', id=2}

如您所见,每个客户端都向另外两个写入 LamportRequest,但另外两个只收到一条消息。为什么另一条消息没有通过?

我怀疑这可能与服务器中的密钥有关,但不知道可能是什么。另外,我并不完全理解它们。如果我错了,请纠正我:

到 Selector 的每个连接都用不同的 (SelectableChannel) 键表示,因此服务器 LWA1 中的迭代器(例如)应该只有(因此,只监听事件)两个键,一个用于 LWA2,另一个用于对于 LWA3,对吗?我尝试在 keyAccept 方法中为每个键附加整数来区分它们,效果很好,但是在 keyRead 方法中打印附加的整数时,它显示为 null。那个方法的关键是新的吗?第三把钥匙突然出现了?

额外问题:我应该在一个线程中实现这个结构。目前我使用两种,一种用于服务器,一种用于客户端。一旦它开始工作,关于如何统一它们的任何提示?

----------------- 代码 -----------------

服务器(为阅读目的而简化)如下:

public TalkToBrotherSocket(int clock, int port) {
    this.port = port;
    this.clock = clock;

    try {
        setServer();
        System.out.println("Server configured.\n");
    } catch (IOException e) {
        e.printStackTrace();
    }
}

@Override
public void run() {
    while (true) {
        try {
            // Wait for an event one of the registered channels
            selector.select();

            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = (SelectionKey) selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check if they key is ready to accept a new socket connection
                if (key.isAcceptable()) {
                    keyAccept(key);
                    System.out.println("Key accepted");
                } else if (key.isReadable()){
                    System.out.println("Reading data from server");
                    keyRead(key);
                } else if (key.isWritable()){
                    System.out.println("Writting data from server");
                    keyWrite(key); //unused at the moment
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


private void keyRead(SelectionKey key) throws IOException {
    // Create a SocketChannel to read the request
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    buffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(buffer);
    } catch (IOException e) {
        System.out.println("Closing socket");
        // The remote forcibly closed the connection, cancel
        // the selection key and close the channel.
        key.cancel();
        socketChannel.close();
        return;
    }

    if (numRead == -1) {
        System.out.println("Shutting down socket");
        // Remote entity shut the socket down cleanly. Do the
        // same from our end and cancel the channel.
        key.channel().close();
        key.cancel();
        return;
    }

    System.out.println("I read: " + new String(buffer.array()).trim());
}

private void keyAccept(SelectionKey key) throws IOException {
    // For an accept to be pending the channel must be a server socket channel.
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    // Accept the connection and make it non-blocking
    SocketChannel socketChannel = serverSocketChannel.accept();
    //Socket socket = socketChannel.socket();
    socketChannel.configureBlocking(false);

    // Register the new SocketChannel with our Selector, indicating
    // we'd like to be notified when there's data waiting to be read
    socketChannel.register(selector, SelectionKey.OP_READ);
}

private void setServer() throws IOException {
    // Create a new selector
    selector = Selector.open();

    // Create a new non-blocking server socket channel
    serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    // Bind the server socket to the specified address and port
    serverChannel.bind(new InetSocketAddress("localhost", port));

    // Register the server socket channel, indicating an interest in
    // accepting new connections
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}

客户端(为阅读目的而简化)如下:

public NIOClient(int clock, int firstPort, int secondPort, int id, String process) {
    this.process = process;
    this.clock = clock;
    this.id = id;

    try {
        System.out.println("\tOpening sockets to port " + firstPort + " and port " + secondPort);
        firstClient = SocketChannel.open(new InetSocketAddress("localhost", firstPort));
        secondClient = SocketChannel.open(new InetSocketAddress("localhost", secondPort));
        firstBuffer = ByteBuffer.allocate(1024);
        secondBuffer = ByteBuffer.allocate(1024);
        sendRequests();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

private void sendRequests() {
    LamportRequest lamportRequest = new LamportRequest(clock, process, id);
    firstBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
    secondBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
    String converted = new String(firstBuffer.array(), StandardCharsets.UTF_8);
    System.out.println("\tSending lamport request: " + converted);
    try {
        firstClient.write(firstBuffer);
        secondClient.write(secondBuffer);
        firstBuffer.clear();
}

初始化如下:

System.out.println("Setting up server with port: " + myPort);
TalkToBrotherSocket talkToBrotherSocket = new TalkToBrotherSocket(clock, myPort);
talkToBrotherSocket.start();

new NIOClient(clock, firstPort, secondPort, id, process);

【问题讨论】:

  • firstClient.write(buffer) 如果成功则清空缓冲区。您需要在写入之前保存缓冲区位置和限制,并在第二次写入时恢复它们。或者只是从消息中创建两个缓冲区。
  • @user207421 感谢您的评论。我在第一次写入后打印了缓冲区,它仍然有消息,但为了确保,我制作了第二个缓冲区并在第二次写入时使用它。没有解决问题,但为了安全起见,我会保留您建议的更改。无论如何,感谢您的贡献。
  • 打印的内容并不重要。重要的是发送了什么,而您的原始代码第二次什么也没发送。没有两种方法。提供您当前的代码,让我们看看。
  • @user207421 使用更改及其新输出编辑了帖子。仅对 sendRequests 方法进行了更改。
  • 叹息。读我写的。 '从消息中创建两个缓冲区'。分配引用根本不构成创建两个缓冲区,更不用说“从消息中”了。您需要再次调用wrap() 方法并将结果分配给第二个缓冲区变量。

标签: java sockets client-server nio


【解决方案1】:

在@user207421 的 cmets 之后,我添加了一个新的第二个缓冲区。对 sendRequests 方法的更改已编辑到原始帖子中。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-04-02
    • 2011-11-05
    • 1970-01-01
    • 2023-01-05
    • 2011-02-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多