【Question title】: Stale KTable records when joining a KStream with a KTable created by KStream aggregation
【Posted】: 2021-08-06 06:44:13
【Question】:

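I am trying to implement the event sourcing pattern with Kafka Streams in the following way.

I am working on a security service that handles two use cases:

  1. Register a user: handling RegisterUserCommand should produce a UserRegisteredEvent.
  2. Change a user's name: handling ChangeUserNameCommand should produce a UserNameChangedEvent.

I have two topics:

  1. The command topic, 'security-command'. Every command is keyed, and the key is the user's email. For example: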
foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
  2. The event topic, 'security-event'. Every record is keyed by the user's email:
foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex",  "version":0}}
foo@bar.com:{"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
Kafka Streams version 2.8.0
Kafka version 2.8

The implementation idea can be expressed with the following topology:

    commandStream = builder.stream("security-command");
    eventStream = builder.stream("security-event",
                                    Consumed.with(
                                        ...,
                                        new ZeroTimestampExtractor()
                                        /*always returns 0 to get the latest version of snapshot*/));
    
    // build the snapshot to get the current state of the user.
    userSnapshots = eventStream.groupByKey()
                                .aggregate(() -> new UserSnapshot(),
                                     (key /*email*/, event, currentSnapshot) -> currentSnapshot.apply(event));
    
    // join commands with latest snapshot at the time of the join
    commandWithSnapshotStream =
                commandStream.leftJoin(
                        userSnapshots,
                        (command, snapshot) -> new CommandWithUserSnapshot(command, snapshot),
                        joinParams
                );
        
    // handle the command given the current snapshot
    resultingEventStream = commandWithSnapshotStream.flatMap((key /*email*/, commandWithSnapshot) -> {
            var newEvents = commandHandler(commandWithSnapshot.command(), commandWithSnapshot.snapshot());

            return Arrays.stream(newEvents)
                         .map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
                         .toList();
            });

    // append events to events topic 
    resultingEventStream.to("security-event");

For this topology I use EOS (exactly_once_beta).

A more explicit version of this topology:

       KStream<String, Command<DomainEvent[]>> commandStream =
                builder.stream(
                        commandTopic,
                            Consumed.with(Serdes.String(), new SecurityCommandSerde()));

        KStream<String, DomainEvent> eventStream =
                builder.stream(
                        eventTopic,
                        Consumed.with(
                            Serdes.String(),
                            new DomainEventSerde(),
                            new LatestRecordTimestampExtractor() /*always returns 0 to get the latest version of the snapshot.*/));

        // build the snapshots ktable by aggregating all the current events for a given user.
        KTable<String, UserSnapshot> userSnapshots =
                                        eventStream.groupByKey()
                                                   .aggregate(
                                                           () -> new UserSnapshot(),
                                                           (email, event, currentSnapshot) ->   currentSnapshot.apply(event),
                                                           Materialized.with(
                                                                    Serdes.String(),
                                                                    new UserSnapshotSerde()));

        // join command stream and snapshot table to get the stream of pairs <Command, UserSnapshot>
        Joined<String, Command<DomainEvent[]>, UserSnapshot> commandWithSnapshotJoinParams =
                Joined.with(
                        Serdes.String(),
                        new SecurityCommandSerde(),
                        new UserSnapshotSerde()
                );

        KStream<String, CommandWithUserSnapshot> commandWithSnapshotStream =
                commandStream.leftJoin(
                        userSnapshots,
                        (command, snapshot) -> new CommandWithUserSnapshot(command, snapshot),
                        commandWithSnapshotJoinParams
                );

        var resultingEventStream = commandWithSnapshotStream.flatMap((key /*email*/, commandWithSnapshot) -> {

            var command = commandWithSnapshot.command();

            if (command instanceof RegisterUserCommand registerUserCommand) {
                var handler = new RegisterUserCommandHandler();
                var events = handler.handle(registerUserCommand);

                // multiple events might be produced when a command is handled.
                return Arrays.stream(events)
                             .map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
                             .toList();
            }

            if (command instanceof ChangeUserNameCommand changeUserNameCommand) {
                var handler = new ChangeUserNameCommandHandler();
                var events = handler.handle(changeUserNameCommand, commandWithSnapshot.userSnapshot());

                return Arrays.stream(events)
                             .map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
                             .toList();
            }

            throw new IllegalArgumentException("...");
        });

        resultingEventStream.to(eventTopic, Produced.with(Serdes.String(), new DomainEventSerde()));

The problems I am facing:

  1. Starting the streams application with records already present in the command topic:
   foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
   foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}

Outcome:
   1. Stream application fails when processing the ChangeUserNameCommand, because the snapshot is null.
   2. The events topic has a record for successful registration, but nothing for changing the name:
   /*OK*/foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex",  "version":0}}
   
Thoughts:
   When processing the ChangeUserNameCommand, the snapshot is missing from the aggregated KTable, userSnapshots. Restarting the application successfully produces the following record:
   foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
   
   Tried increasing the max.task.idle.ms to 4 seconds - no effect.
  2. Starting the streams application and producing a batch of ChangeUserNameCommand commands at once (quickly).
Producing:
   
   // Produce to command topic
   foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
   
   // event topic outcome
   /*OK*/ foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex",  "version":0}}
   
   // Produce at once to command topic
   foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
   foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex2"}}
   foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex3"}}
   
   // event topic outcome
   /*OK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
   /*NOK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex2","version":1}}
   /*NOK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex3","version":1}}
   
Thoughts:
   'ChangeUserNameCommand' commands are joined with a stale version of snapshot (pay attention to the version attribute).
   The expected outcome would be:
   foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
   foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex2","version":2}}
   foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex3","version":3}}

   Tried increasing max.task.idle.ms to 4 seconds: no effect. Setting cache.max.bytes.buffering to 0 has no effect either.
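
For reference, the settings mentioned in this question could be collected as shown here. This is only a sketch using plain `java.util.Properties` with string keys, so it needs no Kafka dependency; the `application.id` and `bootstrap.servers` values are hypothetical placeholders, and the class name is made up for illustration.

```java
import java.util.Properties;

final class StreamsSettingsSketch {

    // Collects the Kafka Streams settings named in the question as string key/value pairs.
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "security-service");      // hypothetical placeholder
        props.put("bootstrap.servers", "localhost:9092");     // hypothetical placeholder
        props.put("processing.guarantee", "exactly_once_beta"); // EOS mode used in the question
        props.put("max.task.idle.ms", "4000");                  // the 4 second idle time that was tried
        props.put("cache.max.bytes.buffering", "0");            // record cache disabled, as tried
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object is the kind of object that would be passed to the `KafkaStreams` constructor together with the built topology.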

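What am I missing when building such a topology? I expect every command to be processed against the latest version of the snapshot. If I produce the commands with a few seconds of delay between them, everything works as expected.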
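
The version mismatch described above can be reproduced without Kafka. The following is a minimal sketch, assuming hypothetical `Event` and `Snapshot` records in place of the real domain types and a handler that stamps each event with the snapshot version plus one. It is not the actual topology; it only contrasts a snapshot that is refreshed after every command with one that is not.

```java
import java.util.ArrayList;
import java.util.List;

final class StaleSnapshotDemo {

    // Hypothetical stand-ins for the question's DomainEvent and UserSnapshot types.
    record Event(String name, int version) {}

    record Snapshot(String name, int version) {
        Snapshot apply(Event e) {
            return new Snapshot(e.name(), e.version());
        }
    }

    // Handles a rename command against the snapshot it was joined with.
    static Event handle(String newName, Snapshot snapshot) {
        return new Event(newName, snapshot.version() + 1);
    }

    // Processes a batch of rename commands. When refreshBetweenCommands is false,
    // every command sees the same snapshot, which mimics the stale join.
    static List<Event> process(List<String> newNames, boolean refreshBetweenCommands) {
        Snapshot snapshot = new Snapshot("Alex", 0); // state after UserRegisteredEvent
        List<Event> produced = new ArrayList<>();
        for (String newName : newNames) {
            Event event = handle(newName, snapshot);
            produced.add(event);
            if (refreshBetweenCommands) {
                snapshot = snapshot.apply(event);
            }
        }
        return produced;
    }

    public static void main(String[] args) {
        List<String> names = List.of("Alex1", "Alex2", "Alex3");
        System.out.println("stale:   " + process(names, false)); // versions 1, 1, 1
        System.out.println("desired: " + process(names, true));  // versions 1, 2, 3
    }
}
```

The first line of output corresponds to the observed events (all with version 1), the second to the expected outcome.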

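【Comments】:

  • Looking at the logs, it seems that processing of the second command starts before the event record produced by the first command has been fetched from the partition. Not sure why. I am using the max.task.idle.ms setting.

Tags: apache-kafka apache-kafka-streams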


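【Answer 1】:

I think you are missing the changelog restoration part of the table. Read this to understand what happens during changelog restoration.

For tables, it is more complex because they must maintain additional information (their state) to allow for stateful processing such as joins and aggregations like COUNT() or SUM(). To achieve this while also ensuring high processing performance, tables (through their state stores) are materialized on local disk within a Kafka Streams application instance or a ksqlDB server. But machines and containers can be lost, along with any locally stored data. How can we make tables fault tolerant, too?

The answer is that any data stored in a table is also stored remotely in Kafka. For this purpose, every table has its own change stream: a built-in change data capture (CDC) setup, we could say. So if we have a table of account balances by customer, every time an account balance is updated, a corresponding change event will be recorded into the change stream of that table.

Also keep in mind that restarting a Kafka Streams application should not reprocess events that were already processed. For that, you need to commit the offsets of messages after processing them.

【Comments】: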

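    【Answer 2】:

    Found the root cause. I am not sure whether this is by design or a bug, but a stream task waits for data in the other partitions only once per processing cycle. So if 2 records from the command topic are read first, the stream task waits max.task.idle.ms when processing the first command record, which allows a poll() phase to happen. Once that record is processed, while processing the second one, the stream task does not allow a poll to fetch the event newly produced by handling the first command.

    In Kafka 2.8, the code responsible for this behavior is in StreamTask.java. isProcessable() is called at the beginning of the processing phase. If it returns false, the poll phase is repeated.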

       public boolean isProcessable(final long wallClockTime) {
            if (state() == State.CLOSED) {
                return false;
            }
    
            if (hasPendingTxCommit) {
                return false;
            }
    
            if (partitionGroup.allPartitionsBuffered()) {
                idleStartTimeMs = RecordQueue.UNKNOWN;
                return true;
            } else if (partitionGroup.numBuffered() > 0) {
                if (idleStartTimeMs == RecordQueue.UNKNOWN) {
                    idleStartTimeMs = wallClockTime;
                }
    
                if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
                    return true;
    
                    // idleStartTimeMs is not reset to default, RecordQueue.UNKNOWN, value, 
                    // therefore the next time when the check for all buffered partitions is done, `true` is returned, meaning that the task is ready to be processed.  
                } else {
                    return false;
                }
            } else {
                // there's no data in any of the topics; we should reset the enforced
                // processing timer
                idleStartTimeMs = RecordQueue.UNKNOWN;
                return false;
            }
        }
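
The effect of the timer not being reset can be illustrated with a stripped-down model. This is a sketch under stated assumptions, not Kafka's code: `IdleTimerModel` is a hypothetical class that copies only the buffering branches of the method quoted above, takes the buffer state as parameters, and uses `-1` as a stand-in sentinel for `RecordQueue.UNKNOWN`.

```java
final class IdleTimerModel {

    static final long UNKNOWN = -1L; // stand-in sentinel for RecordQueue.UNKNOWN

    private final long maxTaskIdleMs;
    private long idleStartTimeMs = UNKNOWN;

    IdleTimerModel(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    // Mirrors only the buffering branches of the quoted isProcessable().
    boolean isProcessable(long wallClockTime, boolean allPartitionsBuffered, int numBuffered) {
        if (allPartitionsBuffered) {
            idleStartTimeMs = UNKNOWN;
            return true;
        } else if (numBuffered > 0) {
            if (idleStartTimeMs == UNKNOWN) {
                idleStartTimeMs = wallClockTime;
            }
            // As in the quoted code, the timer is not reset when the wait has elapsed.
            return wallClockTime - idleStartTimeMs >= maxTaskIdleMs;
        } else {
            idleStartTimeMs = UNKNOWN;
            return false;
        }
    }

    public static void main(String[] args) {
        IdleTimerModel task = new IdleTimerModel(4000);
        // Two commands are buffered; the event partition is empty.
        System.out.println(task.isProcessable(0, false, 2));    // false: idle timer starts
        System.out.println(task.isProcessable(4000, false, 2)); // true: wait elapsed, first command runs
        // One command is left and the event partition is still empty.
        System.out.println(task.isProcessable(4001, false, 1)); // true: no second wait
    }
}
```

In this model the third call returns true immediately because the elapsed time is still measured from the original start, so the second command is processed without another idle period in which the new event could be fetched.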
    
    

    【Comments】:
