【发布时间】:2021-05-10 07:27:49
【问题描述】:
我们正在尝试在 DDB 中的一张表中进行简单的数据迁移。
基本上,我们正在添加一个新字段,我们需要回填其中一个表中的所有文档。
这个表有大约 700K 文档。
我们遵循的过程非常简单:
- 在这种情况下,手动触发将扫描表格并针对每个文档更新文档并继续执行相同操作直到接近 15 分钟顶部的 lambda
- 将 LastEvaluatedKey 放入 SQS 以触发使用该键继续扫描的新 lambda 执行。
- 进程继续根据需要按顺序生成 lambda,直到没有更多文档
我们发现的问题如下...
迁移完成后,我们注意到更新的文档数量远低于该表中现有的文档总数。这是一个随机值,并不总是相同,但范围从数万到数十万(我们看到的最坏情况是 300K 差异)。
这显然是个问题,因为如果我们再次扫描文档,很明显有些文档没有迁移。起初我们认为这是因为一些客户端更新/插入新文档,但该表上的吞吐量并没有那么大,可以证明如此大的差异是合理的,所以这并不是说在我们运行迁移时添加了新文档。
我们尝试了第二种方法,首先扫描,因为如果我们只扫描,我们注意到扫描文档的数量 == 表中文档的计数,因此我们尝试将文档的 ID 转储到另一个表中,然后扫描该表并再次更新这些项目。有趣的是,这个只有 ID 的新表也会出现同样的问题,比我们要更新的表中的计数要少得多,因此,我们回到了第一格。
我们考虑过使用并行扫描,但我看不出这有什么好处,而且我不想在运行迁移时损害表的读取能力。
任何在 DDB 中具有数据迁移经验的人都可以在这里有所了解吗?我们无法弄清楚我们做错了什么。
UPDATE:共享触发并实际扫描更新的函数
@Override
public Map<String, AttributeValue> migrateDocuments(String lastEvaluatedKey, String typeKey){
LOG.info("Migrate Documents started {} ", lastEvaluatedKey);
int noOfDocumentsMigrated = 0;
Map<String, AttributeValue> docLastEvaluatedKey = null;
DynamoDBMapperConfig documentConfig = new DynamoDBMapperConfig.TableNameOverride("KnowledgeDocumentMigration").config();
if(lastEvaluatedKey != null) {
docLastEvaluatedKey = new HashMap<String,AttributeValue>();
docLastEvaluatedKey.put("base_id", new AttributeValue().withS(lastEvaluatedKey));
docLastEvaluatedKey.put("type_key",new AttributeValue().withS(typeKey));
}
Instant endTime = Instant.now().plusSeconds(840);
LOG.info("Migrate Documents endTime:{}", endTime);
try {
do {
ScanResultPage<Document> docScanList = documentDao.scanDocuments(docLastEvaluatedKey, documentConfig);
docLastEvaluatedKey = docScanList.getLastEvaluatedKey();
LOG.info("Migrate Docs- docScanList Size: {}", docScanList.getScannedCount());
docLastEvaluatedKey = docScanList.getLastEvaluatedKey();
LOG.info("lastEvaluatedKey:{}", docLastEvaluatedKey);
final int chunkSize = 25;
final AtomicInteger counter = new AtomicInteger();
final Collection<List<Document>> docChunkList = docScanList.getResults().stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / chunkSize)).values();
List<List<Document>> docListSplit = docChunkList.stream().collect(Collectors.toList());
docListSplit.forEach(docList -> {
TransactionWriteRequest documentTx = new TransactionWriteRequest();
for (Document document : docList) {
LOG.info("Migrate Documents- docList Size: {}", docList.size());
LOG.info("Migrate Documents- Doc Id: {}", document.getId());
if (!StringUtils.isNullOrEmpty(document.getType()) && document.getType().equalsIgnoreCase("Faq")) {
if (docIdsList.contains(document.getId())) {
LOG.info("this doc already migrated:{}", document);
} else {
docIdsList.add(document.getId());
}
if ((!StringUtils.isNullOrEmpty(document.getFaq().getQuestion()))) {
LOG.info("doc FAQ {}", document.getFaq().getQuestion());
document.setTitle(document.getFaq().getQuestion());
document.setTitleSearch(document.getFaq().getQuestion().toLowerCase());
documentTx.addUpdate(document);
}
} else if (StringUtils.isNullOrEmpty(document.getType())) {
if (!StringUtils.isNullOrEmpty(document.getTitle()) ) {
if (!StringUtils.isNullOrEmpty(document.getQuestion())) {
document.setTitle(document.getQuestion());
document.setQuestion(null);
}
LOG.info("title {}", document.getTitle());
document.setTitleSearch(document.getTitle().toLowerCase());
documentTx.addUpdate(document);
}
}
}
if (documentTx.getTransactionWriteOperations() != null
&& !documentTx.getTransactionWriteOperations().isEmpty() && docList.size() > 0) {
LOG.info("DocumentTx size {}", documentTx.getTransactionWriteOperations().size());
documentDao.executeTransaction(documentTx, null);
}
});
noOfDocumentsMigrated = noOfDocumentsMigrated + docScanList.getScannedCount();
}while(docLastEvaluatedKey != null && (endTime.compareTo(Instant.now()) > 0));
LOG.info("Migrate Documents execution finished at:{}", Instant.now());
if(docLastEvaluatedKey != null && docLastEvaluatedKey.get("base_id") != null)
sqsAdapter.get().sendMessage(docLastEvaluatedKey.get("base_id").toString(), docLastEvaluatedKey.get("type_key").toString(),
MIGRATE, MIGRATE_DOCUMENT_QUEUE_NAME);
LOG.info("No Of Documents Migrated:{}", noOfDocumentsMigrated);
}catch(Exception e) {
LOG.error("Exception", e);
}
return docLastEvaluatedKey;
}
【问题讨论】:
-
我不知道会发生什么......但你为什么要首先尝试这样做。 (错误)使用这样的 lambda 而不是启动 ECS 任务来完成它(在 AWS 内部)或只是在本地 PC 上编写代码有什么可感知的好处?
-
@Charles 我不确定你对滥用这样的 lambda 是什么意思,我知道事实上其他团队在我的公司中使用过这种方法,尽管使用了步进函数和并行扫描,因为表大得多。实际上,我们从他们那里复制了 lambda 上的大部分处理。只是想知道为什么当我们同时扫描 + 更新 VS 仅扫描时会有如此大的差异。老实说,我不明白为什么为此选择 lambda 是个坏主意。
-
我说的是误用,因为 lambda 是“针对简单快速的功能进行了优化”并且按毫秒计费。您有意计划在几个小时内运行一个 lambda 函数。如果需要,为什么不直接启动一个使用并行扫描的 Fargate 任务。
-
@Charles 还没有考虑过,但回到我的问题,这有什么不同吗?
-
idk..当您写入 SQS 时?如果 lambda 在你的代码中间死掉会发生什么?
标签: amazon-web-services amazon-dynamodb database-migration