【问题标题】:How to resolve the JMS outbound destination queue at runtime in Spring Integration(Spring Integration 中如何在运行时解析 JMS 出站目标队列)
【发布时间】:2020-04-15 16:40:23
【问题描述】:

所以这里,目标队列由源目录中的 .txt 文件中的 MSG_TYPE 确定,该文件是集成流的源。

如果 MSG_TYPE 以“ORU”开头,则发送到队列“jms/dataqueue”;如果 MSG_TYPE 是“MHS”,则发送到“jms/headersQueue”,以此类推。我在这里使用的是 WLS(WebLogic)的 JmsTemplate。那么,如何根据从文本文件中读取到的 MSG_TYPE 来解析目标队列?请参阅 sendToJmsQueue 方法,我希望其中的 inboundDataQueue 能够在运行时确定。

请看下面的源代码....

/**
 * Spring Integration configuration for a file-to-JMS pipeline.
 *
 * <p>Files are picked up from a watched directory, converted to a {@code String}, handed to the
 * {@code fileProcessor} bean, and the result is sent to a JMS queue through a {@link JmsTemplate}
 * whose connection factory is looked up via JNDI. After the send attempt the source file is renamed
 * to record the outcome.
 */
@EnableJms
@Configuration
@ComponentScan
@EnableAutoConfiguration
public class IntegrationConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationConfig.class);

    // Cached by getJmsTemplate so the JNDI context and the factory are only looked up once.
    private QueueConnectionFactory factory;
    private InitialContext jndi;

    @Value("${spring.wls.InboundDataQueue}")
    private String inboundDataQueue;

    @Value("${spring.wls.InboundSolicitedQueue}")
    private String inboundSolicitedQueue;

    @Value("${spring.wls.InboundQueryQueue}")
    private String inboundQueryQueue;

    @Value("${spring.wls.InboundAckQueue}")
    private String inboundAckQueue;

    @Autowired
    public FileProcessor fileProcessor;

    @Autowired
    public OutboundGatewayConfig outboundGatewayConfig;

    /** Channel that receives the files emitted by {@link #fileReadingMessageSource}. */
    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    /** Channel connecting {@link #processFile()} to {@link #sendToJmsQueue}. */
    @Bean
    public MessageChannel jmsOutboundChannel() {
        return new DirectChannel();
    }

    /** Direct channel registered under the bean name {@code customErrorChannel}. */
    @Bean
    public MessageChannel customErrorChannel() {
        return new DirectChannel();
    }

    /** Transformer used by {@link #processFile()} to turn the incoming file into a String payload. */
    @Bean
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

    /**
     * Builds the {@link JmsTemplate} used by the outbound adapter.
     *
     * <p>The JNDI context and the queue connection factory are created on first use and kept in
     * instance fields. The looked-up factory is wrapped in a {@link CachingConnectionFactory}.
     *
     * @param context     class name of the JNDI initial context factory
     * @param serverName  JNDI provider URL
     * @param factoryName JNDI name of the queue connection factory
     * @return a template bound to the looked-up connection factory
     * @throws IllegalStateException if the JNDI context cannot be created or the factory cannot be
     *                               looked up; the original {@link NamingException} is the cause
     */
    @Bean
    public JmsTemplate getJmsTemplate(@Value("${spring.wls.jndiContext}") String context,
            @Value("${spring.wls.InboundServerURL}") String serverName,
            @Value("${spring.wls.inboundConnfactory}") String factoryName) {

        if (jndi == null) {
            Hashtable<String, String> environment = new Hashtable<>();
            environment.put(Context.INITIAL_CONTEXT_FACTORY, context);
            environment.put(Context.PROVIDER_URL, serverName);
            try {
                jndi = new InitialContext(environment);
            } catch (NamingException e) {
                // Fail fast: continuing with a null context would only surface later as an
                // uninformative NullPointerException on the lookup below.
                throw new IllegalStateException("Failed to initialize JNDI context:: " + serverName, e);
            }
        }
        if (factory == null) {
            try {
                factory = (QueueConnectionFactory) jndi.lookup(factoryName);
            } catch (NamingException e) {
                // Fail fast: a null factory must not be handed to the caching wrapper.
                throw new IllegalStateException(
                        "Error in Creating Weblogic JNDI Connection factory:: " + factoryName, e);
            }
        }
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(factory);
        JmsTemplate wlsJmsTemplate = new JmsTemplate();
        wlsJmsTemplate.setConnectionFactory(cachingConnectionFactory);
        return wlsJmsTemplate;
    }

    /**
     * Flow from {@code fileInputChannel}: file to String, then {@code fileProcessor.process}, then
     * a log entry showing the {@code Message_Type} header, then on to {@link #jmsOutboundChannel()}.
     */
    @Bean
    public IntegrationFlow processFile() {
        return IntegrationFlows
                .from("fileInputChannel")
                .transform(fileToStringTransformer())
                .handle("fileProcessor", "process")
                .log(LoggingHandler.Level.INFO, "process file", m -> m.getHeaders().get("Message_Type"))
                .channel(this.jmsOutboundChannel())
                .get();
    }

    /**
     * Flow from {@link #jmsOutboundChannel()} to the JMS outbound adapter.
     *
     * <p>The send is wrapped in {@link #expressionAdvice()} so the source file is renamed according
     * to the outcome.
     *
     * @param wlsJmsTemplate template used by the outbound adapter
     */
    @Bean
    public IntegrationFlow sendToJmsQueue(JmsTemplate wlsJmsTemplate) {
        return IntegrationFlows.from(this.jmsOutboundChannel())
                .log(LoggingHandler.Level.INFO, "sending to queue", m -> m.getHeaders().get("Message_Type"))
                // Every message currently goes to inboundDataQueue. To choose the queue per message,
                // pass a function to destination(...) instead of a fixed name, for example one that
                // inspects the "Message_Type" header.
                .handle(Jms.outboundAdapter(wlsJmsTemplate).destination(inboundDataQueue),
                        e -> e.advice(expressionAdvice()))
                .get();
    }

    /**
     * Message source that watches a directory for newly created files and publishes them to
     * {@code fileInputChannel}, polled with a fixed delay of 1000.
     *
     * @param path    directory to watch; it is not created automatically
     * @param fileExt simple file-name pattern that a file must match to be picked up
     * @return the configured file-reading message source
     */
    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource(@Value("${file.poller.path}") final String path,
            @Value("${file.poller.fileName-pattern}") final String fileExt) {
        CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
        filters.addFilter(new SimplePatternFileListFilter(fileExt));
        // NOTE(review): no AcceptOnceFileListFilter is registered here; confirm that relying on
        // CREATE watch events alone is intended.

        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setAutoCreateDirectory(false);
        source.setDirectory(new File(path));
        source.setFilter(filters);
        source.setUseWatchService(true);
        source.setWatchEvents(WatchEventType.CREATE);
        LOGGER.info("Watching directory {} for files matching {}", path, fileExt);
        return source;
    }

    /**
     * Advice applied around the JMS send that decides what happens after a successful or failed
     * delivery.
     *
     * <p>On success the original file (header {@code file_originalFile}) is renamed with a
     * {@code .done} suffix and the result goes to {@code success.input}; on failure it is renamed
     * with a {@code .failed.to.send} suffix and the result goes to {@code failure.input}.
     * Exception trapping is enabled on the advice.
     */
    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("headers['file_originalFile'].renameTo(new java.io.File(headers['file_originalFile'].absolutePath + '.done'))");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString("headers['file_originalFile'].renameTo(new java.io.File(headers['file_originalFile'].absolutePath + '.failed.to.send'))");
        advice.setTrapException(true);
        return advice;
    }
}

【问题讨论】:

    标签: spring-integration spring-jms spring-integration-dsl


    【解决方案1】:

    请参阅 DSL Javadoc 中 destination 方法的重载版本:

    /**
     * Configure a {@link Function} that will be invoked at run time to determine the destination to
     * which a message will be sent. Typically used with a Java 8 Lambda expression:
     * <pre class="code">
     * {@code
     * .<Foo>destination(m -> m.getPayload().getState())
     * }
     * </pre>
     * @param destinationFunction the destination function.
     * @param <P> the expected payload type.
     * @return the current {@link JmsOutboundChannelAdapterSpec}.
     * @see JmsSendingMessageHandler#setDestinationExpression
     * @see FunctionExpression
     */
    public <P> S destination(Function<Message<P>, ?> destinationFunction) {
        // The function is wrapped in a FunctionExpression and installed as the destination expression.
        this.target.setDestinationExpression(new FunctionExpression<>(destinationFunction));
        return _this();
    }
    

    所以

    .destination(msg -> "jms/" + msg.getHeaders().get("Message_Type", String.class))
    

    编辑

    所以只需更改函数:

    .destination(msg -> {
         // Pick the queue from the message-type header. Use the header name your flow actually
         // sets (the question's flow logs "Message_Type").
         String type = msg.getHeaders().get("MSG_TYPE", String.class);
         if (type.startsWith("ORU")) {
             return "jms/dataqueue";
         }
         else ...
    })
    

    【讨论】:

    • 抱歉,我之前的问题描述有误,现已修改。实际上我需要的是:如果 MSG_TYPE 以“ORU”开头,则发送到队列“jms/dataqueue”;如果 MSG_TYPE 是“MHS”,则发送到“jms/headersQueue”。
    • 所以只需更改函数以使用该逻辑 - 请参阅编辑。
    猜你喜欢
    • 2018-12-25
    • 2015-04-12
    • 1970-01-01
    • 1970-01-01
    • 2011-09-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-17
    相关资源
    最近更新 更多