【问题标题】:How to convert from spring-integration-kafka 1.0.0M (XML config) to spring-integration-kafka 1.2.1 (Java config)?如何从 spring-integration-kafka 1.0.0M(XML 配置)转换为 spring-integration-kafka 1.2.1(Java 配置)?
【发布时间】:2015-12-12 07:49:18
【问题描述】:

我去年使用 XML 配置将 spring-integration-kafka 1.0.0M 实现到一个 Spring MVC 项目中,并且非常简单。由于 Spring 似乎正在朝着 Java 配置方向发展(而不是 XML),我想从使用 spring-integration-kafka XML 配置转换为 Java 配置,最新版本的 spring-integration-kafka (1.2. 1) 支持。问题是在网上并没有太多完整的例子,而且我发现的例子看起来已经过时了。我的配置很简单:

<bean id="kafkaStringEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder" />

<bean id="customObjectMapper" class="ad.content.api.utils.ObjectMapperFactory" factory-method="getMapper" />

<int:channel id="kafkaConversionRequest" />

<bean id="producerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="message.send.max.retries">${kafka.retries}</prop>
        </props>
    </property>
</bean>

<int-kafka:producer-context id="kafkaWidgetProducerContext" producer-properties="producerProperties">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration
            broker-list="${kafka.broker}" key-class-type="java.lang.String"
            key-encoder="kafkaStringEncoder" value-class-type="java.lang.String"
            value-encoder="kafkaStringEncoder" topic="widget-.*"
            compression-codec="default" async="true" />
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

<!-- declare spring integration gateway for kafka -->
<int:gateway service-interface="ad.content.api.models.kafka.KafkaGateway" default-reply-timeout="2000">
    <int:method name="publishConversion" request-channel="kafkaConversionRequest" />
</int:gateway>

<int:chain input-channel="kafkaConversionRequest" output-channel="kafkaToJson">
    <int:header-enricher>
        <int:header name="topic" value="widget-conversion" />
    </int:header-enricher>
</int:chain>

<int:object-to-json-transformer input-channel="kafkaToJson" output-channel="kafkaOutbound" object-mapper="customObjectMapper" />

<int-kafka:outbound-channel-adapter id="kafkaOutbound" kafka-producer-context-ref="kafkaWidgetProducerContext" />

到目前为止,我能弄清楚的是:

// gateway
@MessagingGateway(defaultReplyTimeout="2000")
public interface KafkaGateway {
    @Gateway(requestChannel="kafkaConversionRequest", headers=@GatewayHeader(name="topic", value="widget-conversion"))
    void publishConversion(Conversion conversion);
}

// create channel
@Bean(name="kafkaConversionRequest")
public MessageChannel getConversionRequest() {
    return new DirectChannel();
}

@Bean
public KafkaProducerMessageHandler getHandler() throws Exception {
    return new KafkaProducerMessageHandler(getContext());
}

@Bean
public KafkaProducerContext getContext() throws Exception {
    KafkaProducerContext context = new KafkaProducerContext();
    context.setProducerConfigurations(Collections.singletonMap("config", getConfiguration()));
    return context;
}

@Bean 
public ProducerConfiguration<String, String> getConfiguration() throws Exception {
    return new ProducerConfiguration<String, String>(getMetaData(), getProducer());
}

@Bean
@Transformer(inputChannel="kafkaToJson", outputChannel="kafkaOutbound")
public ObjectToJsonTransformer getJsonTransformer() {
    return new ObjectToJsonTransformer();
}

@Bean
public ProducerMetadata<String, String> getMetaData() {
    StringSerializer serializer = new StringSerializer();
    return new ProducerMetadata<String, String>("widget-.*", String.class, String.class, serializer, serializer);
}

@Bean
public Producer<String, String> getProducer() throws Exception {
    return new ProducerFactoryBean<String, String>(getMetaData(), "dev.kafka-broker01:9092").getObject();
}

【问题讨论】:

    标签: spring-mvc spring-integration apache-kafka


    【解决方案1】:

    有一个出色的 pull request 可能对您有所帮助的 kafka 示例。

    【讨论】:

      【解决方案2】:

      这是一个使用 Spring Java Config 和等效 XML 版本配置 Kafka 的完整功能示例:https://spring.io/blog/2015/04/15/using-apache-kafka-for-integration-and-data-processing-pipelines-with-spring

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-07-30
        • 1970-01-01
        • 1970-01-01
        • 2021-03-20
        • 2020-12-19
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多