【发布时间】:2017-06-15 21:07:46
【问题描述】:
当客户端与 Apache Camel 连接断开时,我需要记录。如何检测 netty-tcp 断开连接?
from("{{uri.client.address}}")
【问题讨论】:
标签: logging error-handling apache-camel
当客户端与 Apache Camel 连接断开时,我需要记录。如何检测 netty-tcp 断开连接?
from("{{uri.client.address}}")
【问题讨论】:
标签: logging error-handling apache-camel
使用ErrorHandler,例如LoggingErrorHandler:
RouteBuilder builder = new RouteBuilder() {
public void configure() {
// use logging error handler
errorHandler(loggingErrorHandler("com.mycompany.foo"));
// here is our regular route
from("{{uri.client.address}}").to("mock:out");
}
};
【讨论】:
记录单个客户端断开连接有点复杂,但我会尝试分享我必须做的事情。
您需要使用自己的类扩展 SimpleChannelInboundHandler 并添加所需的方法。有关示例类,请参见此处:
查看我已更改为包括客户端断开连接的主要方法是:
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//command to send when there is a disconnect from client side
String command = "clientDisconnect";
if (LOG.isTraceEnabled()) {
LOG.trace("Channel closed: {}", ctx.channel());
}
LOG.info("Channel has disconnected: affects ", uniqueClientId);
//create Exchange and let the consumer process it
final Exchange exchange = consumer.getEndpoint().createExchange(ctx, command);
//create inOnly as there is no client waiting for a response
exchange.setPattern(ExchangePattern.InOnly);
// set the exchange charset property for converting
if (consumer.getConfiguration().getCharsetName() != null) {
exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(consumer.getConfiguration().getCharsetName()));
}
// we want to handle the UoW
consumer.createUoW(exchange);
//set the uniqueClientId
beforeProcess(exchange, ctx, command);
processAsynchronously(exchange, ctx, command);
// to keep track of open sockets
consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel());
super.channelInactive(ctx);
}
如果通道已连接,请注意我记录的部分。
如果你想在你的主路由中为不同的客户端做不同的事情,你也可以记录不同客户端的 IP 地址。这应该对您有所帮助。
编辑: 扩展该类后,调用扩展类 TCPServerHandler。然后你需要创建一个 TCPServerHandlerFactory 来引用之前的类并设置你的编码器/解码器。这个类可以是这样的:
public class TCPServerInitializerFactory extends ServerInitializerFactory {
private NettyConsumer consumer;
public TCPServerInitializerFactory(NettyConsumer consumer) {
this.consumer = consumer;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8));
**pipeline.addLast("handler", new TCPServerHandler(consumer));**
}
@Override
public ServerInitializerFactory createPipelineFactory(NettyConsumer nettyConsumer) {
return new TCPServerInitializerFactory(nettyConsumer);
}
}
标有 ** 的行是您创建 TCPServerHandler 类的位置。
然后回到你的路由类中的骆驼,你需要像这样将工厂添加到你的注册表中:
TCPServerInitializerFactory serverFactory = new TCPServerInitializerFactory(null);
final CamelContext camelContext = getContext();
final org.apache.camel.impl.SimpleRegistry registry = new org.apache.camel.impl.SimpleRegistry();
final org.apache.camel.impl.CompositeRegistry compositeRegistry = new org.apache.camel.impl.CompositeRegistry();
compositeRegistry.addRegistry(camelContext.getRegistry());
compositeRegistry.addRegistry(registry);
((org.apache.camel.impl.DefaultCamelContext) camelContext).setRegistry(compositeRegistry);
registry.put("spf", serverFactory);
然后这样引用它:
from("netty4:tcp://0.0.0.0:3000?serverInitializerFactory=#spf&sync=true")...
现在您可能想知道,所有这些都是为了注册断开连接的日志。这是因为要能够区分客户端何时断开连接并实际获取 Netty 代码所需的客户端 ip。你可以在交换中保存一些价值,然后在你的路线中做一些事情。
【讨论】: