【发布时间】:2012-03-03 08:37:25
【问题描述】:
我有一个属于单个线程的套接字列表。但我想知道是否有一种可行的方式与那些客户进行通信(读/写)?我不想为每个客户端创建一个线程,因为可能有太多用户,为每个客户端创建一个线程可能成本太高。
【问题讨论】:
-
创建线程非常便宜。在担心性能之前,集中精力让您的程序正常运行。
我有一个属于单个线程的套接字列表。但我想知道是否有一种可行的方式与那些客户进行通信(读/写)?我不想为每个客户端创建一个线程,因为可能有太多用户,为每个客户端创建一个线程可能成本太高。
【问题讨论】:
我会说 NIO 是您最好的选择。查看通过套接字(嗯,SocketChannel)进行 NIO 通信的众多优秀教程之一!
我相信这是我在学习NIO时使用的教程:http://rox-xmlrpc.sourceforge.net/niotut/
【讨论】:
Netty - Java NIO 客户端服务器套接字框架 http://www.jboss.org/netty
【讨论】:
只需使用标准 Java NIO 最好的文档写在 Java 主页上 http://docs.oracle.com/javase/6/docs/technotes/guides/io/index.html。 有 API 文档、示例、教程。一切。 我向你保证,这已经足够了——我有编写软件的经验,其中 10k 客户端连接到一个客户端(几个线程)。您必须只记住操作系统限制才能在配置中更改它。
【讨论】:
您可以在 JRE 中使用 NIO 方法。另一种解决方案是使用Space Architecture。在此架构中存在具有Space 名称的全局空间,并且任何请求都写入该空间,然后另一个线程从该空间读取并处理它并将处理结果写入另一个空间,最后一步请求线程从指定空间读取自己的结果。
您可以查看以下链接了解更多信息:
【讨论】:
我必须连接多个服务器 IP:PORT 并进行请求-响应消息传递。在实现了具有多个线程的传统 IO 和杀死阻塞套接字的看门狗之后,放弃了。我做了 NIO 实现,这是我的测试应用程序供将来参考。
我可以在一个简单的单线程“游戏循环”中打开 N 个超时连接、读取超时响应、超时写入命令。如果我需要并发,我可以生成工作线程,但如果应用程序逻辑不需要它,则不是强制性的。
服务器是一个自定义的 telnet 应用程序,客户端编写命令并读取文本行,直到找到终止符行提示。终结者标记 end_of_response_packet。
import java.util.*;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
public class ClientSocketNIO {
private String host;
private int port;
private String charset;
private ByteArrayOutputStream inBuffer;
private ByteBuffer buf;
private Selector selector;
private SocketChannel channel;
public ClientSocketNIO(String host, int port, String charset) {
this.charset = charset==null || charset.equals("") ? "UTF-8" : charset;
this.host = host;
this.port = port;
}
public void open(long timeout) throws IOException {
selector = Selector.open();
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_CONNECT);
channel.connect(new InetSocketAddress(host, port));
inBuffer = new ByteArrayOutputStream(1024);
buf = ByteBuffer.allocate(1*1024);
long sleep = Math.min(timeout, 1000);
while(timeout > 0) {
if (selector.select(sleep) < 1) {
timeout-=sleep;
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid() || !key.isConnectable()) continue;
SocketChannel channel = (SocketChannel)key.channel();
if (channel.isConnectionPending()) {
channel.finishConnect();
channel.configureBlocking(false);
return; // we are ready to receive bytes
}
}
}
throw new IOException("Connection timed out");
}
public void close() {
try { channel.close(); } catch(Exception ex) { }
try { selector.close(); } catch(Exception ex) { }
inBuffer=null;
buf=null;
}
public List<String> readUntil(String terminator, long timeout, boolean trimLines) throws IOException {
return readUntil(new String[]{terminator}, timeout, trimLines);
}
public List<String> readUntil(String[] terminators, long timeout, boolean trimLines) throws IOException {
List<String> lines = new ArrayList<String>(12);
inBuffer.reset();
// End of packet terminator strings, line startsWith "aabbcc" string.
byte[][] arrTerminators = new byte[terminators.length][];
int[] idxTerminators = new int[terminators.length];
for(int idx=0; idx < terminators.length; idx++) {
arrTerminators[idx] = terminators[idx].getBytes(charset);
idxTerminators[idx] = 0;
}
int idxLineByte=-1;
channel.register(selector, SelectionKey.OP_READ);
long sleep = Math.min(timeout, 1000);
while(timeout>0) {
if (selector.select(sleep) < 1) {
timeout-=sleep;
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid() || !key.isReadable()) continue;
SocketChannel channel = (SocketChannel)key.channel();
buf.clear();
int len = channel.read(buf);
System.out.println("read " + len);
if (len == -1) throw new IOException("Socket disconnected");
buf.flip();
for(int idx=0; idx<len; idx++) {
byte cb = buf.get(idx);
if (cb!='\n') {
idxLineByte++;
inBuffer.write(cb);
for(int idxter=0; idxter < arrTerminators.length; idxter++) {
byte[] arrTerminator = arrTerminators[idxter];
if (idxLineByte==idxTerminators[idxter]
&& arrTerminator[ idxTerminators[idxter] ]==cb) {
idxTerminators[idxter]++;
if (idxTerminators[idxter]==arrTerminator.length)
return lines;
} else idxTerminators[idxter]=0;
}
} else {
String line = inBuffer.toString(charset);
lines.add(trimLines ? line.trim() : line);
inBuffer.reset();
idxLineByte=-1;
for(int idxter=0; idxter<arrTerminators.length; idxter++)
idxTerminators[idxter]=0;
}
}
}
}
throw new IOException("Read timed out");
}
public void write(String data, long timeout) throws IOException {
ByteBuffer outBuffer = ByteBuffer.wrap(data.getBytes(charset));
channel.register(selector, SelectionKey.OP_WRITE);
long sleep = Math.min(timeout, 1000);
while(timeout > 0) {
if (selector.select(sleep) < 1) {
timeout-=sleep;
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid() || !key.isWritable()) continue;
SocketChannel channel = (SocketChannel)key.channel();
int len = channel.write(outBuffer);
System.out.println("write " + len);
if (outBuffer.remaining()<1)
return;
}
}
throw new IOException("Write timed out");
}
public static void main(String[] args) throws Exception {
ClientSocketNIO client = new ClientSocketNIO("11.22.33.44", 1234, "UTF-8");
try {
client.open(15000);
// read prompting for username
List<String> reply = client.readUntil("User: ", 15000, true);
for(int idx=0; idx<reply.size(); idx++)
System.out.println("|"+reply.get(idx)+"|");
// write username and read a success or failed prompt(asks username once again),
// this one may return two different terminator prompts so listen for both
client.write("myloginname\n", 15000);
reply = client.readUntil(new String[]{"> ", "User: "}, 15000, true);
for(int idx=0; idx<reply.size(); idx++)
System.out.println("|"+reply.get(idx)+"|");
if (!reply.get(reply.size()-1).startsWith("Welcome ")) return; // Access denied
System.out.println("-----");
client.write("help\n", 15000);
reply = client.readUntil("> ", 15000, true);
for(int idx=0; idx<reply.size(); idx++)
System.out.println("|"+reply.get(idx)+"|");
System.out.println("-----");
client.write("get status\n", 15000);
reply = client.readUntil("> ", 15000, true);
for(int idx=0; idx<reply.size(); idx++)
System.out.println("|"+reply.get(idx)+"|");
System.out.println("-----");
client.write("get list\n", 15000);
reply = client.readUntil("> ", 15000, true);
for(int idx=0; idx<reply.size(); idx++)
System.out.println("|"+reply.get(idx)+"|");
client.write("quit\n", 15000);
} finally {
client.close();
}
}
}
【讨论】: