【发布时间】:2015-08-17 22:19:23
【问题描述】:
我正在尝试为 JMS MessageDriverChannelAdapter 创建一个集成流,我需要通过它向 Kafka 服务器发送消息。但我真的 当我尝试将 xml 标签转换为 dsl 特定代码时卡住了,无法将 xml 转换为所需的 DSL。任何人都可以请提供 任何指向它的指针,因为我无法在此处继续。
我已经创建了一个像这样的 MessageListenerContainer ......
String brokerUrl = "tcp://101.11.102.125:31316";
String topic = "sometpoic";
String kafkaBrokerUrl = "101.11.102.125:1012";
String kafkaTopic = "kafka_Topic";
@Bean
public DefaultMessageListenerContainer listenerContainer() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory();
ActiveMQTopic mqTopic = new ActiveMQTopic(topic);
conFactory.setBrokerURL(brokerUrl);
container.setConnectionFactory(conFactory);
container.setDestination(mqTopic);
container.setSessionTransacted(true);
return container;
}
这些是我的输入和输出通道............
@Bean
public MessageChannel jmsInChannel() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public MessageChannel jmsOutChannel() {
return MessageChannels.publishSubscribe().get();
}
And this is my JMS adapter flow............
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
return IntegrationFlows
.from(Jms.messageDriverChannelAdapter(listenerContainer())
.autoStartup(true))
.channel(jmsInChannel())
.get();
}
现在我需要像这样创建一个 header-enricher,但不能将其转换为 DSL。
<int:header-enricher input-channel="jmsInChannel" output-channel="jmsOutChannel">
<int:header name="kafkaBrokerUrl" value="${kafka.url}"></int:header>
<int:header name="kafkaTopic" value="${kafka.topic}"></int:header>
我需要创建一个服务激活器并调用一个 kafka 生产者方法,形成一个不同的类,例如 xml....
<int:service-activator input-channel="jmsOutChannel" ref="KafkaProducer" method="produceToJmsKafka"/>
<bean id="KafkaProducer" class="com.david.jms.JmsKafkaProducer"/>
那么如何将上述这些xml代码转换为类似的DSL特定代码。
After getting the compilation error I have tried like this...
@SuppressWarnings("unchecked")
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
return IntegrationFlows
.from(Jms.messageDriverChannelAdapter(listenerContainer())
.autoStartup(true))
.channel(jmsInChannel())
.enrichHeaders(new MapBuilder()
.put("brokerid", brokerid)
.put("topic", topic)
.put("source", source)
.put("fileType", fileType))
.handle("KafkaProducer", "produceToJmsKafka")
.get();
}
@Bean
public JmsProducer KafkaProducer() {
return new JmsProducer();
}
【问题讨论】: