【发布时间】:2015-11-16 18:36:07
【问题描述】:
我正在尝试使用 netty v4.0.30 编写代理服务器。我已经浏览了版本中包含的代理示例 (http://netty.io/4.0/xref/io/netty/example/proxy/package-summary.html)。但是我的要求有点不同。
在我的情况下,我的 netty 实例后面可以有多个服务器,所以我不能直接在 ChannelActive 方法中创建客户端引导程序。我的客户端本质上是向我的 netty 服务器发送两个请求(都是 TCP):-
Request1:- 在端口 X 连接到后端服务器 A。此时我应该能够打开与后端服务器的连接并回复成功作为对客户端的响应
Request2:- 客户端在 netty 转发到后端服务器的同一个套接字上写入的实际数据。
由于可以有许多后端服务器,因此这两个请求。由于我仍在尝试学习 netty,因此任何有关相同的技巧都会有很大帮助。
提前致谢。
编辑:
这是我的处理程序,它能够连接到第一个请求中提供的多个后端服务器:-
入站通道处理程序
public class TunnelInboundHandler extends ChannelInboundHandlerAdapter {
// objects for client bootstrap and outbound channel
private Bootstrap b = new Bootstrap()
.group(new NioEventLoopGroup(1))
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.AUTO_READ, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,15000)
.option(ChannelOption.SO_SNDBUF, 1048576)
.option(ChannelOption.SO_RCVBUF, 1048576)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
private volatile Channel outboundChannel;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
// to differentiate between request to connect and actual data
Attribute<Boolean> connected = ctx.attr(isConnected);
// to store outbound channel object
Attribute<Channel> channelContx = ctx.attr(channelContext);
// first request id of format - CONNECT-<IP>-<PORT>
if(connected.get() == null)
{
ByteBuf in = (ByteBuf) msg;
String connectDest = "";
try {
while (in.isReadable()) {
connectDest = connectDest + (char) in.readByte();
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg);
}
String[] connectDestArr = connectDest.split("-");
b.channel(ctx.channel().getClass());
b.handler(new NettyTargetHandlerInitilizer(ctx.channel()));
ChannelFuture f = b.connect(connectDestArr[1].trim(), Integer.parseInt(connectDestArr[2].trim()));
outboundChannel = f.channel();
channelContx.set(outboundChannel);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// connection complete start to read first data
ctx.channel().read();
} else {
// Close the connection if the connection attempt has failed.
ctx.channel().close();
}
}
});
// response Success to client so that Actual request is sent
String response = "SUCCESS\n";
ByteBuf res = ctx.alloc().buffer(response.length());
res.writeBytes(response.getBytes());
ctx.write(res);
ctx.flush();
// set connected as true to identify first request completion
connected.set(true);
}else if(connected.get()){
if (channelContx.get().isActive()) {
channelContx.get().writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.cause().printStackTrace();
future.channel().close();
}
}
});
} else {
// System.out.println("Outbound Channel Not Active");
}
}
}
}
出站通道处理程序
public OutBoundTargetHandler(Channel inboundChannel) {
// System.out.println("Initlizing target pool");
this.inboundChannel = inboundChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// System.out.println("Activating Chanel");
ctx.read();
ctx.write(Unpooled.EMPTY_BUFFER);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
// System.out.println("Receving data");
inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (inboundChannel.isActive()) {
inboundChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
一切都按预期工作,唯一的问题是我的客户端引导程序在请求完成后没有关闭。因此,对于每个请求,我的线程数都会增加一个。有什么相同的提示吗?
【问题讨论】:
-
参见What does it mean if a question is "closed" or "on hold"? 可能的答案太多,或者对于这种格式来说,好的答案太长了。请添加详细信息以缩小答案范围或隔离可以在几段中回答的问题。
标签: netty