【发布时间】:2020-04-08 07:16:14
【问题描述】:
我正在定义一个 IntegrationFlow 以通过这种方式使用 DSL 语法从 SFTP 流式传输到 S3:
return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
.remoteDirectory("remoteDirectory"),
e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
.transform(new StreamTransformer())
.handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
.get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
return s3MessageHandler;
}
它按预期工作:文件已很好地上传到我的 S3 存储桶。但是,我想避免SPEL 语法,并将消息中的标头注入s3uploadMessageHandler 方法,这样我可以使用简单的ValueExpression 在s3UploadMessageHandler 方法中设置keyExpression。
为此,我改变了
handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
到
handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // Upload on S3
但是现在这个处理程序似乎不再被触发了。日志中没有错误,我从日志中知道 SFTP 轮询仍在工作。
我试图找到这背后的原因,我看到在IntegrationFlowdefinition.java中输入handle方法时,messageHandler的类类型是不同的:它是一个S3MessageHandler,在没有lambda的情况下调用,一个MyCallingClass$lambda使用 lambda 表达式调用时。
为了让我的场景正常工作,我错过了什么?
【问题讨论】:
标签: java lambda spring-integration spring-integration-dsl spring-integration-aws