【发布时间】:2020-01-27 14:58:38
【问题描述】:
我从 Kafka 获取事件,在 Spark 上丰富/过滤/转换它们,然后将它们存储在 ES 中。我将偏移量返还给 Kafka
我有两个问题/问题:
(1) 我目前的 Spark 作业非常慢
我有一个主题的 50 个分区和 20 个执行程序。每个执行器有 2 个核心和 4g 内存。我的驱动程序有 8g 内存。我消耗 1000 个事件/分区/秒,我的批处理间隔为 10 秒。这意味着,我在 10 秒内消耗了 500000 个事件
我的ES集群如下:
20 个分片/索引
3 个主实例 c5.xlarge.elasticsearch
12 个实例 m4.xlarge.elasticsearch
磁盘/节点 = 1024 GB 所以总共 12 TB
而且我遇到了巨大的调度和处理延迟
(2) 我如何提交执行者的偏移量?
目前,我在执行程序上丰富/转换/过滤我的事件,然后使用 BulkRequest 将所有内容发送到 ES。这是一个同步过程。如果我得到积极的反馈,我会将偏移量列表发送给驱动程序。如果没有,我会发回一个空列表。在驱动程序上,我向 Kafka 提交偏移量。我相信,应该有一种方法,我可以在执行者上提交偏移量,但我不知道如何将 kafka Stream 传递给执行者:
((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, this::onComplete);
这是向需要 Kafka Stream 的 Kafka 提交偏移量的代码
这是我的整体代码:
kafkaStream.foreachRDD( // kafka topic
rdd -> { // runs on driver
rdd.cache();
String batchIdentifier =
Long.toHexString(Double.doubleToLongBits(Math.random()));
LOGGER.info("@@ [" + batchIdentifier + "] Starting batch ...");
Instant batchStart = Instant.now();
List<OffsetRange> offsetsToCommit =
rdd.mapPartitionsWithIndex( // kafka partition
(index, eventsIterator) -> { // runs on worker
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
LOGGER.info(
"@@ Consuming " + offsetRanges[index].count() + " events" + " partition: " + index
);
if (!eventsIterator.hasNext()) {
return Collections.emptyIterator();
}
// get single ES documents
List<SingleEventBaseDocument> eventList = getSingleEventBaseDocuments(eventsIterator);
// build request wrappers
List<InsertRequestWrapper> requestWrapperList = getRequestsToInsert(eventList, offsetRanges[index]);
LOGGER.info(
"@@ Processed " + offsetRanges[index].count() + " events" + " partition: " + index + " list size: " + eventList.size()
);
BulkResponse bulkItemResponses = elasticSearchRepository.addElasticSearchDocumentsSync(requestWrapperList);
if (!bulkItemResponses.hasFailures()) {
return Arrays.asList(offsetRanges).iterator();
}
elasticSearchRepository.close();
return Collections.emptyIterator();
},
true
).collect();
LOGGER.info(
"@@ [" + batchIdentifier + "] Collected all offsets in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
);
OffsetRange[] offsets = new OffsetRange[offsetsToCommit.size()];
for (int i = 0; i < offsets.length ; i++) {
offsets[i] = offsetsToCommit.get(i);
}
try {
offsetManagementMapper.commit(offsets);
} catch (Exception e) {
// ignore
}
LOGGER.info(
"@@ [" + batchIdentifier + "] Finished batch of " + offsetsToCommit.size() + " messages " +
"in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
);
rdd.unpersist();
});
【问题讨论】:
标签: java apache-spark elasticsearch apache-kafka