【发布时间】:2017-01-10 01:58:47
【问题描述】:
我有一个由 S3 事件每 15 分钟触发一次的 Java Lambda 函数。我注意到,在大约每 3 小时的时间段内,每个 Lambda 调用都包括上传的最新文件以及在该 3 小时时间跨度内上传的所有文件。
因此,如果在遍历整个 List 时,它会重复在之前的 Lambda 调用中已经处理过的文件。
如何让它只处理最近上传的文件?在 node.js 中,有一个 context.suceed(),我假设它将该事件标记为成功处理。 Java 似乎没有。
以下是 Cloudwatch 日志。
08:35:16 START RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Version: $LATEST
08:35:26 TIME - AUTHENTICATE: 8101ms
08:35:26 TIME - MESSAGE PARSE: 1ms
08:35:26 data :: event/events/2016/ 08/31/2016 0831123000.export.csv
08:35:35 Processed 147 events
08:35:35 TIME - FILE PARSE: 9698
08:35:35 Found 1 event files
08:35:35 Total function took: 17800ms
08:35:35 END RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3
08:35:35 REPORT RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Duration: 19403.67 ms Billed Duration: 19500 ms Memory Size: 192 MB Max Memory Used: 116 MB
08:45:03 START RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Version: $LATEST
08:45:03 TIME - AUTHENTICATE: 119ms
08:45:03 TIME - MESSAGE PARSE: 0ms
08:45:03 data :: event/events/2016/ 08/31/2016 0831123000.export.csv
08:45:05 Processed 147 events
08:45:05 data :: event/events/2016/ 08/31/2016 0831124500.export.csv
08:45:06 Processed 211 events
08:45:06 TIME - FILE PARSE: 2499
08:45:06 Found 2 event files
08:45:06 Total function took: 2618ms
08:45:06 END RequestId: bcb8e064-6f78-11e6-baea-a312004d2418
08:45:06 REPORT RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Duration: 2796.25 ms Billed Duration: 2800 ms Memory Size: 192 MB Max Memory Used: 116 MB
09:05:02 START RequestId: 8747aa 08-6f7b-11e6-80fd-f30a15cf07fc Version: $LATEST
09:05:02 TIME - AUTHENTICATE: 98ms
09:05:02 TIME - MESSAGE PARSE: 0ms
09:05:02 data :: event/events/2016/ 08/31/2016 0831123000.export.csv
09:05:03 Processed 147 events
09:05:03 data :: event/events/2016/ 08/31/2016 0831124500.export.csv
09:05:04 Processed 211 events
09:05:04 data :: event/events/2016/ 08/31/2016 0831130000.export.csv
09:05:04 Processed 204 events
09:05:04 TIME - FILE PARSE: 2242
09:05:04 Found 3 event files
09:05:04 Total function took: 2340ms
09:05:04 END RequestId: 8747aa 08-6f7b-11e6-80fd-f30a15cf07fc
编辑 1 我相信迈克尔已经回答了这个问题,但是下面是其他人的一些代码。我确实在使用全局列表来保存记录。
公共类 LambdaHandler {
private final List<GDELTEventFile> eventFiles = new ArrayList<>();
private AmazonS3Client s3Client;
private final CSVFormat CSV_FORMAT = CSVFormat.TDF.withIgnoreEmptyLines().withTrim();
public void gdeltHandler(S3Event event, Context context) {
StopWatch sw = new StopWatch();
long time = 0L;
sw.start();
s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
sw.split();
System.out.println("TIME - AUTHENTICATE: " + sw.getSplitTime() + "ms");
time += sw.getSplitTime();
sw.reset();
sw.start();
processEvent(event);
sw.split();
System.out.println("TIME - MESSAGE PARSE: " + sw.getSplitTime() + "ms");
time += sw.getSplitTime();
sw.reset();
sw.start();
processFiles();
sw.split();
System.out.println("TIME - FILE PARSE: " + sw.getSplitTime());
time += sw.getSplitTime();
System.out.println("Found " + eventFiles.size() + " event files");
System.out.println("Total function took: " + time + "ms");
}
private void processEvent(S3Event event) {
List<S3EventNotification.S3EventNotificationRecord> records = event.getRecords();
for (S3EventNotification.S3EventNotificationRecord record : records) {
long filesize = record.getS3().getObject().getSizeAsLong();
eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(), record.getS3().getObject().getKey(), filesize));
}
}
private void processFiles() {
for (GDELTEventFile event : eventFiles) {
try {
System.out.println(event.getBucket() + " :: " + event.getFilename());
GetObjectRequest request = new GetObjectRequest(event.getBucket(), event.getFilename());
S3Object file = s3Client.getObject(request);
try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.getObjectContent()))) {
CSVParser parser = new CSVParser(reader, CSV_FORMAT);
int count = 0;
for (CSVRecord record : parser) {
count++;
}
}
System.out.println("Processed " + count + " events");
}
} catch (IOException ioe) {
System.out.println("IOException :: " + ioe);
}
}
}
【问题讨论】:
-
data ::行中的信息从何而来?我从未见过 S3 每个事件发送超过一条记录...事实上,我的代码旨在停止并在发生异常时抛出异常,因为这是出乎意料的。我怀疑您可能没有考虑到可能的容器重用。如果 Lambda 事件重用了上一次调用中的相同容器,那么您有可能拥有一个保留旧事件的全局数据结构? -
嗨,Michael,如果您要问实际的行在哪里打印,我正在从函数中打印这些行。它只是一个 System.out.println。该文件本身由 gdelt 服务每 15 分钟生成一次,我将其下载到 ec2 实例上,然后通过 cli 上传。我如何考虑 lambda 中的容器重用?以前从未听说过...
-
当我说“考虑”容器重用时,我的意思是be aware that it's a thing 并相应地编码。每次触发函数时,它都会触发运行您的 Lambda 函数的空闲进程......但它可能是新的,也可能是上次运行该函数的 相同 进程,或者在此之前的某个时间,被重用(只要你没有上传新代码)。如果您将记录从 S3 推送到全局数组,而不是处理程序的范围(例如),那么当您迭代该数组时,它将有时包含旧事件。
-
哦,我问的是数据的来源,而不是输出的来源。你的一点点代码——只是从开始到输出信息的部分——可能会说明问题。
-
看起来你可能是对的,我使用的是全局列表。我知道 Lambda 会打开容器,但我从来没有想过它会打开实际的进程。我将重构以使用局部范围的变量。
标签: amazon-web-services amazon-s3 aws-lambda