【问题标题】:How can I detect network disconnection in Apache Camel?如何检测 Apache Camel 中的网络断开连接?
【发布时间】:2017-06-15 21:07:46
【问题描述】:

当客户端与 Apache Camel 连接断开时,我需要记录。如何检测 netty-tcp 断开连接?

    from("{{uri.client.address}}")

【问题讨论】:

    标签: logging error-handling apache-camel


    【解决方案1】:

    使用ErrorHandler,例如LoggingErrorHandler:

    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            // use logging error handler
            errorHandler(loggingErrorHandler("com.mycompany.foo"));
    
            // here is our regular route
            from("{{uri.client.address}}").to("mock:out");
        }
    }; 
    

    【讨论】:

      【解决方案2】:

      记录单个客户端断开连接有点复杂,但我会尝试分享我必须做的事情。

      您需要使用自己的类扩展 SimpleChannelInboundHandler 并添加所需的方法。有关示例类,请参见此处:

      https://github.com/apache/camel/blob/master/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java

      查看我已更改为包括客户端断开连接的主要方法是:

       @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          //command to send when there is a disconnect from client side
          String command = "clientDisconnect";
          if (LOG.isTraceEnabled()) {
            LOG.trace("Channel closed: {}", ctx.channel());
          }
          LOG.info("Channel has disconnected: affects ", uniqueClientId);
          //create Exchange and let the consumer process it
          final Exchange exchange = consumer.getEndpoint().createExchange(ctx, command);
          //create inOnly as there is no client waiting for a response
          exchange.setPattern(ExchangePattern.InOnly);
          // set the exchange charset property for converting
          if (consumer.getConfiguration().getCharsetName() != null) {
            exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(consumer.getConfiguration().getCharsetName()));
          }
          // we want to handle the UoW
          consumer.createUoW(exchange);
          //set the uniqueClientId
          beforeProcess(exchange, ctx, command);
          processAsynchronously(exchange, ctx, command);
          // to keep track of open sockets
          consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel());
          super.channelInactive(ctx);
      }
      

      如果通道已连接,请注意我记录的部分。

      如果你想在你的主路由中为不同的客户端做不同的事情,你也可以记录不同客户端的 IP 地址。这应该对您有所帮助。

      编辑: 扩展该类后,调用扩展类 TCPServerHandler。然后你需要创建一个 TCPServerHandlerFactory 来引用之前的类并设置你的编码器/解码器。这个类可以是这样的:

      public class TCPServerInitializerFactory extends ServerInitializerFactory  {
        private NettyConsumer consumer;
      
        public TCPServerInitializerFactory(NettyConsumer consumer) {
          this.consumer = consumer;
        }
        @Override
        protected void initChannel(Channel channel) throws Exception {
          ChannelPipeline pipeline = channel.pipeline();
          pipeline.addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8));
          pipeline.addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8));
          **pipeline.addLast("handler", new TCPServerHandler(consumer));**
        }
      
        @Override
        public ServerInitializerFactory createPipelineFactory(NettyConsumer nettyConsumer) {
          return new TCPServerInitializerFactory(nettyConsumer);
        }
      }
      

      标有 ** 的行是您创建 TCPServerHandler 类的位置。

      然后回到你的路由类中的骆驼,你需要像这样将工厂添加到你的注册表中:

      TCPServerInitializerFactory serverFactory = new TCPServerInitializerFactory(null);
          final CamelContext camelContext = getContext();
          final org.apache.camel.impl.SimpleRegistry registry = new org.apache.camel.impl.SimpleRegistry();
          final org.apache.camel.impl.CompositeRegistry compositeRegistry = new org.apache.camel.impl.CompositeRegistry();
          compositeRegistry.addRegistry(camelContext.getRegistry());
          compositeRegistry.addRegistry(registry);
          ((org.apache.camel.impl.DefaultCamelContext) camelContext).setRegistry(compositeRegistry);
      registry.put("spf", serverFactory);
      

      然后这样引用它:

      from("netty4:tcp://0.0.0.0:3000?serverInitializerFactory=#spf&sync=true")...
      

      现在您可能想知道,所有这些都是为了注册断开连接的日志。这是因为要能够区分客户端何时断开连接并实际获取 Netty 代码所需的客户端 ip。你可以在交换中保存一些价值,然后在你的路线中做一些事情。

      【讨论】:

      • 谢谢,但我是骆驼的新手。我应该如何使用 SimpleChannelInboundHandler 的扩展类?
      • 编辑了我的答案。
      • @Souciance Eqdam Rashti 我可以使用 getContext().getStatus() 检查每个请求的通道状态吗?
      • 如果你想持续检查通道的状态,你可能需要在 SimpleChannelInboundHandler 中添加另一个方法来使用 ChannelHandlerContext 来做到这一点。
      • 如何获得 NettyConsumer ?
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-04-13
      • 1970-01-01
      • 2012-11-03
      • 2013-09-09
      相关资源
      最近更新 更多