【问题标题】:Why am I only able to read/processs one file from an SI MessageSource?为什么我只能从 SI MessageSource 读取/处理一个文件?
【发布时间】:2015-12-08 07:59:49
【问题描述】:

看看这个流程...

public Date nextExecutionTime(TriggerContext triggerContext) {
    return this.invoked.getAndSet(true) ? null : new Date();
}

@Bean
public IntegrationFlow mainFlow() {
    JsonObjectMapper<?, ?> jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper);
    // @formatter:off
    return IntegrationFlows
            .from(
                    amazonS3InboundSynchronizationMessageSource(),
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .channel(LoggingUtils.createLoggingMessageChannel("File:::"))
            .transform(new FileToInputStreamTransformer())
            .split(new FileSplitter(), null)
            .channel(c -> c.executor(Executors.newFixedThreadPool(10)))
            .transform(Transformers.fromJson(persistentType(), jsonObjectMapper))
            .handle(LoggingUtils.createLoggingMessageHandler("Parsed JSON record #"))
            //.handle(jdbcRepositoryHandler())
            //.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
            .get();
    // @formatter:on
}    

为什么我只能读取一个文件?

即使配置的MessageSourceAmazonS3InboundSynchronizationMessageSource)将多个文件写入本地目录。

控制台输出示例

2015-09-11 09:52:59,856 [task-scheduler-1] org.springframework.integration.aws.s3.InboundFileSynchronizationImpl INFO 同步完成 2015-09-11 09:52:59,860 [task-scheduler-1] org.springframework.integration.handler.LoggingHandler INFO 事件:[File:::] - 消息:[GenericMessage [payload=/Users/cphi/development/项目/expedia/git/luis-data-migration-service/target/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz,标题={id=e58c332b -c217-8059-c4e8-09bba2c430a0,时间戳=1441990379859}]] 2015-09-11 09:52:59,918 [pool-2-thread-8] org.springframework.integration.handler.LoggingHandler INFO 事件:[解析的 JSON 记录#] - 消息:[GenericMessage [payload=RatePlanLevelRestrictionLog[roomTypeId=, ratePlanId=201744463,stayDate=2015 年 9 月 2 日星期三 17:00:00 PDT,ratePlanLevel=0,hotelId=4469515,rprLogSeqNum=16,logActionTypeId=2,sellStateId=1,startAllowed=,endAllowed=,fplosMaskArrival=,fplosMaskStayThrough=,doaCostPriceChanged =,supplierUpdateDate=2015 年 8 月 22 日星期六 07:57:24 PDT,supplierUpdateTuid=68630676,createDate=2015 年 8 月 22 日星期六 07:57:24 PDT,changeRequestId=31461011173,changeRequestSourceId=], headers={sequenceNumber=8, file_name=2015 -08-22-23-58-0.302402118982895.gz,sequenceSize=0,correlationId=16e44a80-2669-b2bf-f2bf-f12fe6bb4510,file_originalFile=/Users/cphi/development/projects/expedia/git/luis-data-migration-服务/目标/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz,id=6b866d25-07e8-22a4-381c-d26205393f3b,时间戳=1441990379898}] ] 2015-09-11 09:52:59,919 [pool-2-thread-3] org.springframework.integration.handler.LoggingHandler INFO 事件:[解析的 JSON 记录#] - 消息:[GenericMessage [payload=RatePlanLevelRestrictionLog[roomTypeId=, ratePlanId=1030513,stayDate=2015 年 8 月 26 日星期三 17:00:00 PDT,ratePlanLevel=0,hotelId=1615126,rprLogSeqNum=6,logActionTypeId=2,sellStateId=0,startAllowed=,endAllowed=,fplosMaskArrival=,fplosMaskStayThrough=,doaCostPriceChanged =,supplierUpdateDate=2015 年 8 月 22 日星期六 07:57:35 PDT,supplierUpdateTuid=46712703,createDate=2015 年 8 月 22 日星期六 07:57:35 PDT,changeRequestId=31461014045,changeRequestSourceId=], headers={sequenceNumber=3, file_name=2015 -08-22-23-58-0.302402118982895.gz,sequenceSize=0,correlationId=16e44a80-2669-b2bf-f2bf-f12fe6bb4510,file_originalFile=/Users/cphi/development/projects/expedia/git/luis-data-migration-服务/目标/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz,id=ddf1ee98-55c4-81de-af77-a886a340fe07,时间戳=1441990379897}] 2015-09-11 09:52:59,919 [pool-2-thread-2] org.springframework.integration.handler.LoggingHandler INFO 事件:[解析的 JSON 记录#] - 消息:[GenericMessage [payload=RatePlanLevelRestrictionLog[roomTypeId=, ratePlanId=163007,stayDate=Fri Dec 11 16:00:00 PST 2015,ratePlanLevel=0,hotelId=897973,rprLogSeqNum=3,logActionTypeId=2,sellStateId=0,startAllowed=,endAllowed=,fplosMaskArrival=,fplosMaskStayThrough=,doaCostPriceChanged =,supplierUpdateDate=2015 年 8 月 22 日星期六 07:57:16 PDT,supplierUpdateTuid=46712703,createDate=2015 年 8 月 22 日星期六 07:57:16 PDT,changeRequestId=31461009374,changeRequestSourceId=], headers={sequenceNumber=2, file_name=2015 -08-22-23-58-0.302402118982895.gz,sequenceSize=0,correlationId=16e44a80-2669-b2bf-f2bf-f12fe6bb4510,file_originalFile=/Users/cphi/development/projects/expedia/git/luis-data-migration-服务/目标/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz,id=d7d7a418-6593-bc57-fc7d-e181778be0c8,时间戳=1441990379899}]] ...

目录内容

.../target/s3-dump/RatePlanLevelRestriction
+- 2015
+-- 08
+--- 23
+---- 00
+----- 2015-08-22-23-58-0.302402118982895.gz
+----- 2015-08-22-23-58-0.302992661055088.gz
+----- 2015-08-22-23-58-0.303107496339691.gz                                                                                                                               

如果您对此感到好奇,以下是要点:

【问题讨论】:

    标签: java spring spring-integration dsl


    【解决方案1】:

    在轮询器上增加maxMessagesPerPoll(默认为 1)。

    【讨论】:

    • 如果我这样做;我怎么知道最大消息应该是多少?每个同步目录可以包含不同数量的文件。我猜您是在限制一次投票中可以处理的文件数量?我试过了,它确实处理了超过 1 个文件,但如果我将 maxMessagesPerPoll 设置得太低,我仍在尝试调和,我仍然会遇到原来的问题吗?我还尝试了p.trigger(Pollers.fixedDelay(2000),发现轮询了相同的文件,但它们没有在下游 b/c 的方式下重新处理 InboundFileSynchronizer 在 S3 MessageSource impl 中的工作方式。
    • 您只需将mmpp 设置为足够大(或-1 为无限制 - 意味着继续处理文件直到没有遇到新文件)。
    • 谢谢加里!但是-1不应该是默认的吗?
    • 请参阅 the important note in the documentation,它解释了为什么入站通道适配器默认为 1 - 框架无法对特定 MessageSource 的行为方式做出任何假设,因此为了安全起见默认为 1。我们可以考虑为MessageSource 添加一些属性,但许多用户实际上希望加快文件处理速度,而mmpp 已经让您完全控制。
    猜你喜欢
    • 1970-01-01
    • 2012-05-18
    • 2014-11-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-02
    • 1970-01-01
    • 2014-11-30
    相关资源
    最近更新 更多