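【Question title】: JMS listener with netty TCP
【Posted】: 2021-07-20 19:28:14
【Question】:

I am trying to build a TCP server with Netty. I use the IBM MQ client to connect to an MQ broker. The idea is that the TCP server receives a message, passes it to MQ, and, if the server responds, sends the response back to the client that sent the request. I therefore need a JMS listener for asynchronous messages. The problem is that the JMS listener lives outside the Netty channel, and I am trying to figure out how to read a message, add it to the Netty channel, and immediately send it to the client connected to the TCP socket. Sending messages works perfectly. The problem is when the server responds: I receive the message, get the context/channel from clientConnectionProvider, and call writeAndFlush, but I never see the message arrive at the client.

I create the listener in the main class.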

public class Main {

    private final Integer port;

    private final Destination sendDestination;
    private final JMSContext jmsSendContext;

    private final JMSConsumer consumer;
    private final JMSContext jmsRecieveContext;
    private final Destination consumerDestination;

    private final ClientConnectionProvider clientConnectionProvider;

    public Main(Properties properties)
            throws JMSException, ConfigException {

        // The port is stored as an Integer, so the property string has to be parsed.
        if (properties.containsKey(ConfigurationEnum.SERVER_PORT.getValue())) {
            this.port = Integer.valueOf(properties.getProperty(ConfigurationEnum.SERVER_PORT.getValue()));
        } else {
            log.error("server.port not defined in properties");
            throw new ConfigException("server.port not defined in properties");
        }

        JmsFactoryFactory ff = JmsFactoryFactory.getInstance(JmsConstants.WMQ_PROVIDER);
        JmsConnectionFactory cf = ff.createConnectionFactory();

        // Set the properties
        cf.setStringProperty(CommonConstants.WMQ_HOST_NAME,
                properties.getProperty(ConfigurationEnum.IBM_MQ_HOST.getValue()));
        cf.setIntProperty(CommonConstants.WMQ_PORT,
                Integer.parseInt(properties.getProperty(ConfigurationEnum.IBM_MQ_PORT.getValue())));
        cf.setStringProperty(CommonConstants.WMQ_CHANNEL,
                properties.getProperty(ConfigurationEnum.IBM_MQ_CHANNEL.getValue()));
        cf.setIntProperty(CommonConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
        cf.setStringProperty(CommonConstants.WMQ_QUEUE_MANAGER,
                properties.getProperty(ConfigurationEnum.IBM_QUEUE_MANAGER.getValue()));
        cf.setStringProperty(CommonConstants.WMQ_APPLICATIONNAME, "FIX Orchestra Gateway");
        cf.setBooleanProperty(JmsConstants.USER_AUTHENTICATION_MQCSP, true);
        cf.setStringProperty(JmsConstants.USERID, properties.getProperty(ConfigurationEnum.IBM_APP_USER.getValue()));
        cf.setStringProperty(JmsConstants.PASSWORD, properties.getProperty(ConfigurationEnum.IBM_APP_PASS.getValue()));

        clientConnectionProvider = new ClientConnectionProvider();
        
        jmsRecieveContext = cf.createContext();
        consumerDestination = jmsRecieveContext
                .createQueue(properties.getProperty(ConfigurationEnum.IBM_QUEUE_CONSUMER.getValue()));
        consumer = jmsRecieveContext.createConsumer(consumerDestination);
        consumer.setMessageListener(new JMSMessageListener(clientConnectionProvider));
        jmsRecieveContext.start();

        jmsSendContext = cf.createContext();
        sendDestination = jmsSendContext
                .createQueue(properties.getProperty(ConfigurationEnum.IBM_QUEUE_TRANSACTION.getValue()));

    }

    public void start() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(10);

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    // SO_KEEPALIVE applies to accepted client sockets, so it is set as a child option.
                    .option(ChannelOption.SO_BACKLOG, 100).childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new DefaultChannelInitializer());

            // Start the server.
            ChannelFuture f = serverBootstrap.bind(port).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            jmsRecieveContext.stop();
            jmsRecieveContext.close();
            jmsSendContext.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Properties properties = new Properties();

        try (InputStream inputStream = new FileInputStream(args[0])) {
            properties.load(inputStream);

            new Main(properties).start();

        } catch (FileNotFoundException e) {
            log.error("Properties file specified in path {} was not found.", args[0], e);
        } catch (IOException e) {
            log.error("There was an IO error.", e);
        } catch (JMSException e) {
            log.error("There was a JMS error.", e);
        } catch (ConfigException e) {
            log.error("The configuration is invalid.", e);
        }
    }
}

The listener is a simple class.

@AllArgsConstructor
public class JMSMessageListener implements MessageListener {

    private final ClientConnectionProvider clientConnectionProvider;

    @Override
    public void onMessage(Message message) {

        try {
            String messageString = message.getBody(String.class);

            if (clientConnectionProvider.contains(ClientID.get(messageString))) {
                ClientConnection cc = clientConnectionProvider.getConnection(ClientID.get(messageString));
                if (cc.getCtx() == null) {
                    // TODO: Need to save message when client reconnects
                } else {
                    cc.getCtx().channel().write(messageString);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

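【Comments】:

  • I am connecting to IBM MQ, running in Docker, as the JMS broker.
  • I am using the MQ client to connect to the MQ broker. The idea is that I need to develop a TCP server that receives a message and passes it to MQ; if the server responds, the response is sent to the client that sent the request.

Tags: tcp jms listener netty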


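【Solution 1】:

You should call writeAndFlush(...) and attach a ChannelFutureListener to the ChannelFuture it returns. In the listener you can check whether the write succeeded or failed (and print the exception if it failed). In your current code you only call write(...), which just puts the message into the Channel's outbound buffer without actually flushing it to the socket.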
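
A minimal sketch of that suggestion, assuming Netty 4.1 on the classpath. The class name FlushDemo and the helper send(...) are hypothetical and not part of the question's code. An EmbeddedChannel stands in for the real client channel so the difference between write(...) and writeAndFlush(...) can be observed without opening a socket.

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.embedded.EmbeddedChannel;

public class FlushDemo {

    // Hypothetical helper: write and flush in one step, then report the outcome.
    static ChannelFuture send(Channel channel, String message) {
        return channel.writeAndFlush(message).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                System.out.println("sent: " + message);
            } else {
                // The cause explains why the write never reached the peer.
                System.err.println("write failed: " + future.cause());
            }
        });
    }

    public static void main(String[] args) {
        // In-memory channel used only for illustration (no real socket).
        EmbeddedChannel channel = new EmbeddedChannel();

        // write(...) alone only queues the message in the outbound buffer.
        channel.write("queued only");
        Object beforeFlush = channel.readOutbound();
        System.out.println("readable after write(): " + beforeFlush);

        // writeAndFlush(...) pushes everything queued so far to the transport.
        send(channel, "flushed");
        Object first = channel.readOutbound();
        Object second = channel.readOutbound();
        System.out.println("readable after writeAndFlush(): " + first + ", " + second);

        channel.finish();
    }
}
```

In the listener from the question, the equivalent change would be replacing cc.getCtx().channel().write(messageString) with a call such as send(cc.getCtx().channel(), messageString). If the real pipeline has no encoder that turns a String into a ByteBuf (for example StringEncoder), the write is likely to fail with an unsupported message type error, and the listener is where that failure would become visible.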
