【问题标题】:Multiple message processed处理多条消息
【发布时间】:2016-01-20 15:24:56
【问题描述】:

我有一个 spring xd 源模块,它从 s3 中提取文件并逐行拆分。我的 spring 配置如下。但是我有 3 个容器和 1 个管理服务器。现在我看到每个容器正在处理重复的消息他们每个人都在那里下载自己的副本。 我可以通过将源 s3 模块部署计数为 1 来解决,但我对消息的处理速度变慢了。有什么办法可以解决这个问题吗?

   <int:poller fixed-delay="${fixedDelay}" default="true">
    <int:advice-chain>
            <ref bean="pollAdvise"/>

            </int:advice-chain>
    </int:poller>


    <bean id="pollAdvise" 

    </bean>






    <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
        <property name="accessKey" value="#{encryptedDatum.decryptBase64Encoded('${accessKey}')}"/>
        <property name="secretKey" value="${secretKey}"/>
    </bean>


    <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
        <property name="proxyHost" value="${proxyHost}"/>
        <property name="proxyPort" value="${proxyPort}"/>
    <property name="preemptiveBasicProxyAuth" value="false"/> 
    </bean>

    <bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
        <constructor-arg index="0" ref="credentials"/>
        <constructor-arg index="1" ref="clientConfiguration"/>
        <property name="awsEndpoint" value="s3.amazonaws.com"/>
        <property name="temporaryDirectory" value="${temporaryDirectory}"/>
        <property name="awsSecurityKey"  value="${awsSecurityKey}"/>
    </bean>

    <bean id="encryptedDatum" class="abc"/>

    <!-- aws-endpoint="https://s3.amazonaws.com"  -->
    <int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
                                        bucket="${bucket}"
                                        s3-operations="s3Operations"
                                        credentials-ref="credentials"
                                        file-name-wildcard="${fileNameWildcard}"
                                        remote-directory="${remoteDirectory}"
                                        channel="splitChannel"
                                        local-directory="${localDirectory}"
                                        accept-sub-folders="false"
                                        delete-source-files="true"
                                        archive-bucket="${archiveBucket}"
                                        archive-directory="${archiveDirectory}">
    </int-aws:s3-inbound-channel-adapter>

  <int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">

        <int-file:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <property name="onSuccessExpression" value="payload.delete()"/>
            </bean>
        </int-file:request-handler-advice-chain>

</int-file:splitter>

    <int:channel id="output"/>

[更新] 我按照您的建议使用元数据存储添加了幂等性。但是由于我的 xd 在 3 个容器集群中与 rabbit 一起运行,所以简单的元数据存储可以工作吗?我想我应该使用 reds/mongo 元数据源。如果我使用 mongo/redis 元数据存储,我该怎么做驱逐/删除消息,因为消息会随着时间的推移而堆积?

<int:idempotent-receiver id="expressionInterceptor" endpoint="output"
                              metadata-store="store"
                             discard-channel="nullChannel"
                             throw-exception-on-rejection="false"
                              key-expression="payload"/>

    <bean id="store" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

【问题讨论】:

    标签: spring-integration spring-xd


    【解决方案1】:

    我可以建议你看看Idempotent Receiver

    您可以使用共享的MetadataStore 并且不接受重复文件。

    应该为您的&lt;int-file:splitter&gt; 配置&lt;idempotent-receiver&gt;。是的:使用丢弃逻辑来避免重复消息。

    更新

    。但是由于我的 xd 运行在带有 rabbit 的 3 个容器集群中,简单的元数据存储是否可以工作?

    这没关系,因为您从 S3 MessageSource 启动流,因此您应该过滤已经存在的文件。因此,您需要外部共享MetadataStore

    .如果我使用 mongo/redis 元数据存储,我如何驱逐/删除消息,因为消息会随着时间的推移而堆积?

    没错。这是幂等接收器逻辑的副作用。如果您使用数据库,不知道这对您有什么问题...

    您可以通过一些定期任务来清理集合/密钥。也许每周一次......

    【讨论】:

    • 更新了您建议的我的实施问题。请让我知道您的意见..
    • “这没关系,因为您从 S3 消息源启动流,所以您应该过滤已经存在的文件。因此您需要外部共享元数据存储。”在这种情况下,简单元数据存储将无法工作,因为它的每个实例?可以跨实例共享简单的元数据存储吗
    • ???我不知道如何跨实例共享简单的内存存储。对,你可以看看 Hazelcast DistributedMapSimpleMetadataStore,但你最终会遇到同样的堆积问题。
    • 是的,你是对的,我有点困惑,我们需要一些共享缓存。感谢您的见解!它有帮助
    猜你喜欢
    • 2018-01-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-09-09
    • 2014-12-27
    • 1970-01-01
    • 2019-06-21
    • 2014-10-23
    相关资源
    最近更新 更多