【Question title】: IntegrationFlow Exception Handling
【Posted】: 2018-05-17 06:09:19
【Question】:

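I am trying to create a message flow like this:

TCPinboundAdapter ----> message broker (ActiveMQ)

Flow:

The flow works as follows:

  1. A message arrives at the TCP adapter over a TCP connection; the adapter may act as a client or as a server.
  2. The message received by the TCP adapter is sent to the JMS adapter (ActiveMQ broker).

The code is as follows: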

@EventListener
public void handleTcpConnectionClientEvent(TcpConnectionFailedEvent event){

     TcpNioClientConnectionFactory tcp = (TcpNioClientConnectionFactory)event.getSource();
     System.out.println(tcp); 
     System.out.println("connection exception client :::"+event.getSource());

     this.status = event.toString();

 }
 @EventListener
 public void handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent event){
     System.out.println("connection exception server :::");

     this.status = event.toString();

 }

 // Invoked when the connection with the server is lost.
 @EventListener
 public void handleTcpConnectionServerEvent(TcpConnectionExceptionEvent event){
     System.out.println("connection exception serversssss :::"+event.getConnectionFactoryName());
     this.status = event.toString();

 }

 // Invoked when a connection is established (not for the first time).
 @EventListener
 public void handleTcpConnectionOpenEvent(TcpConnectionOpenEvent event){
     System.out.println("connection opened :::"+event.getConnectionFactoryName());
    // status = event.toString();

 }

// Create a server connection and a flow to JMS.
private void createServerConnection(HostConnection hostConnection) throws Throwable {
    this.status = "success";

    IntegrationFlow flow =
        IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
                .serializer(customSerializer)
                .deserializer(customSerializer)
                .id(hostConnection.getConnectionNumber())
                .soTimeout(10000)))
            .enrichHeaders(f -> f.header("abc", "abc"))
            .channel(directChannel())
            .handle(Jms.outboundAdapter(ConnectionFactory())
                .destination("jmsInbound"))
            .get();

    IntegrationFlowRegistration theFlow =
        this.flowContext.registration(flow).id("test.flow").register();

    if (this.status.equals("success")) {
        createInboundFlow(hostConnection);
    }

    // startConnection(hostConnection.getConnectionNumber());
}

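Problem:

The flow is created successfully and registered in the application context when no exception occurs. The trouble starts when there is an exception such as a BindException:

  1. When a server is created for a port that is already in use, a BindException is raised, yet the flow is still registered. We want the flow not to be registered when any component of the flow below throws an exception.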

    IntegrationFlow flow =
        IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
                .serializer(customSerializer)
                .deserializer(customSerializer)
                .id("server")
                .soTimeout(10000)))
            .enrichHeaders(f -> f.header("abc", "abc"))
            .channel(directChannel())
            .handle(Jms.outboundAdapter(ConnectionFactory())
                .destination("jmsInbound"))
            .get();

    IntegrationFlowRegistration theFlow =
        this.flowContext.registration(flow).id("test.flow").register();
    

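I have implemented various listeners to detect exceptions on the TCP connection. A try{}catch() block around the registration does not catch any exception.

Please suggest a suitable way to handle exceptions from the adapters. At the moment I rely on listeners for the various events to find out that something is wrong with the TCP adapter.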
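One way to avoid registering the flow when the port is already taken is to probe the port before calling register(). The following is a minimal sketch in plain JDK code, not part of Spring Integration; the class name `PortProbe` and the method `isPortFree` are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortProbe {

    /**
     * Returns true if a ServerSocket could be bound to the port at the moment of the call.
     * The probe socket is closed right away so that the real server can bind afterwards.
     */
    public static boolean isPortFree(int port) {
        try (ServerSocket probe = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            // BindException is a subclass of IOException and lands here when the port is taken.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Occupy an ephemeral port, then ask the probe about that same port.
        try (ServerSocket busy = new ServerSocket(0)) {
            int port = busy.getLocalPort();
            System.out.println("port " + port + " free? " + isPortFree(port));
        }
    }
}
```

Such a check is inherently racy: another process can take the port between the probe and the actual bind. It only reduces the number of failed registrations, so an event listener that cleans up after a bind failure is still needed.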

After applying the approach suggested by Mr. Artem Bilan:

@EventListener
public void handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent event) {
    System.out.println("connection exception server :::" + event);
    this.status = event.getCause().getMessage();
    AbstractConnectionFactory server = (AbstractConnectionFactory) event.getSource();
    System.out.println(server.getComponentName());
    this.flowContext.remove(server.getComponentName() + "out.flow");
}

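I can now remove the flow by its flow id, but I still cannot catch the exception. The exception below is printed on the console and cannot be handled, even when I change the method to

private void createServerConnection(HostConnection hostConnection) throws Throwable{}

and wrap the call in try{}catch(Throwable t){} in the calling function.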

Exception in thread "pool-4-thread-1" java.lang.NullPointerException

The log below shows the exception in more detail:

2018-05-17 21:01:40.850  INFO 18332 --- [nio-8080-exec-4] .s.i.i.t.c.TcpNetServerConnectionFactory : started Co123, port=1234
2018-05-17 21:01:40.850  INFO 18332 --- [nio-8080-exec-4] o.s.i.ip.tcp.TcpReceivingChannelAdapter  : started org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#3
2018-05-17 21:01:40.851 ERROR 18332 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Error on ServerSocket; port = 1234

java.net.BindException: Address already in use: JVM_Bind
    at java.net.DualStackPlainSocketImpl.bind0(Native Method) ~[na:1.8.0_111]
    at java.net.DualStackPlainSocketImpl.socketBind(Unknown Source) ~[na:1.8.0_111]
    at java.net.AbstractPlainSocketImpl.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.PlainSocketImpl.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.<init>(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.<init>(Unknown Source) ~[na:1.8.0_111]
    at javax.net.DefaultServerSocketFactory.createServerSocket(Unknown Source) ~[na:1.8.0_111]
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.createServerSocket(TcpNetServerConnectionFactory.java:211) ~[spring-integration-ip-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:106) ~[spring-integration-ip-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_111]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]

connection exception server :::TcpConnectionServerExceptionEvent [source=Co123, port=1234, cause=java.net.BindException: Address already in use: JVM_Bind]
Co123
2018-05-17 21:01:40.851  INFO 18332 --- [pool-5-thread-1] o.s.i.ip.tcp.TcpReceivingChannelAdapter  : stopped org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#3
2018-05-17 21:01:40.851  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {transformer} as a subscriber to the 'Co123out.flow.channel#0' channel
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.Co123out.flow.channel#0' has 0 subscriber(s).
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#11
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {jms:outbound-channel-adapter} as a subscriber to the 'Co123out.flow.channel#1' channel
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.Co123out.flow.channel#1' has 0 subscriber(s).
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#12
Exception in thread "pool-4-thread-1" java.lang.NullPointerException
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Exception in thread "pool-5-thread-1" java.lang.NullPointerException
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
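In the log above, the adapter is started on thread `nio-8080-exec-4`, while the BindException and the NullPointerException are reported on pool threads. That is consistent with the failure happening on a different thread from the one that called register(), in which case a try/catch in the caller never sees it. The sketch below reproduces that effect with plain JDK classes only; it is not Spring Integration code, and `WorkerThreadFailure` and `runFailingTask` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class WorkerThreadFailure {

    /** Runs a failing task on a pool thread and returns what the uncaught-exception handler saw. */
    public static Throwable runFailingTask() throws InterruptedException {
        AtomicReference<Throwable> seenByHandler = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);

        // The thread factory installs a handler on every worker thread it creates.
        ExecutorService pool = Executors.newSingleThreadExecutor(runnable -> {
            Thread worker = new Thread(runnable, "demo-worker");
            worker.setUncaughtExceptionHandler((thread, error) -> {
                seenByHandler.set(error);
                handled.countDown();
            });
            return worker;
        });

        try {
            // execute() returns immediately; the task fails later on "demo-worker".
            pool.execute(() -> {
                throw new IllegalStateException("simulated bind failure");
            });
            System.out.println("caller: no exception reached this try block");
        } catch (Throwable t) {
            // Not reached for failures raised inside the task.
            System.out.println("caller caught: " + t);
        }

        handled.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return seenByHandler.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handler saw: " + runFailingTask());
    }
}
```

A per-thread handler like this only helps when you control how the threads are created. For threads created elsewhere, `Thread.setDefaultUncaughtExceptionHandler` is the JDK-wide fallback for observing such failures instead of letting them go straight to the console.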

【Question discussion】:

    Tags: spring-boot spring-integration spring-integration-dsl


    【Solution 1】:

    You register the IntegrationFlow like this:

    this.flowContext.registration(flow).id("test.flow").register();
    

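    The same this.flowContext bean and the flow id can be used to destroy the flow from anywhere else, for example from an event listener when you catch the BindException mentioned above: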

    /**
     * Destroy an {@link IntegrationFlow} bean (as well as all its dependant beans)
     * for provided {@code flowId} and clean up all the local cache for it.
     * @param flowId the bean name to destroy from
     */
    void remove(String flowId);
    

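    【Comments】:

    • TcpConnectionServerExceptionEvent is the event handled by my listener method handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent), and I could use it to discard the flow. But how do I get the id if it is not the hard-coded value "test.flow"? Is there a specific class I can cast the event to? Please refer to the code above.
    • Note that my flowId is the same as the server adapter id, so I would like to use the server id to remove the flow.
    • You have this code: `.id(hostConnection.getConnectionNumber())`. You can combine this id with the flowId. Then, in the `@EventListener` for TcpConnectionServerExceptionEvent, you get the source, cast it to AbstractConnectionFactory, call its getComponentName(), and extract the flowId from there to destroy the IntegrationFlow.
    • I get this exception, which cannot be handled by catching Throwable or by any EventListener: 'Exception in thread "pool-4-thread-1" java.lang.NullPointerException at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)'
    • The comments here are unreadable. Please edit your question and format the stack trace properly. More details would be great. Also, please mention which Spring Integration version you use.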
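
    The comments above suggest deriving the flow id from the connection factory's component name. In the question's listener that convention is `componentName + "out.flow"`. A small helper can keep the naming rule in one place so that registration and removal cannot drift apart. This is only a sketch in plain Java; `FlowIds`, `forConnectionFactory` and `connectionFactoryOf` are hypothetical names, and the suffix is taken from the question's code.

```java
public final class FlowIds {

    // Suffix taken from the listener in the question; adjust it to your own naming scheme.
    private static final String SUFFIX = "out.flow";

    private FlowIds() {
    }

    /** Builds the flow id to use when registering the flow for a connection factory. */
    public static String forConnectionFactory(String componentName) {
        if (componentName == null || componentName.isEmpty()) {
            throw new IllegalArgumentException("componentName must not be empty");
        }
        return componentName + SUFFIX;
    }

    /** Recovers the connection factory name from a flow id built by forConnectionFactory. */
    public static String connectionFactoryOf(String flowId) {
        if (flowId == null || !flowId.endsWith(SUFFIX) || flowId.length() == SUFFIX.length()) {
            throw new IllegalArgumentException("not a flow id built by FlowIds: " + flowId);
        }
        return flowId.substring(0, flowId.length() - SUFFIX.length());
    }

    public static void main(String[] args) {
        String flowId = forConnectionFactory("Co123");
        System.out.println(flowId);                      // Co123out.flow
        System.out.println(connectionFactoryOf(flowId)); // Co123
    }
}
```

    With such a helper, the flow would be registered under `FlowIds.forConnectionFactory(hostConnection.getConnectionNumber())`, and the listener would pass `FlowIds.forConnectionFactory(server.getComponentName())` to `flowContext.remove(...)`, mirroring the question's own listener.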