【问题标题】:Creating batches from the poll records in kafka从kafka中的投票记录创建批次
【发布时间】:2019-01-24 05:53:16
【问题描述】:

我们需要对 elasticsearch 执行批量写入。我们想知道是否有更好的方法来批处理数据并避免在批处理时丢失数据

 public void consume() {
        logger.debug("raw consume......");

        String topic = "json.incoming";
        String consGroup = "rConsumerGroup";

        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "20000");
        props.put("max.poll.records", "10000");

        consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER, props);
        logger.debug("Kafka Consumer Initialized......");
        buffer = new ArrayList<MessageVO>();

        while (true) {
            try {
                ConsumerRecords<String, JsonNode> records = consumer.poll(100);
                Date startTime = Calendar.getInstance()
                    .getTime();
                if (records.count() == 0 && !buffer.isEmpty()) {
                    lastSeenZeroPollCounter++;
                }
                if (records.count() > 0) {
                    logger.debug(">>records count = " + records.count());
                    for (ConsumerRecord<String, JsonNode> record : records) {
                        logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());
                        JsonNode jsonMessage = record.value();
                        logger.debug("incoming Message = " + jsonMessage);
                        ObjectMapper objectMapper = new ObjectMapper();
                        MessageVO rawMessage = objectMapper.convertValue(jsonMessage, MessageVO.class);
                        logger.info("Size of the buffer is " + buffer.size());
                        buffer.add(rawMessage);
                    }
                    Date endTime = Calendar.getInstance()
                        .getTime();
                    long durationInMilliSec = endTime.getTime() - startTime.getTime();
                    logger.debug("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);
                }
                if ((buffer.size() >= 1000 && buffer.size() <= 3000) || (buffer.size() > 0 && lastSeenZeroPollCounter >= 3000)) {
                    lastSeenZeroPollCounter = 0;
                    List<RawSyslogMessageVO> clonedBuffer = deepCopy(buffer);
                    logger.info("The size of clonedBuffer is ::: " + clonedBuffer.size());
                    writerService.writeRaw(clonedBuffer);
                    buffer.clear();
                }

                consumer.commitSync();
            } catch (Throwable throwable) {
                logger.error("Error occured while processing message", throwable);
                throwable.printStackTrace();
            }
        }
    }

克隆数据的代码以避免数据丢失

 private List<MessageVO> deepCopy(List<MessageVO> messages) {
        List<MessageVO> listOfMessages = new ArrayList<>();
        logger.debug("DeepClone :: listOfMessages size ::: " + listOfMessages.size());
        listOfMessages.addAll(messages);
        return Collections.unmodifiableList(messages);
    }

感谢任何帮助。谢谢。

【问题讨论】:

    标签: java apache-kafka apache-kafka-connect


    【解决方案1】:

    比自己编写更好的方法是使用 Apache Kafka 的 Kafka Connect API — 它专为从系统到 Kafka 以及 Kafka 到其他系统的流式集成而构建 :-)

    Elasticsearch connector 将数据从 Kafka 主题流式传输到 Elasticsearch,具有可配置的批量大小等,以及一次性交付语义、可扩展处理等。

    免责声明:我为 Confluent 工作。

    【讨论】:

      【解决方案2】:

      我们通过稍微简化应用程序的设计来处理相同的用例:我们基本上执行以下步骤

      1. 使用 Spring Kafka BatchAcknowledgingMessageListener 进行获取 根据需求设置max.poll.records的记录
      2. 对于每次提取,使用 Elasticsearch BulkRequest API 提交消息
      3. 批量索引成功后,向 Kafka 确认。
      4. 失败时重试或处理错误

      通过遵循这种更简单的设计,我们意识到大多数批量提交都将具有所需的记录数。对于 Kafka 主题没有与批量索引所需的记录数量一样多的消息的情况,我们决定在任何情况下都对单个 fetch 中可用的任何内容进行索引,而不是在应用程序中显式处理提交状态、管理缓冲区等.

      Elasticsearch 批量提交是一种优化——我找不到任何理由对每个批量请求的记录总数非常精确。 (参见this guide)。

      P.S:我们需要编写代码而不是使用连接器或现成的解决方案,因为我们的输入来自不同格式的多个主题,例如 protobuf、压缩 XML、Json 等,我们需要在索引之前进行格式转换和复杂的反序列化数据

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2012-03-15
        • 1970-01-01
        • 2022-01-06
        • 2010-11-05
        • 1970-01-01
        • 2023-03-26
        • 1970-01-01
        相关资源
        最近更新 更多