【问题标题】:Appropriate use of Spring Integration Java DSL plus AmazonS3InboundSynchronizationMessageSource适当使用 Spring Integration Java DSL 加 AmazonS3InboundSynchronizationMessageSource
【发布时间】:2015-11-30 01:35:18
【问题描述】:

我正在使用AmazonS3InboundSynchronizationMessageSource 读取分布在由type >> year >> month >> day >> hour >> {filename}-{uniqueid}.gz 组织的S3 存储桶子目录中的数百万个文件。理想情况下,我想轮询和写入并让同步器记住我在后续轮询中读取的最后一个位置以检索后续批次。然而,这不是上述MessageSource 的设计方式。

无论如何,我可以通过选择一个范围并读取内容来解决这个问题。

除此之外,如果我采取一种简单的方法并在第一次轮询时从一个目录中读取文件;之后我想关闭(System.exit)(实际上是在下面的 cmets 中进行了一些处理之后)。

所以,类似于这里的问题:

Spring Integration Inbound-channel-adapter: make one poll and exit

我只想投票一次并在第一次投票后退出。 (也许有不同的方法来解决它?我愿意接受建议)。

应用引导

@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class DataMigrationApp extends SpringBootServletInitializer {

@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
    return application.sources(DataMigrationApp.class);
}

public static void main(String[] args) {
    SpringApplication.run(DataMigrationApp.class, args);
}

}

更新 (2015-09-06)

代码示例

@Configuration
public class DataMigrationModule {

private final Logger log = LoggerFactory.getLogger(getClass());

@Value("${cloud.aws.credentials.accessKey}")
private String accessKey;

@Value("${cloud.aws.credentials.secretKey}")
private String secretKey;

@Value("${cloud.aws.s3.bucket}")
private String bucket;

@Value("${cloud.aws.s3.max-objects-per-batch:1024}")
private int maxObjectsPerBatch;

@Value("${cloud.aws.s3.accept-subfolders:false}")
private String acceptSubFolders;

@Value("${cloud.aws.s3.remote-directory}")
private String remoteDirectory;

@Value("${cloud.aws.s3.local-directory:target/s3-dump}")
private String localDirectory;

@Value("${cloud.aws.s3.filename-wildcard:}")
private String fileNameWildcard;

@Value("${app.persistent-type:}")
private String persistentType;

@Value("${app.repository-type:}")
private String repositoryType;

@Value("${app.persistence-batch-size:2500}")
private int persistenceBatchSize;

@Autowired
private ListableBeanFactory beanFactory;

private final AtomicBoolean invoked = new AtomicBoolean();

public Date nextExecutionTime(TriggerContext triggerContext) {
    return this.invoked.getAndSet(true) ? null : new Date();
}

private FileToInputStreamTransformer unzipTransformer() {
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
    transformer.setDeleteFiles(true);
    return transformer;
}

private Class<?> repositoryType() {
    try {
        return Class.forName(repositoryType);
    } catch (ClassNotFoundException cnfe) {
        log.error("DataMigrationModule.failure -- (Unknown repository implementation!)", cnfe);
        System.exit(0);
    }
    return null;
}

private Class<?> persistentType() {
    try {
        return Class.forName(persistentType);
    } catch (ClassNotFoundException cnfe) {
        log.error("DataMigrationModule.failure -- (Unsupported type!)", cnfe);
        System.exit(0);
    }
    return null;
}

@Bean
public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
    AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
    AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
    messageSource.setCredentials(credentials);
    messageSource.setBucket(bucket);
    messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
    messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
    messageSource.setRemoteDirectory(remoteDirectory);
    if (!fileNameWildcard.isEmpty()) {
        messageSource.setFileNameWildcard(fileNameWildcard);
    }
    String directory = System.getProperty("java.io.tmpdir");
    if (!localDirectory.startsWith("/")) {
        localDirectory = "/" + localDirectory;
    }
    if (!localDirectory.endsWith("/")) {
        localDirectory = localDirectory + "/";
    }
    directory = directory + localDirectory;
    FileUtils.mkdir(directory);
    messageSource.setDirectory(new LiteralExpression(directory));
    return messageSource;
}

@Bean
DirectChannel inputChannel() {
    return new DirectChannel();
}

@Bean 
JdbcRepositoryHandler jdbcRepositoryHandler() {
    return new JdbcRepositoryHandler(repositoryType(), beanFactory);
}

@Bean
public IntegrationFlow flow() {
    // formatter:off
    return IntegrationFlows
            .from(
                    this.amazonS3InboundSynchronizationMessageSource(),
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .transform(unzipTransformer())
            // TODO add advised PollableChannel to deal with possible decompression issues

            .split(f -> new FileSplitter())
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .transform(Transformers.fromJson(persistentType()))
            // TODO add advised PollableChannel to deal with possible transform issues

            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a -> 
                            a.releaseStrategy(g -> g.size() == persistenceBatchSize)
                            .expireGroupsUponCompletion(true)
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("size() ge 2 ? 10000 : -1")
                            , null
            )
            .handle(jdbcRepositoryHandler())
            // TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
            .get();
    // formatter:on
}

public class JdbcRepositoryHandler extends AbstractReplyProducingMessageHandler {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @SuppressWarnings("rawtypes")
    private Insertable repository;

    public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
        repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
    }

    @Override
    protected Object handleRequestMessage(Message<?> message) {
        List<?> result = null;
        try {
            result = repository.insert((List<?>) message.getPayload());
        } catch (TransactionSystemException | DataAccessException e) {
            // TODO Quite a bit more work to add retry capability for records that didn't cause failure
            log.error("DataMigrationModule.failure -- (Could not persist batch!)", ExceptionUtils.getStackTrace(e));
        }
        return result;
    }

}

public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    @Override
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload));
    }
}

}

【问题讨论】:

    标签: java spring amazon-s3 spring-integration dsl


    【解决方案1】:

    其实不知道你的问题是什么。

    顺便说一句,你走对了。

    对于OnlyOnceTrigger,您可以使用我的测试用例中的类似内容:

        private final AtomicBoolean invoked = new AtomicBoolean();
    
        public Date nextExecutionTime(TriggerContext triggerContext) {
            return this.invoked.getAndSet(true) ? null : new Date();
        }
    
    ...
    
        e -> e.poller(p -> p.trigger(this::nextExecutionTime))
    

    为了解压你的文件,你应该这样做:

    .<File, InputStream>transform(p -> new GZIPInputStream(new FileInputStream(p)))
    

    您必须这样做只是因为有一个开箱即用的FileSplitter 组件可以逐行读取文件并为每个文件发出消息。而且那个支持InputStream作为有效载荷,让您避免将整个文件加载到内存中。

    所以,IntegrationFlow 中的下一个 EIP 方法如下:

    .split(new FileSplitter())
    

    不确定之后是否需要将每个域对象聚合到某个列表以进行进一步的批量插入,因为您可以通过 ExecutorChannel 逐一分发它们

    如您所见,delete unpacked file 步骤中没有任何理由。

    以及在最后一个delete all *.gz files 步骤中。仅仅因为您可能依赖AcceptOnceFileListFilter 来避免在下一次轮询任务中重新读取相同的文件。

    如果我遗漏了什么,请告诉我。

    【讨论】:

    • 谢谢阿特姆!是的,感谢您理解我正在尝试做的事情。试图了解我正在阅读的关于SplittersAggregators 的内容。该文件的内容是单独的 JSON 有效负载。因此,我假设我可以再次转换为类的实例,获取该实例并添加到列表中。我希望调用一个 repo 方法来插入一批List&lt;T&gt; 的记录。我相信我会在MessageHandler 中做到这一点。因此,如果文件中的行超过批量大小,我需要停止填充列表并调用插入,然后清除列表并继续。不确定如何使用可用的 DSL 方法来做到这一点。
    • 你可以直接使用.aggregate() EIP 方法和MessageCountReleaseStrategy.expireGroupsUponCompletion(true) 选项来做到这一点。
    • 我会尝试一下,当我的流程正常工作时,我会更新这篇文章并给予你信任。最后,上述策略是否考虑到最终要写入的批次可能不会超过阈值并因此(可能永远不会)被写入的情况?
    • 最后一个问题是groupTimeoutExpression 指定聚合器应该多久尝试一次释放非完整组。
    • 我需要清理本地目录 b/c 我不想用同步文件填充它。越来越接近工作解决方案。完成后将发布。
    猜你喜欢
    • 2018-08-18
    • 2015-01-29
    • 1970-01-01
    • 1970-01-01
    • 2022-07-11
    • 2016-06-10
    • 2015-10-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多