【问题标题】:Spring integration DSL creating JMS MessageDriver Channel Adapter in java 1.7Spring集成DSL在java 1.7中创建JMS MessageDriver Channel Adapter
【发布时间】:2015-08-17 22:19:23
【问题描述】:

我正在尝试为 JMS MessageDriverChannelAdapter 创建一个集成流,我需要通过它向 Kafka 服务器发送消息。但我真的 当我尝试将 xml 标签转换为 dsl 特定代码时卡住了,无法将 xml 转换为所需的 DSL。任何人都可以请提供 任何指向它的指针,因为我无法在此处继续。

我已经创建了一个像这样的 MessageListenerContainer ......

String brokerUrl = "tcp://101.11.102.125:31316";
String topic = "sometpoic"; 
String kafkaBrokerUrl = "101.11.102.125:1012";
String kafkaTopic = "kafka_Topic";

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory();
    ActiveMQTopic mqTopic = new ActiveMQTopic(topic);           
    conFactory.setBrokerURL(brokerUrl);
    container.setConnectionFactory(conFactory);
    container.setDestination(mqTopic);
    container.setSessionTransacted(true);
    return container;
}

这些是我的输入和输出通道............

@Bean
public MessageChannel jmsInChannel() {
     return MessageChannels.publishSubscribe().get();
}

@Bean
public MessageChannel jmsOutChannel() {
     return MessageChannels.publishSubscribe().get();
}

And this is my JMS adapter flow............

@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
    return IntegrationFlows
            .from(Jms.messageDriverChannelAdapter(listenerContainer())                      
                   .autoStartup(true))                     
                   .channel(jmsInChannel())                                              
                   .get();
}

现在我需要像这样创建一个 header-enricher,但不能将其转换为 DSL。

<int:header-enricher input-channel="jmsInChannel" output-channel="jmsOutChannel">
<int:header name="kafkaBrokerUrl" value="${kafka.url}"></int:header>
<int:header name="kafkaTopic" value="${kafka.topic}"></int:header>

我需要创建一个服务激活器并调用一个 kafka 生产者方法,形成一个不同的类,例如 xml....

<int:service-activator input-channel="jmsOutChannel" ref="KafkaProducer" method="produceToJmsKafka"/>
<bean id="KafkaProducer" class="com.david.jms.JmsKafkaProducer"/>

那么如何将上述这些xml代码转换为类似的DSL特定代码。

  After getting the compilation error I have tried like this...
 @SuppressWarnings("unchecked")
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
    return IntegrationFlows
            .from(Jms.messageDriverChannelAdapter(listenerContainer())                      
                   .autoStartup(true))                     
                   .channel(jmsInChannel())
                   .enrichHeaders(new MapBuilder()
                                .put("brokerid", brokerid)
                                .put("topic", topic)
                                .put("source", source)
                                .put("fileType", fileType))
                   .handle("KafkaProducer", "produceToJmsKafka")
                   .get();
}

@Bean
public JmsProducer KafkaProducer() {
    return new JmsProducer();
}   

【问题讨论】:

    标签: spring-integration


    【解决方案1】:

    可能是这样的:

    @Value("${kafka.url}")
    private String kafkaBrokerUrl;
    
    @Value("${kafka.topic}")
    private String kafkaTopic;
    
    ....
    @Bean
    public IntegrationFlow jmsMessageDrivenFlow() {
        return IntegrationFlows
                .from(Jms.messageDriverChannelAdapter(listenerContainer())                      
                       .autoStartup(true))                     
                       .channel(jmsInChannel()) 
                       .enrichHeaders(new StringStringMapBuilder()
                                                .put("kafkaBrokerUrl", kafkaBrokerUrl)
                                                .put("kafkaTopic", kafkaTopic))
                       .handle("KafkaProducer", "produceToJmsKafka")
                       .get();
    }
    

    从这里我看不出有理由拥有那些 MessageChannel bean,尤其是像 publishSubscribe()

    DSL 1.1 开始,我们提供了 Spring Integration Kafka Adapters 的实现。

    【讨论】:

    • 嗨,Artem,非常感谢您的回答,spring DSL 很棒。但我对此有些怀疑。在实现 JMS 集成流程时,enrichHeaders() 面临编译错误,提示“IntegrationFlowDefinition 类型中的方法enrichHeaders(MapBuilder,String,Object>) 不适用于参数(StringStringMapBuilder)”。当我转换为 Map) 时,运行时出现错误。同样在 JMS 集成流程中 jmsOutChannel() 确实不是必需的,对此不确定。因此,请提供更多关于此的指针。
    • 嗨 Artem,在收到编译错误后,我尝试过这样的操作。在我的原始帖子中编辑了代码。不确定集成流程。那么我可以这样做吗。您能否提供任何关于此的指针。
    • 我会说一切对我来说都很好。当然,.autoStartup(true) 中没有任何理由,而且我不确定在.from().enrichHeaders() 之间是否需要.channel(jmsInChannel()),但我确信这段代码应该可以工作。
    • 您好 Artem,我的问题是,我可以使用 MapBuilder() 而不是 StringStringMapBuilder(),因为 StringStringMapBuilder() 最终会出现编译错误。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-01-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-26
    相关资源
    最近更新 更多