【Question title】: Sync S3 Bucket and listen for changes
【Posted】: 2019-10-23 12:35:31
【Question】:

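I have an AWS S3 bucket where I put a new ZIP file every week.

I want to add a feature to my existing web service, which is written with Spring Boot: sync the bucket locally and watch for changes.

Currently, the sync works well: whenever a new file is added to the bucket, it is downloaded locally. However, I don't know how to listen for file updates, i.e. have a method that is triggered whenever a new file has been downloaded locally. Is that possible?

Here is part of my code: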

#  --------
# | AWS S3 |
#  --------
s3.credentials-access-key=***
s3.credentials-secret-key=****
s3.bucket = my-bucket
s3.remote-dir = zips
s3.local-dir = D:/s3-bucket/

@Log4j2
@Configuration
public class S3Config {

    public static final String OUT_CHANNEL_NAME = "s3filesChannel";

    @Value("${s3.credentials-access-key}") private String accessKey;
    @Value("${s3.credentials-secret-key}") private String secretKey;
    @Value("${s3.remote-dir}") private String remoteDir;
    @Value("${s3.bucket}") private String s3bucket;
    @Value("${s3.local-dir}") private String localDir;

    /*
     * AWS S3
     */
    @Bean
    public AmazonS3 getAmazonS3() {
        BasicAWSCredentials creds = new BasicAWSCredentials(accessKey, secretKey);
        AmazonS3 s3client = AmazonS3ClientBuilder
                .standard()
                .withRegion(Regions.EU_WEST_1)
                .withCredentials(new AWSStaticCredentialsProvider(creds))
                .build();
        return s3client;        
    }

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
        S3InboundFileSynchronizer sync = new S3InboundFileSynchronizer(pS3SessionFactory);
        sync.setPreserveTimestamp(true);
        sync.setDeleteRemoteFiles(false);
        String fullRemotePath = s3bucket.concat("/").concat(remoteDir);
        sync.setRemoteDirectory(fullRemotePath);
        sync.setFilter(new S3RegexPatternFileListFilter(".*\\.zip$"));
        return sync;
    }

    @Bean
    @InboundChannelAdapter(value = OUT_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
        S3InboundFileSynchronizer pS3InboundFileSynchronizer
    ) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File(localDir));
        messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return messageSource;
    }

    @Bean("s3filesChannel")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow fileReadingFlow(
            S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource,
            GtfsBizkaibus pGtfsBizkaibus,
            @Qualifier("fileProcessor") MessageHandler pMessageHandler) {
        return IntegrationFlows
                .from(pS3InboundFileSynchronizingMessageSource, e -> e.poller(p -> p.fixedDelay(5, TimeUnit.SECONDS)))
                .handle(pMessageHandler)
                .get();
    }

    @Bean("fileProcessor")
    public MessageHandler fileProcessor() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(localDir));
        handler.setExpectReply(false); // end of pipeline, reply not needed
        handler.setFileExistsMode(FileExistsMode.APPEND);
        handler.setNewFileCallback((file, msg) -> {
            log.debug("New file created... " + file.getAbsolutePath());
        });
        return handler;
    }

【Question comments】:

    Tags: spring-boot amazon-s3 spring-integration spring-integration-aws


    【Answer 1】:

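    Actually, S3InboundFileSynchronizingMessageSource already does all the necessary work for you: when a new file is added to the remote bucket, it is downloaded to the local directory and emitted as the payload of a message sent to the configured channel.

    When a remote file is modified, it is downloaded to the local directory again as well.

    Starting with version 5.0, AbstractInboundFileSynchronizingMessageSource provides this option: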
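Deciding whether a remote entry is "new or changed" typically comes down to remembering each key together with its last-modified time, which is the idea behind Spring Integration's persistent accept-once filters. The following is only a simplified, in-memory model of that idea in plain Java, assuming that key plus last-modified time identifies a version; ModifiedAwareAcceptOnceFilter and RemoteFile are hypothetical names, not Spring Integration classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model (hypothetical class, not Spring Integration's implementation) of an
 * "accept once unless modified" filter: a key is accepted the first time it is seen and
 * again whenever its last-modified time changes. In-memory only and not thread-safe.
 */
final class ModifiedAwareAcceptOnceFilter {

    /** Hypothetical minimal view of one entry in a remote listing. */
    record RemoteFile(String key, long lastModified) {}

    // key -> last-modified time of the version that was last accepted
    private final Map<String, Long> lastAccepted = new HashMap<>();

    /** Returns true for a new key, or for a known key whose timestamp changed. */
    boolean accept(RemoteFile file) {
        Long previous = lastAccepted.put(file.key(), file.lastModified());
        return previous == null || previous.longValue() != file.lastModified();
    }

    /** Applies accept() to one listing, keeping only new or modified entries. */
    List<RemoteFile> filterListing(List<RemoteFile> listing) {
        List<RemoteFile> accepted = new ArrayList<>();
        for (RemoteFile file : listing) {
            if (accept(file)) {
                accepted.add(file);
            }
        }
        return accepted;
    }
}
```

Calling filterListing on two consecutive listings returns every file the first time and, if only one object was overwritten in between (new timestamp), just that object the second time.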

    /**
     * Switch the local {@link FileReadingMessageSource} to use its internal
     * {@code FileReadingMessageSource.WatchServiceDirectoryScanner}.
     * @param useWatchService the {@code boolean} flag to switch to
     * {@code FileReadingMessageSource.WatchServiceDirectoryScanner} on {@code true}.
     * @since 5.0
     */
    public void setUseWatchService(boolean useWatchService) {
        this.fileSource.setUseWatchService(useWatchService);
        if (useWatchService) {
            this.fileSource.setWatchEvents(
                    FileReadingMessageSource.WatchEventType.CREATE,
                    FileReadingMessageSource.WatchEventType.MODIFY,
                    FileReadingMessageSource.WatchEventType.DELETE);
        }
    }
    

    In case that makes sense for your use case.

    But yes... S3-to-SQS notifications would also be a good solution. The Spring Integration AWS project provides an SqsMessageDrivenChannelAdapter for that.
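The watch-service option relies on the JDK's java.nio.file.WatchService. To illustrate the mechanism, here is a plain-JDK sketch (not Spring's WatchServiceDirectoryScanner) that reports .zip files created or modified in a local directory. ZipDirectoryWatcher and its methods are hypothetical names. The ".zip" suffix check assumes a file only appears under its final name once complete; as far as I know, the Spring synchronizer downloads to a temporary ".writing" file and renames it afterwards.

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/** Plain-JDK sketch (hypothetical class) of watching a directory for new or updated .zip files. */
final class ZipDirectoryWatcher {

    /**
     * Waits up to timeoutSeconds for the next batch of events on a directory that was
     * registered for ENTRY_CREATE/ENTRY_MODIFY and returns the affected .zip files.
     */
    static Set<Path> awaitZipChanges(WatchService watcher, Path dir, long timeoutSeconds)
            throws InterruptedException {
        Set<Path> changed = new LinkedHashSet<>(); // one file may produce both CREATE and MODIFY
        WatchKey key = watcher.poll(timeoutSeconds, TimeUnit.SECONDS);
        if (key == null) {
            return changed; // timed out without events
        }
        for (WatchEvent<?> event : key.pollEvents()) {
            if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
                continue; // events were dropped; a real app would rescan the directory
            }
            Path name = (Path) event.context(); // relative to the watched directory
            // Only final .zip names pass; e.g. "week1.zip.writing" is ignored
            if (name.toString().endsWith(".zip")) {
                changed.add(dir.resolve(name));
            }
        }
        key.reset(); // re-arm the key so later events are delivered
        return changed;
    }

    /** Blocks forever, printing every new or updated .zip file in dir. */
    static void watchForever(Path dir) throws IOException, InterruptedException {
        Files.createDirectories(dir);
        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
            dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
            while (true) {
                for (Path zip : awaitZipChanges(watcher, dir, 30)) {
                    System.out.println("New or updated file: " + zip);
                }
            }
        }
    }
}
```

In the Spring setup you would not write this yourself; it only shows why the watch service reacts to files as they land in the local directory instead of rescanning it on every poll.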

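    【Comments】:

    • So I can add messageSource.setUseWatchService(true); but what else do I need to do? Define a new bean?
    • I'm not sure what you mean. You have OUT_CHANNEL_NAME as the output of the S3InboundFileSynchronizingMessageSource, so you need a @ServiceActivator to consume the messages from that channel. Those messages carry a File payload: the local copy of the object from the S3 bucket. You don't need any FileWritingMessageHandler, because after the sync from S3 the remote files are already in the local directory.
    • I found the solution and I'm posting it. Thanks @Artem Bilan

    【Answer 2】: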

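    You can use S3 event notifications together with an SQS queue. When an object is added to your bucket, S3 can publish an event to a registered SQS queue. Your local application can then long-poll the queue for new events and process each object that was added.

    For more information, see here.

    【Comments】:

      【Answer 3】: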
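With the SQS approach, each message body is an S3 event notification whose "Records" array describes the affected objects. The sketch below covers only the selection step and assumes the JSON has already been parsed into a hypothetical S3EventRecord; receiving the message (for example with SqsMessageDrivenChannelAdapter) and the JSON parsing are left out. Object keys in these notifications are URL-encoded (a space arrives as "+"), so they are decoded before filtering.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: picks the objects an SQS consumer should download. */
final class S3EventSelector {

    /** Hypothetical parsed record: eventName such as "ObjectCreated:Put", bucket name, raw (encoded) key. */
    record S3EventRecord(String eventName, String bucket, String rawKey) {}

    /** Returns the decoded keys of newly created .zip objects under the given prefix. */
    static List<String> zipKeysToDownload(List<S3EventRecord> records, String bucket, String prefix) {
        return records.stream()
                .filter(r -> r.eventName().startsWith("ObjectCreated:")) // ignore deletions etc.
                .filter(r -> r.bucket().equals(bucket))
                // Keys in S3 event notifications are URL-encoded, so decode before use
                .map(r -> URLDecoder.decode(r.rawKey(), StandardCharsets.UTF_8))
                .filter(key -> key.startsWith(prefix) && key.endsWith(".zip"))
                .collect(Collectors.toList());
    }
}
```

With bucket "my-bucket" and prefix "zips/", a record for "zips/week+42.zip" yields the key "zips/week 42.zip", while a removal event or a non-ZIP object is skipped.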

      In the end, as @Artem Bilan suggested, I used the @ServiceActivator annotation. Here is the complete example:

      import java.io.File;
      
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.integration.annotation.InboundChannelAdapter;
      import org.springframework.integration.annotation.Poller;
      import org.springframework.integration.annotation.ServiceActivator;
      import org.springframework.integration.aws.inbound.S3InboundFileSynchronizer;
      import org.springframework.integration.aws.inbound.S3InboundFileSynchronizingMessageSource;
      import org.springframework.integration.aws.support.S3SessionFactory;
      import org.springframework.integration.aws.support.filters.S3RegexPatternFileListFilter;
      import org.springframework.integration.channel.QueueChannel;
      import org.springframework.messaging.PollableChannel;
      
      import com.amazonaws.auth.AWSStaticCredentialsProvider;
      import com.amazonaws.auth.BasicAWSCredentials;
      import com.amazonaws.regions.Regions;
      import com.amazonaws.services.s3.AmazonS3;
      import com.amazonaws.services.s3.AmazonS3ClientBuilder;
      
      import lombok.extern.log4j.Log4j2;
      
      @Log4j2
      @Configuration
      public class S3Config {
      
          public static final String IN_CHANNEL_NAME = "s3filesChannel";
      
          @Value("${s3.credentials-access-key}") private String accessKey;
          @Value("${s3.credentials-secret-key}") private String secretKey;
          @Value("${s3.remote-dir}") private String remoteDir;
          @Value("${s3.bucket}") private String s3bucket;
          @Value("${s3.local-dir}") private String localDir;
      
          /*
           * AWS S3
           */
          @Bean
          public AmazonS3 getAmazonS3() {
              BasicAWSCredentials creds = new BasicAWSCredentials(accessKey, secretKey);
              return AmazonS3ClientBuilder
                      .standard()
                      .withRegion(Regions.EU_WEST_1)
                      .withCredentials(new AWSStaticCredentialsProvider(creds))
                      .build();
          }
      
          @Bean
          public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
              return new S3SessionFactory(pAmazonS3);
          }
      
          @Bean
          public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
              S3InboundFileSynchronizer sync = new S3InboundFileSynchronizer(pS3SessionFactory);
              sync.setPreserveTimestamp(true);
              sync.setDeleteRemoteFiles(false);
              // Remote directory in the form "<bucket>/<prefix>"
              String fullRemotePath = s3bucket.concat("/").concat(remoteDir);
              sync.setRemoteDirectory(fullRemotePath);
              sync.setFilter(new S3RegexPatternFileListFilter(".*\\.zip$"));
              return sync;
          }
      
          // Each poll synchronizes the bucket into localDir and, if a local file is pending,
          // sends it as a message to IN_CHANNEL_NAME (fixedDelay is in milliseconds)
          @Bean
          @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
          public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
              S3InboundFileSynchronizer pS3InboundFileSynchronizer
          ) {
              S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
              messageSource.setAutoCreateLocalDirectory(true);
              messageSource.setLocalDirectory(new File(localDir));
              messageSource.setUseWatchService(true);
              return messageSource;
          }
      
          @Bean("s3filesChannel")
          public PollableChannel s3FilesChannel() {
              return new QueueChannel();
          }
      
          // Invoked for every message on IN_CHANNEL_NAME; the payload is the local copy
          // (java.io.File) that the synchronizer has already written to localDir
          @ServiceActivator(inputChannel = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
          public void handleNewFile(File file) {
              // TODO: process the ZIP file
              log.debug("<< New file: " + file.getAbsolutePath());
          }
      
      }
      

      Note that I renamed OUT_CHANNEL_NAME to IN_CHANNEL_NAME.

      PS: I'm fairly new to Spring Integration, so I'm still learning its concepts.

      【Comments】:
