【问题标题】:Commit Offsets to Kafka on Spark Executors在 Spark Executors 上向 Kafka 提交偏移量
【发布时间】:2020-01-27 14:58:38
【问题描述】:

我从 Kafka 获取事件,在 Spark 上丰富/过滤/转换它们,然后将它们存储在 ES 中。我将偏移量返还给 Kafka

我有两个问题/问题:

(1) 我目前的 Spark 作业非常慢

我有一个主题的 50 个分区和 20 个执行程序。每个执行器有 2 个核心和 4g 内存。我的驱动程序有 8g 内存。我消耗 1000 个事件/分区/秒,我的批处理间隔为 10 秒。这意味着,我在 10 秒内消耗了 500000 个事件

我的ES集群如下:

20 个分片/索引

3 个主实例 c5.xlarge.elasticsearch

12 个实例 m4.xlarge.elasticsearch

磁盘/节点 = 1024 GB 所以总共 12 TB

而且我遇到了巨大的调度和处理延迟

(2) 我如何提交执行者的偏移量?

目前,我在执行程序上丰富/转换/过滤我的事件,然后使用 BulkRequest 将所有内容发送到 ES。这是一个同步过程。如果我得到积极的反馈,我会将偏移量列表发送给驱动程序。如果没有,我会发回一个空列表。在驱动程序上,我向 Kafka 提交偏移量。我相信,应该有一种方法,我可以在执行者上提交偏移量,但我不知道如何将 kafka Stream 传递给执行者:

((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, this::onComplete);

这是向需要 Kafka Stream 的 Kafka 提交偏移量的代码

这是我的整体代码:

 kafkaStream.foreachRDD( // kafka topic
                rdd -> { // runs on driver
                    rdd.cache();
                    String batchIdentifier =
                            Long.toHexString(Double.doubleToLongBits(Math.random()));

                    LOGGER.info("@@ [" + batchIdentifier + "] Starting batch ...");

                    Instant batchStart = Instant.now();

                    List<OffsetRange> offsetsToCommit =
                            rdd.mapPartitionsWithIndex( // kafka partition
                                    (index, eventsIterator) -> { // runs on worker

                                        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

                                        LOGGER.info(
                                                "@@ Consuming " + offsetRanges[index].count() + " events" + " partition: " + index
                                        );

                                        if (!eventsIterator.hasNext()) {
                                            return Collections.emptyIterator();
                                        }

                                        // get single ES documents
                                        List<SingleEventBaseDocument> eventList = getSingleEventBaseDocuments(eventsIterator);

                                        // build request wrappers
                                        List<InsertRequestWrapper> requestWrapperList = getRequestsToInsert(eventList, offsetRanges[index]);

                                        LOGGER.info(
                                                "@@ Processed " + offsetRanges[index].count() + " events" + " partition: " + index + " list size: " + eventList.size()
                                        );

                                        BulkResponse bulkItemResponses = elasticSearchRepository.addElasticSearchDocumentsSync(requestWrapperList);

                                        if (!bulkItemResponses.hasFailures()) {
                                            return Arrays.asList(offsetRanges).iterator();
                                        }

                                        elasticSearchRepository.close();
                                        return Collections.emptyIterator();
                                    },
                                    true
                            ).collect();

                    LOGGER.info(
                            "@@ [" + batchIdentifier + "] Collected all offsets in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
                    );

                    OffsetRange[] offsets = new OffsetRange[offsetsToCommit.size()];

                    for (int i = 0; i < offsets.length ; i++) {
                        offsets[i] = offsetsToCommit.get(i);
                    }

                    try {
                        offsetManagementMapper.commit(offsets);
                    } catch (Exception e) {
                        // ignore
                    }

                    LOGGER.info(
                            "@@ [" + batchIdentifier + "] Finished batch of " + offsetsToCommit.size() + " messages " +
                                    "in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
                    );
                    rdd.unpersist();
                });

【问题讨论】:

    标签: java apache-spark elasticsearch apache-kafka


    【解决方案1】:

    您可以将偏移逻辑移动到 rdd 循环上方...我使用下面的模板来获得更好的偏移处理和性能

    JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
    
    
    
            kafkaStream.foreachRDD( kafkaStreamRDD -> {
                //fetch kafka offsets for manually commiting it later
                OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();
    
                //filter unwanted data
                kafkaStreamRDD.filter(
                        new Function<ConsumerRecord<String, String>, Boolean>() {
                    @Override
                    public Boolean call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
                        if(kafkaRecord!=null) {
                            if(!StringUtils.isAnyBlank(kafkaRecord.key() , kafkaRecord.value())) {
                                return Boolean.TRUE;
                            }
                        }
                        return Boolean.FALSE;
                    }
                }).foreachPartition( kafkaRecords -> {
    
                    // init connections here
    
                    while(kafkaRecords.hasNext()) {
                        ConsumerRecord<String, String> kafkaConsumerRecord = kafkaRecords.next();
                        // work here
                    }
    
                });
                //commit offsets
                ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
            });
    

    【讨论】:

    • 感谢您的回答。但我有个问题。我收集的原因是因为我需要知道哪个执行器能够正确地将事件存储在 ES 中。我可能错了,但是如果我按照您的方法进行操作,那么我会在不检查执行程序是否正确存储数据的情况下提交偏移量
    • 您可以创建一些自定义异常,因为如果没有将数据持久化到 ES,您的任务将会失败。如果失败,您的 spark 将重试配置的次数。只有这样你的偏移量才会被提交。关于在执行器级别存储偏移量,您可能必须使用一些持久层。如果您有更好的方法,请告诉我。
    • 我可以在驱动程序上存储偏移量,我只是不知道如何告诉驱动程序所有内容都存储在 ES 中。如果我在驱动程序上执行此操作,那么我必须将执行程序的所有结果收集到驱动程序
    猜你喜欢
    • 2017-06-24
    • 2018-06-08
    • 2015-04-09
    • 2021-11-14
    • 2017-03-17
    • 2018-07-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多