我们这里有几个问题:
- 我们不能像对 InputStreams 和 OutputStreams 那样简单地将 SocketFactories 相互包装。
- Java 的基于 zlib 的 DeflatorOutputStream 不实现刷新。
我想我找到了一种机制。
这将是一个部分系列,因为它需要一些时间来编写。
(你可以找到完成的东西的源代码in my github repository)。
自定义 SocketImpl
Socket 始终基于实现SocketImpl 的对象。因此,拥有自定义套接字实际上意味着使用自定义 SocketImpl 类。这是一个基于一对流(和一个基本套接字,用于关闭目的)的实现:
/**
* A SocketImpl implementation which works on a pair
* of streams.
*
* A instance of this class represents an already
* connected socket, thus all the methods relating to
* connecting, accepting and such are not implemented.
*
* The implemented methods are {@link #getInputStream},
* {@link #getOutputStream}, {@link #available} and the
* shutdown methods {@link #close}, {@link #shutdownInput},
* {@link #shutdownOutput}.
*/
private static class WrappingSocketImpl extends SocketImpl {
private InputStream inStream;
private OutputStream outStream;
private Socket base;
WrappingSocketImpl(StreamPair pair, Socket base) {
this.inStream = pair.input;
this.outStream = pair.output;
this.base = base;
}
StreamPair 是一个简单的数据持有者类,见下文。
这些是重要的方法:
protected InputStream getInputStream() {
return inStream;
}
protected OutputStream getOutputStream() {
return outStream;
}
protected int available() throws IOException {
return inStream.available();
}
然后是一些允许关闭的方法。这些都没有经过真正的测试(也许我们也应该关闭或至少刷新流?),但它似乎适用于我们的 RMI 使用。
protected void close() throws IOException {
base.close();
}
protected void shutdownInput() throws IOException {
base.shutdownInput();
// TODO: inStream.close() ?
}
protected void shutdownOutput() throws IOException {
base.shutdownOutput();
// TODO: outStream.close()?
}
接下来的一些方法将由 Socket 构造函数调用(或由 RMI 引擎中的某些东西间接调用),但实际上不需要做任何事情。
protected void create(boolean stream) {
if(!stream) {
throw new IllegalArgumentException("datagram socket not supported.");
}
}
public Object getOption(int optID) {
System.err.println("getOption(" + optID + ")");
return null;
}
public void setOption(int optID, Object value) {
// noop, as we don't have any options.
}
所有剩余的方法都不是必需的,我们实现它们抛出异常(所以我们会注意到这个假设是否错误)。
// unsupported operations
protected void connect(String host, int port) {
System.err.println("connect(" + host + ", " + port + ")");
throw new UnsupportedOperationException();
}
protected void connect(InetAddress address, int port) {
System.err.println("connect(" + address + ", " + port + ")");
throw new UnsupportedOperationException();
}
protected void connect(SocketAddress addr, int timeout) {
System.err.println("connect(" + addr + ", " + timeout + ")");
throw new UnsupportedOperationException();
}
protected void bind(InetAddress host, int port) {
System.err.println("bind(" + host + ", " + port + ")");
throw new UnsupportedOperationException();
}
protected void listen(int backlog) {
System.err.println("listen(" + backlog + ")");
throw new UnsupportedOperationException();
}
protected void accept(SocketImpl otherSide) {
System.err.println("accept(" + otherSide + ")");
throw new UnsupportedOperationException();
}
protected void sendUrgentData(int data) {
System.err.println("sendUrgentData()");
throw new UnsupportedOperationException();
}
}
这是构造函数使用的StreamPair:
/**
* A simple holder class for a pair of streams.
*/
public static class StreamPair {
public InputStream input;
public OutputStream output;
public StreamPair(InputStream in, OutputStream out) {
this.input = in; this.output = out;
}
}
下一部分:使用它来实现一个 Socket 工厂。
一个 Socket 工厂,包装另一个。
我们在这里处理的是 RMI 套接字工厂(即 java.rmi.server 中的 RMIClientSocketFactory、RMIServerSocketFactory、RMISocketFactory),但同样的想法也适用于使用套接字工厂接口的其他库。例如javax.net.SocketFactory(和ServerSocketFactory)、Apache Axis 的SocketFactory、JSch 的SocketFactory。
通常,这些工厂的想法是,它们以某种方式连接到不同于原始服务器的另一台服务器(代理),然后进行一些协商,或者简单地现在可以在同一连接中继续,或者已经通过其他一些协议(使用包装流)来隧道化真实连接。相反,我们希望让其他一些套接字工厂进行原始连接,然后只对自己进行封装。
RMI 为客户端和服务器套接字工厂提供了单独的接口。客户端套接字工厂将被序列化并与远程存根一起从服务器传递到客户端,从而允许客户端访问服务器。
还有一个 RMISocketFactory 抽象类实现了这两个接口,并提供了一个 VM 全局默认套接字工厂,它将用于所有没有自己的远程对象。
我们现在将实现这个类的一个子类(从而也实现这两个接口),允许用户提供一个基础客户端和服务器套接字工厂,然后我们将使用它。我们的类必须是可序列化的,以允许将其传递给客户端。
/**
* A base class for RMI socket factories which do their
* work by wrapping the streams of Sockets from another
* Socket factory.
*
* Subclasses have to overwrite the {@link #wrap} method.
*
* Instances of this class can be used as both client and
* server socket factories, or as only one of them.
*/
public abstract class WrappingSocketFactory
extends RMISocketFactory
implements Serializable
{
(想象一下所有其余部分都相对于这个类缩进。)
我们想引用其他工厂,这里是字段。
/**
* The base client socket factory. This will be serialized.
*/
private RMIClientSocketFactory baseCFactory;
/**
* The base server socket factory. This will not be serialized,
* since the server socket factory is used only on the server side.
*/
private transient RMIServerSocketFactory baseSFactory;
这些将由简单的构造函数初始化(我在这里不再重复 - 查看 github 存储库以获取完整代码)。
抽象wrap方法
为了让这种“套接字工厂的包装”具有通用性,我们在这里只做通用机制,并在子类中进行流的实际包装。然后我们可以有一个压缩/解压缩的子类,一个加密的,一个日志的,等等。
这里我们只声明wrap方法:
/**
* Wraps a pair of streams.
* Subclasses must implement this method to do the actual
* work.
* @param input the input stream from the base socket.
* @param output the output stream to the base socket.
* @param server if true, we are constructing a socket in
* {@link ServerSocket#accept}. If false, this is a pure
* client socket.
*/
protected abstract StreamPair wrap(InputStream input,
OutputStream output,
boolean server);
这个方法(以及 Java 不允许多个返回值的事实)是 StreamPair 类的原因。或者,我们可以有两个单独的方法,但在某些情况下(对于 SSL),有必要知道哪两个流是配对的。
客户端套接字工厂
现在,让我们看看客户端套接字工厂的实现:
/**
* Creates a client socket and connects it to the given host/port pair.
*
* This retrieves a socket to the host/port from the base client
* socket factory and then wraps a new socket (with a custom SocketImpl)
* around it.
* @param host the host we want to be connected with.
* @param port the port we want to be connected with.
* @return a new Socket connected to the host/port pair.
* @throws IOException if something goes wrong.
*/
public Socket createSocket(String host, int port)
throws IOException
{
Socket baseSocket = baseCFactory.createSocket(host, port);
我们从基础工厂中检索一个套接字,然后...
StreamPair streams = this.wrap(baseSocket.getInputStream(),
baseSocket.getOutputStream(),
false);
... 用新的流包裹它的流。 (这个wrap 必须由子类实现,见下文)。
SocketImpl wrappingImpl = new WrappingSocketImpl(streams, baseSocket);
然后我们使用这些流来创建我们的 WrappingSocketImpl(见上文),并传递它...
return new Socket(wrappingImpl) {
public boolean isConnected() { return true; }
};
... 到一个新的 Socket。我们必须继承Socket,因为这个构造函数是受保护的,但这是合适的,因为我们还必须重写isConnected方法以返回true而不是false。 (记住,我们的 SocketImpl 已经连接,不支持连接。)
}
对于客户端套接字工厂,这已经足够了。对于服务器套接字工厂,它变得有点复杂。
包装服务器套接字
似乎没有办法用给定的 SocketImpl 对象创建一个 ServerSocket——它总是使用静态的 SocketImplFactory。因此,我们现在继承 ServerSocket,只是忽略它的 SocketImpl,而是委托给另一个 ServerSocket。
/**
* A server socket subclass which wraps our custom sockets around the
* sockets retrieves by a base server socket.
*
* We only override enough methods to work. Basically, this is
* a unbound server socket, which handles {@link #accept} specially.
*/
private class WrappingServerSocket extends ServerSocket {
private ServerSocket base;
public WrappingServerSocket(ServerSocket b)
throws IOException
{
this.base = b;
}
事实证明我们必须实现这个getLocalPort,因为这个数字是通过远程存根发送给客户端的。
/**
* returns the local port this ServerSocket is bound to.
*/
public int getLocalPort() {
return base.getLocalPort();
}
下一个方法是重要的。它的工作原理类似于我们上面的createSocket() 方法。
/**
* accepts a connection from some remote host.
* This will accept a socket from the base socket, and then
* wrap a new custom socket around it.
*/
public Socket accept() throws IOException {
我们让基础 ServerSocket 接受一个连接,然后包装它的流:
final Socket baseSocket = base.accept();
StreamPair streams =
WrappingSocketFactory.this.wrap(baseSocket.getInputStream(),
baseSocket.getOutputStream(),
true);
然后我们创建 WrappingSocketImpl,...
SocketImpl wrappingImpl =
new WrappingSocketImpl(streams, baseSocket);
...并创建另一个 Socket 的匿名子类:
// For some reason, this seems to work only as a
// anonymous direct subclass of Socket, not as a
// external subclass. Strange.
Socket result = new Socket(wrappingImpl) {
public boolean isConnected() { return true; }
public boolean isBound() { return true; }
public int getLocalPort() {
return baseSocket.getLocalPort();
}
public InetAddress getLocalAddress() {
return baseSocket.getLocalAddress();
}
};
这需要一些更多被覆盖的方法,因为它们似乎是由 RMI 引擎调用的。
我试图将它们放在一个单独的(非本地)类中,但这不起作用(在连接时客户端出现异常)。我不知道为什么。如果有人有想法,我很感兴趣。
return result;
}
}
有了这个 ServerSocket 子类,我们就可以完成我们的 ...
包装 RMI 服务器套接字工厂
/**
* Creates a server socket listening on the given port.
*
* This retrieves a ServerSocket listening on the given port
* from the base server socket factory, and then creates a
* custom server socket, which on {@link ServerSocket#accept accept}
* wraps new Sockets (with a custom SocketImpl) around the sockets
* from the base server socket.
* @param host the host we want to be connected with.
* @param port the port we want to be connected with.
* @return a new Socket connected to the host/port pair.
* @throws IOException if something goes wrong.
*/
public ServerSocket createServerSocket(int port)
throws IOException
{
final ServerSocket baseSocket = getSSFac().createServerSocket(port);
ServerSocket ss = new WrappingServerSocket(baseSocket);
return ss;
}
不用多说,评论里都已经说了。是的,我知道我可以在一行中完成这一切。 (原本在两行之间有一些调试输出。)
让我们完成课程吧:
}
下一次:跟踪套接字工厂。
跟踪套接字工厂。
为了测试我们的包装并查看是否有足够的刷新,这里是第一个子类的wrap 方法:
protected StreamPair wrap(InputStream in, OutputStream out, boolean server)
{
InputStream wrappedIn = in;
OutputStream wrappedOut = new FilterOutputStream(out) {
public void write(int b) throws IOException {
System.err.println("write(.)");
super.write(b);
}
public void write(byte[] b, int off, int len)
throws IOException {
System.err.println("write(" + len + ")");
super.out.write(b, off, len);
}
public void flush() throws IOException {
System.err.println("flush()");
super.flush();
}
};
return new StreamPair(wrappedIn, wrappedOut);
}
输入流按原样使用,输出流只是添加一些日志记录。
在服务器端是这样的([example]来自ant):
[example] write(14)
[example] flush()
[example] write(287)
[example] flush()
[example] flush()
[example] flush()
[example] write(1)
[example] flush()
[example] write(425)
[example] flush()
[example] flush()
我们看到有足够多的冲洗,甚至绰绰有余。 (数字是输出块的长度。)
(在客户端,这实际上抛出了一个 java.rmi.NoSuchObjectException。它以前工作过......不知道为什么它现在不工作。由于压缩示例确实工作而且我很累,我不会搜索它现在。)
下一步:压缩。
刷新压缩流
对于压缩,Java 在java.util.zip 包中有一些类。有一对DeflaterOutputStream / InflaterInputStream 通过包装另一个流来实现deflate 压缩算法,分别通过Deflater 或Inflater 过滤数据。 Deflater 和 Inflater 基于调用通用 zlib 库的本地方法。 (实际上,如果有人提供具有Deflater 和Inflater 替代实现的子类,流也可以支持其他算法。)
(还有 DeflaterInputStream 和 InflaterOutputStream,反之亦然。)
基于此,GZipOutputStream 和 GZipInputStream 实现了 GZip 文件格式。 (这主要添加了一些页眉和页脚,以及校验和。)
两个输出流have the problem(对于我们的用例)它们并不真正支持flush()。这是由 Deflater 的 API 定义中的缺陷引起的,它允许缓冲尽可能多的数据,直到最终的finish()。 Zlib 允许刷新其状态,只是 Java 包装器太笨了。
自 1999 年 1 月以来,bug #4206909 已对此开放,看起来它终于为 Java 7 修复了,万岁!如果你有 Java 7,你可以在这里简单地使用 DeflaterOutputStream。
由于我还没有 Java 7,我将使用 rsaddey 于 2002 年 6 月 23 日在 bug cmets 中发布的解决方法。
/**
* Workaround für kaputten GZipOutputStream, von
* http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4206909
* (23-JUN-2002, rsaddey)
* @see DecompressingInputStream
*/
public class CompressingOutputStream
extends DeflaterOutputStream {
public CompressingOutputStream (final OutputStream out)
{
super(out,
// Using Deflater with nowrap == true will ommit headers
// and trailers
new Deflater(Deflater.DEFAULT_COMPRESSION, true));
}
private static final byte [] EMPTYBYTEARRAY = new byte[0];
/**
* Insure all remaining data will be output.
*/
public void flush() throws IOException {
/**
* Now this is tricky: We force the Deflater to flush
* its data by switching compression level.
* As yet, a perplexingly simple workaround for
* http://developer.java.sun.com/developer/bugParade/bugs/4255743.html
*/
def.setInput(EMPTYBYTEARRAY, 0, 0);
def.setLevel(Deflater.NO_COMPRESSION);
deflate();
def.setLevel(Deflater.DEFAULT_COMPRESSION);
deflate();
out.flush();
}
/**
* Wir schließen auch den (selbst erstellten) Deflater, wenn
* wir fertig sind.
*/
public void close()
throws IOException
{
super.close();
def.end();
}
} // class
/**
* Workaround für kaputten GZipOutputStream, von
* http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4206909
* (23-JUN-2002, rsaddey)
* @see CompressingOutputStream
*/
public class DecompressingInputStream extends InflaterInputStream {
public DecompressingInputStream (final InputStream in) {
// Using Inflater with nowrap == true will ommit headers and trailers
super(in, new Inflater(true));
}
/**
* available() should return the number of bytes that can be read without
* running into blocking wait. Accomplishing this feast would eventually
* require to pre-inflate a huge chunk of data, so we rather opt for a
* more relaxed contract (java.util.zip.InflaterInputStream does not
* fit the bill).
* This code has been tested to work with BufferedReader.readLine();
*/
public int available() throws IOException {
if (!inf.finished() && !inf.needsInput()) {
return 1;
} else {
return in.available();
}
}
/**
* Wir schließen auch den (selbst erstellten) Inflater, wenn
* wir fertig sind.
*/
public void close()
throws IOException
{
super.close();
inf.end();
}
} //class
(这些是in the de.fencing_game.tools package in my github repository。)它有一些德国cmets,因为我最初一年前复制了这个用于我的另一个项目。)
在 Stackoverflow 上搜索了一下,我发现了 this answer by BalusC to a related question,它提供了另一种压缩输出流,并带有优化的刷新。我没有对此进行测试,但它可能是这个的替代品。 (它使用 gzip 格式,而我们在这里使用纯 deflate 格式。确保写入和读取流适合。)
另一种选择是使用JZlib,正如 bestsss 建议的那样,它是 ZOutputStream 和 ZInputStream。 It has not much documentation,但我正在努力。
下一次:压缩的 RMI 套接字工厂
压缩 RMI 套接字工厂
现在我们可以将它们整合在一起了。
/**
* An RMISocketFactory which enables compressed transmission.
* We use {@link #CompressingInputStream} and {@link #CompressingOutputStream}
* for this.
*
* As we extend WrappingSocketFactory, this can be used on top of another
* {@link RMISocketFactory}.
*/
public class CompressedRMISocketFactory
extends WrappingSocketFactory
{
private static final long serialVersionUID = 1;
//------------ Constructors -----------------
/**
* Creates a CompressedRMISocketFactory based on a pair of
* socket factories.
*
* @param cFac the base socket factory used for creating client
* sockets. This may be {@code null}, then we will use the
* {@linkplain RMISocketFactory#getDefault() default socket factory}
* of client system where this object is finally used for
* creating sockets.
* If not null, it should be serializable.
* @param sFac the base socket factory used for creating server
* sockets. This may be {@code null}, then we will use the
* {@linkplain RMISocketFactory#getDefault() default RMI Socket factory}.
* This will not be serialized to the client.
*/
public CompressedRMISocketFactory(RMIClientSocketFactory cFac,
RMIServerSocketFactory sFac) {
super(cFac, sFac);
}
// [snipped more constructors]
//-------------- Implementation -------------
/**
* wraps a pair of streams into compressing/decompressing streams.
*/
protected StreamPair wrap(InputStream in, OutputStream out,
boolean server)
{
return new StreamPair(new DecompressingInputStream(in),
new CompressingOutputStream(out));
}
}
就是这样。我们现在将这个工厂对象作为参数提供给UnicastRemoteObject.export(...)(用于客户端和服务器工厂),所有通信都将被压缩。 (version in my github repository有一个main方法和一个例子。)
当然,压缩优势不会像 RMI 这样的东西很大,至少当您不传输大字符串或类似的东西作为参数或返回值时。
下一次(睡后):结合 SSL 套接字工厂。
与 SSL 套接字工厂结合
如果我们使用默认类,Java 部分很简单:
CompressedRMISocketFactory fac =
new CompressedRMISocketFactory(new SslRMIClientSocketFactory(),
new SslRMIServerSocketFactory());
这些类(在 javax.rmi.ssl 中)使用默认的 SSLSocketFactory 和 SSLServerSocketFactory(在 javax.net.ssl 中),它们使用系统的默认密钥库和信任库。
因此,有必要使用密钥对创建一个密钥库(例如通过keytool -genkeypair -v),并使用系统属性javax.net.ssl.keyStore(密钥库的文件名)和javax.net.ssl.keyStorePassword(密钥库的密码)。
在客户端,我们需要一个信任库——即一个包含公钥的密钥库,或者一些签署服务器公钥的证书。出于测试目的,我们可以简单地使用与服务器相同的密钥库,对于生产,您当然不希望在客户端使用服务器的私钥。我们为它提供属性javax.net.ssl.trustStorejavax.net.ssl.trustStorePassword。
然后它归结为(在服务器端):
Remote server =
UnicastRemoteObject.exportObject(new EchoServerImpl(),
0, fac, fac);
System.err.println("server: " + server);
Registry registry =
LocateRegistry.createRegistry(Registry.REGISTRY_PORT);
registry.bind("echo", server);
与前面的例子一样,客户是股票客户:
Registry registry =
LocateRegistry.getRegistry("localhost",
Registry.REGISTRY_PORT);
EchoServer es = (EchoServer)registry.lookup("echo");
System.err.println("es: " + es);
System.out.println(es.echo("hallo"));
现在所有与 EchoServer 的通信都经过压缩和加密。
当然,为了完全安全,我们还希望与注册表的通信受到 SSL 保护,以避免任何中间人攻击(这也将允许通过给客户端一个假的 RMIClientSocketFactory 或假的来拦截与 EchoServer 的通信服务器地址)。