【发布时间】:2015-12-08 07:59:49
【问题描述】:
看看这个流程...
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Bean
public IntegrationFlow mainFlow() {
JsonObjectMapper<?, ?> jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper);
// @formatter:off
return IntegrationFlows
.from(
amazonS3InboundSynchronizationMessageSource(),
e -> e.poller(p -> p.trigger(this::nextExecutionTime))
)
.channel(LoggingUtils.createLoggingMessageChannel("File:::"))
.transform(new FileToInputStreamTransformer())
.split(new FileSplitter(), null)
.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
.transform(Transformers.fromJson(persistentType(), jsonObjectMapper))
.handle(LoggingUtils.createLoggingMessageHandler("Parsed JSON record #"))
//.handle(jdbcRepositoryHandler())
//.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
为什么我只能读取一个文件?
即使配置的MessageSource(AmazonS3InboundSynchronizationMessageSource)将多个文件写入本地目录。
控制台输出示例
2015-09-11 09:52:59,856 [task-scheduler-1] org.springframework.integration.aws.s3.InboundFileSynchronizationImpl INFO 同步完成 2015-09-11 09:52:59,860 [task-scheduler-1] org.springframework.integration.handler.LoggingHandler INFO 事件:[File:::] - 消息:[GenericMessage [payload=/Users/cphi/development/项目/expedia/git/luis-data-migration-service/target/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz,标题={id=e58c332b -c217-8059-c4e8-09bba2c430a0,时间戳=1441990379859}]] 2015-09-11 09:52:59,918 [pool-2-thread-8] org.springframework.integration.handler.LoggingHandler INFO 事件:[解析的 JSON 记录#] - 消息:[GenericMessage [payload=RatePlanLevelRestrictionLog[roomTypeId=, ratePlanId=201744463,stayDate=2015 年 9 月 2 日星期三 17:00:00 PDT,ratePlanLevel=0,hotelId=4469515,rprLogSeqNum=16,logActionTypeId=2,sellStateId=1,startAllowed=,endAllowed=,fplosMaskArrival=,fplosMaskStayThrough=,doaCostPriceChanged =,supplierUpdateDate=2015 年 8 月 22 日星期六 07:57:24 PDT,supplierUpdateTuid=68630676,createDate=2015 年 8 月 22 日星期六 07:57:24 PDT,changeRequestId=31461011173,changeRequestSourceId=], headers={sequenceNumber=8, file_name=2015 -08-22-23-58-0.302402118982895.gz,sequenceSize=0,correlationId=16e44a80-2669-b2bf-f2bf-f12fe6bb4510,file_originalFile=/Users/cphi/development/projects/expedia/git/luis-data-migration-服务/目标/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz,id=6b866d25-07e8-22a4-381c-d26205393f3b,时间戳=1441990379898}] ] 2015-09-11 09:52:59,919 [pool-2-thread-3] org.springframework.integration.handler.LoggingHandler INFO 事件:[解析的 JSON 记录#] - 消息:[GenericMessage [payload=RatePlanLevelRestrictionLog[roomTypeId=, ratePlanId=1030513,stayDate=2015 年 8 月 26 日星期三 17:00:00 PDT,ratePlanLevel=0,hotelId=1615126,rprLogSeqNum=6,logActionTypeId=2,sellStateId=0,startAllowed=,endAllowed=,fplosMaskArrival=,fplosMaskStayThrough=,doaCostPriceChanged =,supplierUpdateDate=2015 年 8 月 22 日星期六 07:57:35 PDT,supplierUpdateTuid=46712703,createDate=2015 年 8 月 22 日星期六 07:57:35 PDT,changeRequestId=31461014045,changeRequestSourceId=], headers={sequenceNumber=3, file_name=2015 -08-22-23-58-0.302402118982895.gz,sequenceSize=0,correlationId=16e44a80-2669-b2bf-f2bf-f12fe6bb4510,file_originalFile=/Users/cphi/development/projects/expedia/git/luis-data-migration-服务/目标/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz,id=ddf1ee98-55c4-81de-af77-a886a340fe07,时间戳=1441990379897}] 2015-09-11 09:52:59,919 [pool-2-thread-2] org.springframework.integration.handler.LoggingHandler INFO 事件:[解析的 JSON 记录#] - 消息:[GenericMessage [payload=RatePlanLevelRestrictionLog[roomTypeId=, ratePlanId=163007,stayDate=Fri Dec 11 16:00:00 PST 2015,ratePlanLevel=0,hotelId=897973,rprLogSeqNum=3,logActionTypeId=2,sellStateId=0,startAllowed=,endAllowed=,fplosMaskArrival=,fplosMaskStayThrough=,doaCostPriceChanged =,supplierUpdateDate=2015 年 8 月 22 日星期六 07:57:16 PDT,supplierUpdateTuid=46712703,createDate=2015 年 8 月 22 日星期六 07:57:16 PDT,changeRequestId=31461009374,changeRequestSourceId=], headers={sequenceNumber=2, file_name=2015 -08-22-23-58-0.302402118982895.gz,sequenceSize=0,correlationId=16e44a80-2669-b2bf-f2bf-f12fe6bb4510,file_originalFile=/Users/cphi/development/projects/expedia/git/luis-data-migration-服务/目标/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz,id=d7d7a418-6593-bc57-fc7d-e181778be0c8,时间戳=1441990379899}]] ...目录内容
.../target/s3-dump/RatePlanLevelRestriction
+- 2015
+-- 08
+--- 23
+---- 00
+----- 2015-08-22-23-58-0.302402118982895.gz
+----- 2015-08-22-23-58-0.302992661055088.gz
+----- 2015-08-22-23-58-0.303107496339691.gz
如果您对此感到好奇,以下是要点:
-
LoggingUtils: https://gist.github.com/fastnsilver/82f242dd5b42bfd118e8 -
amazonS3InboundSynchronizationMessageSource()配置:https://gist.github.com/fastnsilver/fb750c02b58a04686509
【问题讨论】:
标签: java spring spring-integration dsl