【发布时间】:2020-08-05 18:45:52
【问题描述】:
我有三个进程,分别称为 LWA1、LWA2 和 LWA3。每个都有一个服务器,LWA1 的端口为 55555,LWA2 的端口为 55556,LWA3 的端口为 55557。 此外,每个进程都有一个客户端,以便连接到其他进程。
每个进程都应该能够对其他进程进行写入和读取。所以:
- LWA1 应该向/从 LWA2 和 LWA3 写入和读取
- LWA2 应该对 LWA1 和 LWA3 进行读写操作
- LWA3 应该对 LWA1 和 LWA2 进行读写操作
目前,每个进程执行两次写入,但只收到一条消息。每个进程的输出如下(选项卡式打印属于客户端,非选项卡式打印属于服务器)。
LWA1:
Setting up server with port: 55555
Server configured.
Opening sockets to port 55556 and port 55557
Sending lamport request: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA3', id=3}
Key accepted
LWA2:
Setting up server with port: 55556
Server configured.
Opening sockets to port 55557 and port 55555
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Sending lamport request: LamportRequest{clock=0, process='LWA2', id=2}
LWA3:
Setting up server with port: 55557
Server configured.
Opening sockets to port 55555 and port 55556
Key accepted
Key accepted
Sending lamport request: LamportRequest{clock=0, process='LWA3', id=3}
Reading data from server
I read: LamportRequest{clock=0, process='LWA2', id=2}
如您所见,每个客户端都向另外两个写入 LamportRequest,但另外两个只收到一条消息。为什么另一条消息没有通过?
我怀疑这可能与服务器中的密钥有关,但不知道可能是什么。另外,我并不完全理解它们。如果我错了,请纠正我:
到 Selector 的每个连接都用不同的 (SelectableChannel) 键表示,因此服务器 LWA1 中的迭代器(例如)应该只有(因此,只监听事件)两个键,一个用于 LWA2,另一个用于对于 LWA3,对吗?我尝试在 keyAccept 方法中为每个键附加整数来区分它们,效果很好,但是在 keyRead 方法中打印附加的整数时,它显示为 null。那个方法的关键是新的吗?第三把钥匙突然出现了?
额外问题:我应该在一个线程中实现这个结构。目前我使用两种,一种用于服务器,一种用于客户端。一旦它开始工作,关于如何统一它们的任何提示?
----------------- 代码 -----------------
服务器(为阅读目的而简化)如下:
public TalkToBrotherSocket(int clock, int port) {
this.port = port;
this.clock = clock;
try {
setServer();
System.out.println("Server configured.\n");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
// Wait for an event one of the registered channels
selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check if they key is ready to accept a new socket connection
if (key.isAcceptable()) {
keyAccept(key);
System.out.println("Key accepted");
} else if (key.isReadable()){
System.out.println("Reading data from server");
keyRead(key);
} else if (key.isWritable()){
System.out.println("Writting data from server");
keyWrite(key); //unused at the moment
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void keyRead(SelectionKey key) throws IOException {
// Create a SocketChannel to read the request
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
buffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(buffer);
} catch (IOException e) {
System.out.println("Closing socket");
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
System.out.println("Shutting down socket");
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
return;
}
System.out.println("I read: " + new String(buffer.array()).trim());
}
private void keyAccept(SelectionKey key) throws IOException {
// For an accept to be pending the channel must be a server socket channel.
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// Accept the connection and make it non-blocking
SocketChannel socketChannel = serverSocketChannel.accept();
//Socket socket = socketChannel.socket();
socketChannel.configureBlocking(false);
// Register the new SocketChannel with our Selector, indicating
// we'd like to be notified when there's data waiting to be read
socketChannel.register(selector, SelectionKey.OP_READ);
}
private void setServer() throws IOException {
// Create a new selector
selector = Selector.open();
// Create a new non-blocking server socket channel
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port
serverChannel.bind(new InetSocketAddress("localhost", port));
// Register the server socket channel, indicating an interest in
// accepting new connections
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
客户端(为阅读目的而简化)如下:
public NIOClient(int clock, int firstPort, int secondPort, int id, String process) {
this.process = process;
this.clock = clock;
this.id = id;
try {
System.out.println("\tOpening sockets to port " + firstPort + " and port " + secondPort);
firstClient = SocketChannel.open(new InetSocketAddress("localhost", firstPort));
secondClient = SocketChannel.open(new InetSocketAddress("localhost", secondPort));
firstBuffer = ByteBuffer.allocate(1024);
secondBuffer = ByteBuffer.allocate(1024);
sendRequests();
} catch (IOException e) {
e.printStackTrace();
}
}
private void sendRequests() {
LamportRequest lamportRequest = new LamportRequest(clock, process, id);
firstBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
secondBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
String converted = new String(firstBuffer.array(), StandardCharsets.UTF_8);
System.out.println("\tSending lamport request: " + converted);
try {
firstClient.write(firstBuffer);
secondClient.write(secondBuffer);
firstBuffer.clear();
}
初始化如下:
System.out.println("Setting up server with port: " + myPort);
TalkToBrotherSocket talkToBrotherSocket = new TalkToBrotherSocket(clock, myPort);
talkToBrotherSocket.start();
new NIOClient(clock, firstPort, secondPort, id, process);
【问题讨论】:
-
firstClient.write(buffer)如果成功则清空缓冲区。您需要在写入之前保存缓冲区位置和限制,并在第二次写入时恢复它们。或者只是从消息中创建两个缓冲区。 -
@user207421 感谢您的评论。我在第一次写入后打印了缓冲区,它仍然有消息,但为了确保,我制作了第二个缓冲区并在第二次写入时使用它。没有解决问题,但为了安全起见,我会保留您建议的更改。无论如何,感谢您的贡献。
-
打印的内容并不重要。重要的是发送了什么,而您的原始代码第二次什么也没发送。没有两种方法。提供您当前的代码,让我们看看。
-
@user207421 使用更改及其新输出编辑了帖子。仅对 sendRequests 方法进行了更改。
-
叹息。读我写的。 '从消息中创建两个缓冲区'。分配引用根本不构成创建两个缓冲区,更不用说“从消息中”了。您需要再次调用
wrap()方法并将结果分配给第二个缓冲区变量。
标签: java sockets client-server nio