【发布时间】:2020-08-13 19:50:31
【问题描述】:
我有以下场景:
java 1.8
spring-boot-starter-parent 2.3.2.RELEASE
spring-webflux 5.2.8.RELEASE
spring-boot-starter-reactor-netty 2.3.2.RELEASE
应用程序在 WebSphere Application Server 9 上运行
我的应用程序是一个支持重新连接的 Netty 客户端,它读取发送到套接字的数据。获取数据并搜索帧的开始和结束分隔符,一旦找到它,它就会将其发送到处理信息的下一个处理程序。几天或几小时后,客户端停止捕获发送到套接字的数据,观察日志文件唯一的错误是:
2020-08-13 15: 31: 34,885 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release () was not called before garbage collection
这是主类
public void run() {
LOGGER.info("Levantando la aplicacion CAPTURADOR");
closed = false;
workerGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addFirst(new ChannelInboundHandlerAdapter() {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
ctx.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
}
});
socketChannel.pipeline().addLast(frameExtractor);
socketChannel.pipeline().addLast(new LoggingHandler("SERVER_LOG", LogLevel.valueOf(logLevel)));
socketChannel.pipeline().addLast(clientHandler);
}
});
doConnect();
}
/**
*
*/
private void doConnect() {
if (closed) {
return;
}
ChannelFuture future = bootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
LOGGER.info("Started Tcp Client: " + getServerInfo());
} else {
LOGGER.error("Started Tcp Client Failed: " + getServerInfo());
f.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
}
}
});
}
这是 FrameExtractor 类
/**
* En el método channelActive() que es al que nos llama netty cuando el canal de comunicación está activo,
* aprovechamos para crear el buffer que mencionamos.
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
buf = ctx.alloc().buffer();
}
/**
* En el metodo channelInactive() que es al que nos llama netty cuando el canal de comunicación deja de estar
* activo, aprovechamos para liberar el buffer que creamos @channelRegistered.
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
if (null != buf) {
buf.release();
buf = null;
}
}
/**
* Arma el envio de la medicion buscando el fin de trama y lo pasa al siguiente handler
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Level level = ResourceLeakDetector.getLevel();
// El msg que recibimos como parametro es un ByteBuf de Netty. Añadimos todo su contenido al final de
// nuestro ByteBuf buf para ir acumulando el envio de bytes hasta que se encuentre el fin de envio de trama
buf.writeBytes((ByteBuf) msg);
String data = buf.toString(Charset.defaultCharset());
int indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
int indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
while (-1 != indexOf2) {
// Creamos un nuevo ByteBuf para copiar la trama hasta el indicador del fin de trama
ByteBuf line = ctx.alloc().buffer();
line = buf.copy(indexOf1, indexOf2 - indexOf1);
// Agregamos al buffer buf todos los bytes hasta el indicador de fin de trama
buf.readBytes(indexOf2);
// Avisamos al siguiente handler, pasandole nuestro buffer line. No liberamos el buffer line porque es
// responsabilidad del que lo recibe.
ctx.fireChannelRead(line);
buf.discardReadBytes();
indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
}
} finally {
// Liberamos el buffer que nos ha llegado por parametro. Como ya no lo necesitamos y no se lo hemos pasado a
// nadie es nuestra responsabilidad liberarlo.
ReferenceCountUtil.release(msg);
}
}
这是客户端处理程序
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
try {
buffer.writeBytes((ByteBuf) msg);
byte[] bytes = new byte[buffer.readableBytes()];
int readerIndex = buffer.readerIndex();
buffer.getBytes(readerIndex, bytes);
bytes = CapturadorUtils.eliminarParidad(bytes);
String trama = new String(bytes);
CapturadorGenerico capturadorGenerico = trama.contains(Directlink.KEY_DIRECTLINK)
? capturadorFactory.getCapturador(Directlink.getDirectlink())
: capturadorFactory.getCapturador(Microcom.MICROCOM);
capturadorGenerico.parsearTrama(trama, bytes);
} catch (Exception e) {
LOGGER.error("Error producido en el pipe ClientHandler con la trama: " + msg, e);
} finally {
// Liberamos el buffer que nos ha llegado por parametro. Como ya no lo necesitamos y no se lo hemos pasado a
// nadie es nuestra responsabilidad liberarlo.
ReferenceCountUtil.release(msg);
ReferenceCountUtil.release(buffer);
}
}
查看代码并分析文档https://netty.io/wiki/reference-counted-objects.html我没有发现错误可能是什么。缓冲区已正确释放。
添加日志文件
2020-08-14 11:23:30,404 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:173)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:107)
cl.mop.dga.satelital.capturador.handler.FrameExtractor.channelRead(FrameExtractor.java:76)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:682)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:617)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:534)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:785)
在行中
ByteBuf line = ctx.alloc().buffer();
line = buf.copy(indexOf1, indexOf2 - indexOf1);
我换个方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// El msg que recibimos como parametro es un ByteBuf de Netty. Añadimos todo su contenido al final de
// nuestro ByteBuf buf para ir acumulando el envio de bytes hasta que se encuentre el fin de envio de trama
buf.writeBytes((ByteBuf) msg);
String data = buf.toString(Charset.defaultCharset());
LOGGER.info("Trama recibida: " + data);
int indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
int indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
while (-1 != indexOf2) {
// Creamos un nuevo ByteBuf para copiar la trama hasta el indicador del fin de trama
ByteBuf line = buf.copy(indexOf1, indexOf2 - indexOf1);
// Agregamos al buffer buf todos los bytes hasta el indicador de fin de trama
buf.readBytes(indexOf2);
// Avisamos al siguiente handler, pasandole nuestro buffer line. No liberamos el buffer line porque es
// responsabilidad del que lo recibe.
ctx.fireChannelRead(line);
buf.discardReadBytes();
indexOf1 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 1);
indexOf2 = indexOf(buf, Directlink.DELIMETER_DIRECTLINK, 2);
}
} finally {
// Liberamos el buffer que nos ha llegado por parametro. Como ya no lo necesitamos y no se lo hemos pasado a
// nadie es nuestra responsabilidad liberarlo.
ReferenceCountUtil.release(msg);
}
}
仍然有错误
2020-08-27 16:33:36,256 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected
但现在排队
buf.readBytes(indexOf2);
添加类 ClientWithNettyHandlers
`public class ClientWithNettyHandlers extends SpringBootServletInitializer {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientWithNettyHandlers.class);
@Autowired
@Qualifier("ClientHandler")
ClientHandler clientHandler;
@Autowired
@Qualifier("FrameExtractor")
FrameExtractor frameExtractor;
private volatile EventLoopGroup workerGroup;
private volatile Bootstrap bootstrap;
private volatile boolean closed = false;
private String remoteHost;
private int remotePort;
private String logLevel;
@Bean
public void run() {
closed = false;
workerGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addFirst(new ChannelInboundHandlerAdapter() {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
ctx.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
}
});
socketChannel.pipeline().addLast(frameExtractor);
socketChannel.pipeline().addLast(new LoggingHandler("SERVER_LOG", LogLevel.valueOf(logLevel)));
socketChannel.pipeline().addLast(clientHandler);
}
});
doConnect();
}
/**
*
*/
private void doConnect() {
if (closed) {
return;
}
ChannelFuture future = bootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
LOGGER.info("Started Tcp Client: " + getServerInfo());
} else {
LOGGER.error("Started Tcp Client Failed: " + getServerInfo());
f.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
}
}
});
}
/**
*
*/
@PreDestroy
public void closeNettyClient() {
close();
System.out.println("Shutting down Netty Client: " + getServerInfo());
}
/**
*
*/
public void close() {
closed = true;
Future<?> future = workerGroup.shutdownGracefully();
future.syncUninterruptibly();
LOGGER.info("Stopped Tcp Client: " + getServerInfo());
}
/**
*
* @return
*/
private String getServerInfo() {
return String.format("RemoteHost=%s RemotePort=%d", remoteHost, remotePort);
}
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(ClientWithNettyHandlers.class);
}
/**
*
* @param args
*/
public static void main(String[] args) {
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
SpringApplication.run(ClientWithNettyHandlers.class, args);
}
}
【问题讨论】:
-
请按照您链接的文档中的说明启用高级/偏执泄漏检测并编辑问题以包括记录的堆栈跟踪。
-
感谢 Norman 的回答,这是我尝试的第一件事,我将 ResourceLeakDetector.setLevel(Level.PARANOID) 行放在主类中,但日志没有改变。我也尝试从 jvm -Dio.netty.leakDetectionLevel = PARANOID 配置它,但我无法让它向我展示更高级别的细节。我正在使用 logback。
-
有人有线索吗?
标签: java spring-boot netty