【Posted】: 2017-09-14 13:37:21
【Question】:
I have some questions about the behavior of the following topology:
String topic = config.topic();
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
.peek((k, v) -> L.info("Event:"+v.action))
// join the event with the according entry in the KTable and apply the state mutation
.leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
.peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
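My problem occurs when I receive different events at the same time. My state mutation is done by the leftJoin and then written by the to method, so if events 1 and 2 are received at the same time with the same key, the following can happen: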
event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic
As a result, state Y does not contain the changes made by event1, so I lose data.
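The interleaving above can be reproduced in a toy model. This is only a sketch in plain Java, not Kafka Streams code: the `table` map and `topic` queue are hypothetical stand-ins for the KTable and its backing topic, and the model assumes the table is updated only when written records are read back.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model only: plain collections stand in for the KTable and its topic.
class LostUpdateDemo {

    // Joins two events for the same key before either result is read back.
    static String run() {
        Map<String, String> table = new HashMap<>(); // stands in for the KTable
        Deque<String> topic = new ArrayDeque<>();    // stands in for the table's topic
        String key = "1";
        table.put(key, "A");

        // Both joins see state A because nothing has been read back yet.
        topic.add(table.get(key) + "+event1"); // state X
        topic.add(table.get(key) + "+event2"); // state Y

        // ASSUMPTION: the table changes only when written records are consumed again.
        while (!topic.isEmpty()) {
            table.put(key, topic.poll());
        }
        return table.get(key);
    }

    public static void main(String[] args) {
        System.out.println(run()); // the final state lacks event1's change
    }
}
```

In this model the last write wins, so the final value carries only event2's change.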
Here are the logs I see (the Processing:... lines are logged from inside the value joiner):
Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1
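Event1 can be thought of as the creation event: it creates the entry in the KTable, so it does not matter whether the state is empty. Event2, however, needs to apply its changes to an existing state, but it does not find one, because the first state mutation has not yet been written to the KTable (it has not yet been processed by the to method).
Is there a way to ensure that my leftJoin and my write to the KTable are done atomically?
Thanks
Update and current solution
Thanks to @Matthias's reply, I was able to find a solution using a Transformer.
Here is the code:
This is the transformer: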
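import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {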
private final String stateName;
private final ValueJoiner<V1, V2, V2> joiner;
private final boolean updateState;
private KeyValueStore<K, V2> state;
public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
this.stateName = stateName;
this.joiner = joiner;
this.updateState = updateState;
}
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
}
@Override
public KeyValue<K, V2> transform(K key, V1 value) {
V2 stateValue = this.state.get(key); // Get current state
V2 updatedValue = joiner.apply(value, stateValue); // Apply join
if (updateState) {
this.state.put(key, updatedValue); // write new state
}
return new KeyValue<>(key, updatedValue);
}
@Override
public KeyValue<K, V2> punctuate(long timestamp) {
return null;
}
@Override
public void close() {}
}
And this is the adjusted topology:
String topic = config.topic();
String store = topic + "-store";
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);
// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
// join the event with the according entry in the KTable and apply the state mutation
.transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
Since we use the KTable's KV StateStore and apply changes to it directly through the put method, events should always pick up the updated state.
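The read, join, write sequence of the transformer can be modeled without Kafka to see why the second event now finds the first event's result. This is a minimal sketch: the `HashMap` is a hypothetical stand-in for the `KeyValueStore`, and the string-concatenating joiner is invented purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model only: a HashMap stands in for the KeyValueStore.
class StoreJoinDemo {
    private final Map<String, String> store = new HashMap<>();
    private final BiFunction<String, String, String> joiner;

    StoreJoinDemo(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    // Same order of steps as transform() above: read state, join, write back.
    String transform(String key, String event) {
        String current = store.get(key);
        String updated = joiner.apply(event, current);
        store.put(key, updated); // visible to the very next call
        return updated;
    }

    static String run() {
        // Hypothetical joiner: create the state if absent, otherwise append the event.
        StoreJoinDemo demo = new StoreJoinDemo(
                (event, state) -> state == null ? event : state + "+" + event);
        demo.transform("1", "event1");
        return demo.transform("1", "event2");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the write happens inside the same call that performed the read, the second event is joined against a state that already contains the first one.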
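There is one thing I am still wondering about: what happens under a sustained high throughput of events?
Could there still be a race condition between the put we perform on the KTable's KV store and the write to the KTable's topic?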
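The concern can be made concrete with a toy model. This sketch is not Kafka Streams code and does not claim to reproduce its scheduling: it assumes, hypothetically, that records written to the topic are later re-applied to the same store in order, after both events have already been processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: shows the store contents after each step of one possible interleaving.
class ReadBackDemo {

    static List<String> run() {
        Map<String, String> store = new HashMap<>(); // stands in for the KV store
        Deque<String> topic = new ArrayDeque<>();    // stands in for the table's topic
        List<String> observed = new ArrayList<>();
        String key = "1";

        // Two events are applied directly to the store and also written to the topic.
        for (String event : new String[] {"event1", "event2"}) {
            String current = store.get(key);
            String updated = current == null ? event : current + "+" + event;
            store.put(key, updated);
            topic.add(updated);
        }
        observed.add(store.get(key)); // store is up to date

        // ASSUMPTION: the table source re-applies each topic record to the same store.
        store.put(key, topic.poll());
        observed.add(store.get(key)); // older record has overwritten the newer state
        store.put(key, topic.poll());
        observed.add(store.get(key)); // store catches up again
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Under that assumption there is a window in which the store temporarily holds the older value, and an event arriving in that window would be joined against stale state. Whether this interleaving can actually occur depends on how Kafka Streams orders records from the two topics, which this sketch does not model.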
【Discussion】: