【问题标题】:Spring integration multiple clients connecting to server portSpring集成多个客户端连接到服务器端口
【发布时间】:2016-05-24 13:03:09
【问题描述】:

从我的应用程序中,我需要配置需要连接到单个服务器的多个客户端连接。为此,我使用 ApplicationContext Beanfactory 根据我配置的客户端数量创建了数量可变的 bean。这是 2 个客户端的代码:

//setup beans;
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
ctx.scan("pkg");
ConnectionFactory factory = new ConnectionFactory();
int clients = 2; //TODO read this value from file
ConfigurableListableBeanFactory beanFactory = ctx.getBeanFactory();
for (int count = 1; count <= clients; count++) {
    TcpNetClientConnectionFactory connectionFactory = factory.createClientConnectionFactory("127.0.0.1", 6680);

    //connection factory
    beanFactory.registerSingleton("connectionFactory_" + String.valueOf(count), connectionFactory);

    //inbound gateway
    MessageChannel input = new DirectChannel();
    MessageChannel output = new DirectChannel();
    TcpInboundGateway gateway = factory.createInboundGateway(connectionFactory, beanFactory, input, output, 10000, 20000);
    beanFactory.registerSingleton("gateway_" + String.valueOf(count), gateway);

    //message transformation and handling
    IntegrationFlow flow = factory.createFlow(input);
    beanFactory.registerSingleton("flow_" + String.valueOf(count), flow);
}
ctx.refresh();

//open connections
for(int count = 1; count <= clients; count++) {
    TcpInboundGateway gateway = ctx.getBean("gateway_" + count, TcpInboundGateway.class);
    //necessary for the client to connect
    gateway.retryConnection();
}

这是我的工厂方法:

@EnableIntegration
@IntegrationComponentScan
@Configuration
public class ConnectionFactory {      
    public TcpNetClientConnectionFactory createClientConnectionFactory(String ip, int port) {
        TcpNetClientConnectionFactory factory = new TcpNetClientConnectionFactory(ip, port);
        factory.setSingleUse(false);
        factory.setSoTimeout(10000);
        factory.setSerializer(new ByteArrayLfSerializer());
        factory.setDeserializer(new ByteArrayLfSerializer());

        return factory;
    }

    public TcpInboundGateway createInboundGateway(
        AbstractConnectionFactory factory,
        BeanFactory beanFactory,
        MessageChannel input,
        int replyTimeout,
        int retryInterval) {
        TcpInboundGateway gateway = new TcpInboundGateway();
        gateway.setRequestChannel(input);
        gateway.setConnectionFactory(factory);
        gateway.setClientMode(true);
        gateway.setReplyTimeout(replyTimeout);
        gateway.setRetryInterval(retryInterval);
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.initialize();
        gateway.setTaskScheduler(scheduler);
        gateway.setBeanFactory(beanFactory);

        return gateway;
    }

    public IntegrationFlow createFlow(MessageChannel input) {
        IntegrationFlowBuilder builder = IntegrationFlows.from(input);
        builder.transform(Transformers.objectToString()).handle(System.out::println);

        return builder.get();
    }
}

当我运行我的程序时,两个客户端都连接到我的服务器。但是,一旦服务器将其第一个有效负载发送到每个客户端,我就会收到以下异常(每个客户端一个):

Exception sending message: GenericMessage [payload=byte[5], headers={ip_tcp_remotePort=6680, ip_connectionId=localhost:6680:33372:e26b9973-a32e-4c28-b808-1f2556576d01, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4443ca34-fb53-a753-7603-53f6d7d82e11, ip_hostname=localhost, timestamp=1464098102462}]
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:422) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:390) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:119) ~[spring-integration-ip-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:97) ~[spring-integration-ip-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182) ~[spring-integration-ip-4.2.5.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    ... 14 common frames omitted

这个想法是读取数据,通过我为 InboundGateway 配置的通道发送到转换器,然后转换器会将数据转换为字符串,然后将其打印出来。

为什么框架不知道把数据放在哪个通道上?据我所知,我确实在入站网关工厂方法中为每个客户端创建了一个唯一通道。有人可以看看我的配置,让我知道我错过了什么,因为我完全被这个难住了。

【问题讨论】:

  • 我指的是这个例子,但我收到了无法实例化 ConnectionFactory 的错误。有什么问题?
  • 嗨,Shailesh。请发布一个新问题,并复制粘贴您遇到的确切错误,我会为您查找。
  • 其实我的问题就在这里[link]stackoverflow.com/questions/39265684/…

标签: java spring spring-integration


【解决方案1】:

没有人会使用来自您gateway.setReplyChannel(output); 的消息。

至少我们没有看到类似的东西:

之后它将被打印出来。

在大多数情况下,如果您的某些SubscribableChannel 没有任何订阅者,我们有Dispatcher has no subscribers:未配置或停止。

编辑

忘记我之前的表达。它适用于outbound 案例。

您的TcpInboundGateway 很好。虽然你不需要setReplyChannel(),因为你总是可以依赖默认的内置TemporaryReplyChannel 来等待下游流的一些结果。

您的IntegrationFlow 看起来也不错。 .transform() 不会向任何其他频道发送任何内容,这是正确的。它只是依赖于标题中的TemporaryReplyChannel

我认为你的问题是你没有为你的任何@Configuration 类指定@EnableIntegraitonhttp://docs.spring.io/spring-integration/reference/html/overview.html#_configuration

编辑 2

有关此事,请参阅GH issue

所以,除了代码之外,您还需要:

  1. beanFactory.initializeBean(); 用于您的每个手册registerSingleton()。因为看上一篇的JavaDocs:

    * <p>The given instance is supposed to be fully initialized; the registry
    * will not perform any initialization callbacks (in particular, it won't
    * call InitializingBean's {@code afterPropertiesSet} method).
    
  2. ctx.refresh() 之后执行此操作以注册所有必要的BeanPostProcessors,包括用于Spring Integration Java DSL 解析的一个。

  3. 调用ctx.start() 启动所有Lifecycles。因为常规的ctx.refresh() 进程看不到这些手动添加的新内容。

【讨论】:

  • 感谢您的回答。因此,如果我理解正确的话,replyChannel 将填充从服务器接收到的数据,即使入站处理程序在客户端模式下运行也是如此。我想我对此感到困惑,但现在在我的代码中通过在 createInboundGateway 方法中交换输入和输出来解决这个问题(请参阅编辑后的版本)。但是,我仍然得到同样的例外。回复通道上接收到的数据会不会发送到IntegrationFlow bean,然后由transformer进行转换?我还没有写回任何数据。
  • 我的工厂类的顶部确实有\@EnableIntegration、\@IntegrationComponentScan 和\@Configuration 注解,并且这个类由ApplicationContext 扫描。我同意这可能仍然存在问题,因为我必须手动配置 taskScheduler 和工厂以防止发生异常(据我了解,如果设置了上述注释,通常会自动创建此 taskScheduler)。由于我自己通过 ApplicationContext beanfactory 创建了 bean(与使用 \@Bean annotate 相比),我还需要额外设置什么吗?
  • 嗯。我还没有看到任何明显的东西。手动的gateway.retryConnection(); 调用看起来很可疑,因为一切都应该通过容器中的TcpInboundGateway.start() 完成。另外,我认为您的.handle(System.out::println); 在那里很糟糕。因为TcpInboundGateway 无论如何都应该收到回复。否则 ti 会卡住。但是,是的..这不是Dispatcher has no subscribers 的根源...我会在本地玩类似的东西,过一段时间再回来找你。
  • 请在我的回答中查看EDIT 2
  • 非常感谢您的帮助。我设法使用您在此处的回复和 github 测试用例来解决它。
【解决方案2】:

这是工作的简化解决方案:

Beans.java

package beanconfig;

import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;

@Configuration
@EnableIntegration
public class Beans {
    //Beans can be configured here
}

IntegrationTest.java

import org.junit.Test;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer;
import org.springframework.integration.transformer.ObjectToStringTransformer;
import org.springframework.messaging.MessageChannel;

public class IntegrationTest {
    private String generateComponentName(String baseName, int instanceCount) {
        return baseName + "_" + instanceCount;
    }

    @Test
    public void integrationTest1() throws Exception {
        try(AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext()) {
            ctx.scan("beanconfig");
            ctx.refresh();

            ConfigurableListableBeanFactory beanFactory = ctx.getBeanFactory();

            int numberOfClients = 2; //TODO configure from file

            for (int count = 0; count < numberOfClients; count++) {
                //connection factory
                TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("127.0.0.1", 6680);
                connectionFactory.setSingleUse(false);
                connectionFactory.setSoTimeout(10000);
                connectionFactory.setSerializer(new ByteArrayLfSerializer());
                connectionFactory.setDeserializer(new ByteArrayLfSerializer());

                //inbound gateway
                TcpInboundGateway inboundGateway = new TcpInboundGateway();
                inboundGateway.setRequestChannel(new DirectChannel());
                inboundGateway.setConnectionFactory(connectionFactory);
                inboundGateway.setClientMode(true);
                inboundGateway.setReplyTimeout(10000);
                inboundGateway.setRetryInterval(20000);

                //message transformation and flow
                String flowName = generateComponentName("flow", count);
                IntegrationFlow flow = IntegrationFlows.from(inboundGateway)
                    .transform(new ObjectToStringTransformer())
                    .handle(h -> System.out.println("Message received: " + h.getPayload()))
                    .get();
                beanFactory.registerSingleton(flowName, flow);
                beanFactory.initializeBean(flow, flowName);
            }

            ctx.start();

            //TODO do proper validation here
            Thread.sleep(10000);
        }
    }
}

基本上,我最初的尝试存在一些问题。这是我为使其工作所做的更改:

1) 创建 AnnotationConfigApplicationContext 时,必须使用一个配置类作为参数创建它,并使用 @EnableIntegration 注解进行标记。如果不是,则必须由包含此注释的上下文扫描组件。我在第一次尝试时确实这样做了,但调用刷新为时已晚,应该在 ctx.scan 之后直接调用。因为我的 ctx.refresh() 是在我的 beanfactory 注册之后,所以在创建集成 bean 时实际上没有设置 @EnableIntegration。将 ctx.refresh() 直接移到 ctx.scan() 下方即可解决问题。

2) 每个注册到上下文中的 bean 也必须由 beanfactory 初始化。这是为了确保 BeanPostProcessors 运行(这不是由 registerSingleton 自动完成的)。

3) 然后需要调用 ctx.start() 来启用在 ctx.refresh() 之后创建的 bean。

【讨论】:

    猜你喜欢
    • 2018-12-14
    • 2021-08-17
    • 1970-01-01
    • 2012-12-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-17
    • 2011-08-22
    相关资源
    最近更新 更多