【问题标题】:Stop processing kafka messages if something goes wrong during process如果在处理过程中出现问题,请停止处理 kafka 消息
【发布时间】:2020-04-08 12:15:30
【问题描述】:

在我的处理器 API 中,我将消息存储在键值存储中,每 100 条消息我发出一个 POST 请求。如果在尝试发送消息时出现问题(api 没有响应等),我想停止处理消息。在有证据证明 API 调用有效之前。 这是我的代码:

public class BulkProcessor implements Processor<byte[], UserEvent> {

    private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;

    private BulkAPIClient bulkClient;

    private String storeName;

    private ProcessorContext context;

    private int count;

    @Autowired
    public BulkProcessor(String storeName, BulkClient bulkClient) {
        this.storeName = storeName;
        this.bulkClient = bulkClient;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
        count = 0;
        // to check every 15 minutes if there are any remainders in the store that are not sent yet
        this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            if (count > 0) {
                sendEntriesFromStore();
            }
        });
    }

    @Override
    public void process(byte[] key, UserEvent value) {
        int userGroupId = Integer.valueOf(value.getUserGroupId());
        ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
        if (userEventArrayList == null) {
            userEventArrayList = new ArrayList<>();
        }
        userEventArrayList.add(value);
        keyValueStore.put(userGroupId, userEventArrayList);
        if (count == 100) {
            sendEntriesFromStore();
        }
    }

    private void sendEntriesFromStore() {
        KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
        while (iterator.hasNext()) {
            KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
            BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
            if (bulkRequest.getLocation() != null) {
                URI url = bulkClient.buildURIPath(bulkRequest);
                try {
                    bulkClient.postRequestBulkApi(url, bulkRequest);
                    keyValueStore.delete(entry.key);
                } catch (BulkApiException e) {
                    logger.warn(e.getMessage(), e.fillInStackTrace());
                }
            }
        }
        iterator.close();
        count = 0;
    }

    @Override
    public void close() {
    }
}

目前在我的代码中,如果对 API 的调用失败,它将迭代下一个 100(只要它失败,这种情况就会一直发生)并将它们添加到 keyValueStore。我不希望这种情况发生。相反,我宁愿停止流并在 keyValueStore 清空后继续。这可能吗?
我可以扔一个StreamsException吗?

try {
    bulkClient.postRequestBulkApi(url, bulkRequest);
    keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
    throw new StreamsException(e);
}

这会杀死我的流应用程序并因此进程终止吗?

【问题讨论】:

  • 收到新消息后计数增加了吗?
  • @TuyenLuong count 不会增加,因为在 sendEntriesFromStore 方法中我将其重置为 0。如果我的 POST 请求中的某些内容不起作用,我只是不希望将 +100 事件添加到 keyValueStore

标签: apache-kafka kafka-consumer-api apache-kafka-streams


【解决方案1】:
  1. 只有在确保 API 成功处理您的记录后,您才应该从状态存储中删除记录,因此请删除第一个 keyValueStore.delete(entry.key); 并保留第二个。如果没有,那么当keyValueStore.delete 提交到底层变更日志主题但您的消息尚未成功处理时,您可能会丢失一些消息,因此最多只能保证一个。
  2. 只需将调用 API 代码包裹在一个无限循环中并继续尝试直到记录成功处理,您的处理器将不会消耗来自上述处理器节点的新消息,因为它在同一个 StreamThread 中运行:
    private void sendEntriesFromStore() {
        KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
        while (iterator.hasNext()) {
            KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
            //remove this state store delete code : keyValueStore.delete(entry.key);
            BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
            if (bulkRequest.getLocation() != null) {
                URI url = bulkClient.buildURIPath(bulkRequest);
                while (true) {
                    try {
                        bulkClient.postRequestBulkApi(url, bulkRequest);
                        keyValueStore.delete(entry.key);//only delete after successfully process the message to achieve at least one processing guarantee
                        break;
                    } catch (BulkApiException e) {
                        logger.warn(e.getMessage(), e.fillInStackTrace());
                    }
                }
            }
        }
        iterator.close();
        count = 0;
    }
  1. 是的,您可以抛出 StreamsException,此StreamTask 将在重新平衡期间迁移到另一个 StreamThread,可能在示例应用程序实例上。如果 API 一直导致 Exception 直到所有 StreamThread 都死了,您的应用程序将不会自动退出并接收到 Exception,您应该添加一个自定义 StreamsException 处理程序以在所有流线程都死时使用 KafkaStreams#setUncaughtExceptionHandler 退出您的应用程序或监听 Stream State 更改(到 ERROR 状态):
All stream threads have died. The instance will be in error state and should be closed.

【讨论】:

  • 是的,很抱歉第一次删除是复制粘贴的错误,来自旧代码状态,与新代码混合。第一个删除实际上不再在代码中,这就是我在 try catch 中添加它的原因。至于while循环,我更希望将其切换到死态并重新启动。
  • 只是在无限循环中重试会“停止”处理,但是,您的线程最终会退出消费者组,因为不会调用 poll()。您可以增加max.poll.interval.ms,但将其设置得更大会产生其他(可能是不希望的)副作用。最后,Kafka Streams atm 没有很好的支持“停止”处理。 ——它或多或少地定期出现。因此,我认为我们应该在 Kafka Streams 中添加内置支持来支持这一点。
【解决方案2】:

最后我使用了一个简单的KafkaConsumer 而不是KafkaStreams,但底线是我将BulkApiException 更改为扩展RuntimeException,我在登录后再次抛出。所以现在看起来如下:

        } catch (BulkApiException bae) {
            logger.error(bae.getMessage(), bae.fillInStackTrace());
            throw new BulkApiException();
        } finally {
            consumer.close();
            int exitCode = SpringApplication.exit(ctx, () -> 1);
            System.exit(exitCode);
        }

这样退出应用,k8s重启pod。那是因为如果我尝试转发请求的 api 已关闭,那么继续阅读消息就没有意义了。因此,在其他 api 备份之前,k8s 将重新启动一个 pod。

【讨论】:

    猜你喜欢
    • 2021-12-05
    • 2023-02-06
    • 2023-03-30
    • 2018-07-30
    • 1970-01-01
    • 1970-01-01
    • 2019-06-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多