Posted: 2021-08-06 06:44:13
Question:
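I am trying to implement the event sourcing pattern with Kafka Streams as follows.
I work on a security service and handle two use cases:
- Register user: handling a RegisterUserCommand should produce a UserRegisteredEvent.
- Change user name: handling a ChangeUserNameCommand should produce a UserNameChangedEvent.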
I have two topics:
- The command topic, 'security-command'. Each command is keyed by the user's email. For example:
foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
- The event topic, 'security-event'. Each record is keyed by the user's email:
foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex", "version":0}}
foo@bar.com:{"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
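The `version` field implies that each event moves the user's snapshot forward by one. Below is a minimal plain-Java sketch of the fold that `UserSnapshot.apply(event)` presumably performs. It assumes hypothetical `Event` and `UserSnapshot` classes that carry only `email`, `name` and `version`, because the question does not show the real `DomainEvent` or `UserSnapshot` types.

```java
import java.util.List;

public class SnapshotFoldSketch {
    // Hypothetical stand-in for the question's DomainEvent types.
    static final class Event {
        final String type;    // "UserRegisteredEvent" or "UserNameChangedEvent"
        final String email;
        final String name;
        final int version;

        Event(String type, String email, String name, int version) {
            this.type = type;
            this.email = email;
            this.name = name;
            this.version = version;
        }
    }

    // Hypothetical stand-in for UserSnapshot; version -1 means "no events applied yet".
    static final class UserSnapshot {
        String email;
        String name;
        int version = -1;

        // Assumed semantics: each event overwrites the name and records its version.
        UserSnapshot apply(Event event) {
            email = event.email;
            name = event.name;
            version = event.version;
            return this;
        }
    }

    // Folds the events of one key in topic order, like the groupByKey().aggregate(...) step.
    static UserSnapshot fold(List<Event> events) {
        UserSnapshot snapshot = new UserSnapshot();
        for (Event e : events) {
            snapshot = snapshot.apply(e);
        }
        return snapshot;
    }

    public static void main(String[] args) {
        List<Event> eventTopic = List.of(
            new Event("UserRegisteredEvent", "foo@bar.com", "Alex", 0),
            new Event("UserNameChangedEvent", "foo@bar.com", "Alex1", 1));
        UserSnapshot s = fold(eventTopic);
        System.out.println(s.email + " " + s.name + " v" + s.version); // foo@bar.com Alex1 v1
    }
}
```

Under this assumption, the next UserNameChangedEvent should carry `snapshot.version + 1`, which is exactly what goes wrong in the second scenario further down.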
Kafka Streams version 2.8.0
Kafka version 2.8
The idea behind the implementation can be expressed with the following topology:
commandStream = builder.stream("security-command");
eventStream = builder.stream("security-event",
Consumed.with(
...,
new ZeroTimestampExtractor()
/*always returns 0 to get the latest version of the snapshot*/));
// build the snapshot to get the current state of the user.
userSnapshots = eventStream.groupByKey()
.aggregate(() -> new UserSnapshot(),
(key /*email*/, event, currentSnapshot) -> currentSnapshot.apply(event));
// join commands with latest snapshot at the time of the join
commandWithSnapshotStream =
commandStream.leftJoin(
userSnapshots,
(command, snapshot) -> new CommandWithUserSnapshot(command, snapshot),
joinParams
);
// handle the command given the current snapshot
resultingEventStream = commandWithSnapshotStream.flatMap((key /*email*/, commandWithSnapshot) -> {
var newEvents = commandHandler(commandWithSnapshot.command(), commandWithSnapshot.snapshot());
return Arrays.stream(newEvents)
.map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
.toList();
});
// append events to events topic
resultingEventStream.to("security-event");
For this topology I use EOS (exactly_once_beta).
A more explicit version of this topology:
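For reference, the Streams settings mentioned in this question can be collected in one `Properties` object. This sketch uses the plain string keys so that it compiles without the kafka-streams dependency; with the library on the classpath you would normally use the `StreamsConfig` constants instead (for example `StreamsConfig.PROCESSING_GUARANTEE_CONFIG`). The `application.id` and `bootstrap.servers` values are hypothetical placeholders.

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // Streams settings referenced in the question, as plain string keys.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "security-service");        // hypothetical value
        props.put("bootstrap.servers", "localhost:9092");        // hypothetical value
        props.put("processing.guarantee", "exactly_once_beta");  // EOS mode used in the question (2.8)
        props.put("max.task.idle.ms", "4000");                   // the 4 seconds tried in the question
        props.put("cache.max.bytes.buffering", "0");             // record cache disabled
        return props;
    }

    public static void main(String[] args) {
        streamsProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```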
KStream<String, Command<DomainEvent[]>> commandStream =
builder.stream(
commandTopic,
Consumed.with(Serdes.String(), new SecurityCommandSerde()));
KStream<String, DomainEvent> eventStream =
builder.stream(
eventTopic,
Consumed.with(
Serdes.String(),
new DomainEventSerde(),
new LatestRecordTimestampExtractor() /*always returns 0 to get the latest version of the snapshot.*/));
// build the snapshots ktable by aggregating all the current events for a given user.
KTable<String, UserSnapshot> userSnapshots =
eventStream.groupByKey()
.aggregate(
() -> new UserSnapshot(),
(email, event, currentSnapshot) -> currentSnapshot.apply(event),
Materialized.with(
Serdes.String(),
new UserSnapshotSerde()));
// join command stream and snapshot table to get the stream of pairs <Command, UserSnapshot>
Joined<String, Command<DomainEvent[]>, UserSnapshot> commandWithSnapshotJoinParams =
Joined.with(
Serdes.String(),
new SecurityCommandSerde(),
new UserSnapshotSerde()
);
KStream<String, CommandWithUserSnapshot> commandWithSnapshotStream =
commandStream.leftJoin(
userSnapshots,
(command, snapshot) -> new CommandWithUserSnapshot(command, snapshot),
commandWithSnapshotJoinParams
);
var resultingEventStream = commandWithSnapshotStream.flatMap((key /*email*/, commandWithSnapshot) -> {
var command = commandWithSnapshot.command();
if (command instanceof RegisterUserCommand registerUserCommand) {
var handler = new RegisterUserCommandHandler();
var events = handler.handle(registerUserCommand);
// multiple events might be produced when a command is handled.
return Arrays.stream(events)
.map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
.toList();
}
if (command instanceof ChangeUserNameCommand changeUserNameCommand) {
var handler = new ChangeUserNameCommandHandler();
var events = handler.handle(changeUserNameCommand, commandWithSnapshot.userSnapshot());
return Arrays.stream(events)
.map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
.toList();
}
throw new IllegalArgumentException("...");
});
resultingEventStream.to(eventTopic, Produced.with(Serdes.String(), new DomainEventSerde()));
The problems I'm running into:
- Starting the stream application when the command topic already contains records:
foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
Outcome:
1. The stream application fails when processing the ChangeUserNameCommand because the snapshot is null.
2. The events topic has a record for successful registration, but nothing for changing the name:
/*OK*/foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex", "version":0}}
Thoughts:
When the ChangeUserNameCommand is processed, the snapshot is missing from the aggregated KTable userSnapshots. Restarting the application successfully produces the following record:
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
Increasing max.task.idle.ms to 4 seconds had no effect.
- Starting the stream application and then producing a batch of ChangeUserNameCommand commands at once (in quick succession).
Producing:
// Produce to command topic
foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
// event topic outcome
/*OK*/ foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex", "version":0}}
// Produce at once to command topic
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex2"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex3"}}
// event topic outcome
/*OK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
/*NOK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex2","version":1}}
/*NOK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex3","version":1}}
Thoughts:
'ChangeUserNameCommand' commands are joined with a stale version of the snapshot (note the version attribute).
The expected outcome would be:
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex2","version":2}}
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex3","version":3}}
Increasing max.task.idle.ms to 4 seconds had no effect, and neither did setting cache.max.bytes.buffering to 0.
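Both symptoms, and the observation in the comment at the end of this question, fit a simple toy model that can be written in plain Java. The model assumes, as a simplification rather than a description of Kafka Streams internals, that the `userSnapshots` table does not change while a batch of already-fetched commands is processed and only sees the events produced for that batch on a later poll. `SnapshotLagModel`, `Command`, `processBatch` and `renames` are hypothetical names, and an integer version stands in for the whole snapshot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SnapshotLagModel {
    // Hypothetical, simplified command carrying only what the model needs.
    static final class Command {
        final String type;   // "RegisterUserCommand" or "ChangeUserNameCommand"
        final String email;
        final String name;
        Command(String type, String email, String name) {
            this.type = type; this.email = email; this.name = name;
        }
    }

    // Handles one batch. If lagging is true, the table (email -> snapshot version) stays
    // frozen until the batch ends, mimicking a table that sees produced events only later.
    // If lagging is false, each produced event is applied before the next command.
    static List<String> processBatch(List<Command> batch, Map<String, Integer> table, boolean lagging) {
        List<String> outcomes = new ArrayList<>();
        Map<String, Integer> pending = new HashMap<>();
        for (Command c : batch) {
            Integer current = table.get(c.email); // leftJoin lookup: null if no snapshot yet
            Integer next;
            if (c.type.equals("RegisterUserCommand")) {
                next = 0;
                outcomes.add("UserRegisteredEvent " + c.name + " v0");
            } else if (current == null) {
                outcomes.add("snapshot is null"); // the real application throws here
                continue;
            } else {
                next = current + 1;
                outcomes.add("UserNameChangedEvent " + c.name + " v" + next);
            }
            if (lagging) pending.put(c.email, next); else table.put(c.email, next);
        }
        table.putAll(pending); // lagging case: the table catches up after the batch
        return outcomes;
    }

    static List<Command> renames() {
        return List.of(
            new Command("ChangeUserNameCommand", "foo@bar.com", "Alex1"),
            new Command("ChangeUserNameCommand", "foo@bar.com", "Alex2"),
            new Command("ChangeUserNameCommand", "foo@bar.com", "Alex3"));
    }

    public static void main(String[] args) {
        // Scenario 1: both commands are already in the topic, so they share a batch.
        Map<String, Integer> table = new HashMap<>();
        System.out.println(processBatch(List.of(
            new Command("RegisterUserCommand", "foo@bar.com", "Alex"),
            new Command("ChangeUserNameCommand", "foo@bar.com", "Alex1")), table, true));
        // After a "restart" the table has caught up, so the rename succeeds.
        System.out.println(processBatch(List.of(
            new Command("ChangeUserNameCommand", "foo@bar.com", "Alex1")), table, true));

        // Scenario 2: three renames arrive together against the version-0 snapshot.
        System.out.println(processBatch(renames(), new HashMap<>(Map.of("foo@bar.com", 0)), true));
        System.out.println(processBatch(renames(), new HashMap<>(Map.of("foo@bar.com", 0)), false));
    }
}
```

In the lagging mode the model reproduces the observed results: `[UserRegisteredEvent Alex v0, snapshot is null]` for the first start, a successful `v1` rename after the restart, and `v1, v1, v1` for the three quick renames. Only the non-lagging mode yields the expected `v1, v2, v3`.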
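What am I missing when building a topology like this? I expect every command to be processed against the latest version of the snapshot. If I produce the commands with a few seconds' delay between them, everything works as expected.
Comments:
- Looking at the logs, it seems the second command starts being processed before the event record produced by the first command has been fetched from the partition. I'm not sure why. I am using the max.task.idle.ms setting.
Tags: apache-kafka apache-kafka-streams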