【问题标题】:KStream-KTable join writing to the KTable: How to sync the join with the ktable write?KStream-KTable join 写入 KTable:如何将 join 与 ktable 写入同步?
【发布时间】:2017-09-14 13:37:21
【问题描述】:

我对以下拓扑的行为有一些疑问:

String topic = config.topic();

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    .peek((k, v) -> L.info("Event:"+v.action))
    // join the event with the according entry in the KTable and apply the state mutation
    .leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
    .peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

我的问题发生在我同时收到不同的事件时。因为我的状态突变是由leftJoin 完成的,然后由to 方法编写。如果使用相同的密钥同时收到事件 1 和 2,我可能会发生以下情况:

event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic

因此,状态 Y 没有从 event1 发生的变化,所以我丢失了数据。

以下是我所看到的日志(Processing:... 部分是从值连接器内部记录的):

Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1

Event1可以认为是创建事件:它将在KTable中创建条目,因此状态是否为空无关紧要。 Event2 虽然需要将其更改应用于现有状态,但它没有找到任何更改,因为第一个状态突变仍未写入 KTable(它仍未被 to 方法处理)

有没有办法确保我的 leftJoin 和我对 ktable 的写入是原子完成的?

谢谢

更新和当前解决方案

感谢@Matthias 的回复,我能够使用Transformer 找到解决方案。

代码如下:

那是变压器

public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {

    private final String                    stateName;
    private final ValueJoiner<V1, V2, V2>   joiner;
    private final boolean                   updateState;

    private KeyValueStore<K, V2>            state;

    public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
        this.stateName = stateName;
        this.joiner = joiner;
        this.updateState = updateState;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
    }

    @Override
    public KeyValue<K, V2> transform(K key, V1 value) {
        V2 stateValue = this.state.get(key); // Get current state
        V2 updatedValue = joiner.apply(value, stateValue); // Apply join
        if (updateState) {
            this.state.put(key, updatedValue); // write new state
        }
        return new KeyValue<>(key, updatedValue);
    }

    @Override
    public KeyValue<K, V2> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {}
}

这是调整后的拓扑:

String topic = config.topic();
String store = topic + "-store";

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    // join the event with the according entry in the KTable and apply the state mutation
    .transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

当我们使用 KTable 的 KV StateStore 并通过 put 方法直接在其中应用更改时,事件应该始终获取更新的状态。 我仍然想知道一件事:如果我有持续的高吞吐量事件怎么办。

在我们在 KTable 的 KV 存储中执行的 put 和在 KTable 的 topic 中执行的写入之间是否仍然存在竞争条件?

【问题讨论】:

    标签: apache-kafka-streams


    【解决方案1】:

    一个KTable被分片到多个物理存储中,每个存储仅由一个线程更新。因此,您描述的情况不会发生。如果您有 2 条具有相同时间戳的记录都更新同一个分片,则它们将一个接一个地处理(按偏移顺序)。因此,第二次更新会看到第一次更新后的状态。

    所以也许你只是没有正确描述你的场景?

    更新

    在进行连接时不能改变状态。因此,期望

    event1 joins with state A => state A mutated to state X
    

    错了。与任何处理顺序无关,event1state A连接时,会以只读方式访问state Astate A不会被修改。

    因此,当event2 加入时,它将看到与event1 相同的状态。对于流表连接,只有在从 table-input-topic 读取新数据时才会更新表状态。

    如果您希望从两个输入中更新共享状态,则需要使用 transform() 构建自定义解决方案:

    builder.addStore(..., "store-name");
    builder.stream("table-topic").transform(..., "store-name"); // will not emit anything downstream
    KStream result = builder.stream("stream-topic").transform(..., "store-name");
    

    这将创建一个由两个处理器共享的存储,并且两者都可以根据需要进行读/写。因此,对于表输入,您可以只更新状态而不向下游发送任何内容,而对于流输入,您可以进行连接、更新状态并向下游发送结果。

    更新 2

    关于解决方案,Transformer 应用于状态的更新和状态更新后记录Transformer 进程之间不会存在竞争条件。这部分将在单个线程中执行,并且记录将从输入主题按偏移顺序处理。因此,可以确保以后的记录可以使用状态更新。

    【讨论】:

    • 我添加了一些日志来说明我的问题。也许它更容易理解?
    • 我不确定我是否理解日志。你得到两种类型的事件?一些用于填充表格,另一个用于流(如果是,哪些是)?你在哪里登录Updated-null-null 是什么意思?另请注意,在流表连接中,表状态仅从“表输入主题”更新,流仅从表中读取(它从不更新表)。
    • 我更新了我的问题,希望它能让事情更清楚。更新的部分在 leftJoin 之后立即记录。是的,我得到了多种类型的事件。一切都以事件源方式完成。
    • 更新了我的答案。
    • 我所说的变异是我的ValueJoiner负责编译新的状态。我知道它不会立即对其进行变异,但这实际上是我想做的:以原子方式从/向我的 ktable 读取-变异-写入。据我了解,您正在创建第三种状态,但我需要数据以table-topic 结尾。所以也许我需要的只是builder.addStore(..., "table-topic"); builder.stream("stream-topic").transform(..., "table-topic"); 变压器访问table-topic 来读取它?那有意义吗 ?我不熟悉转换/处理 API
    猜你喜欢
    • 2018-02-23
    • 2020-10-17
    • 1970-01-01
    • 2020-08-11
    • 1970-01-01
    • 2020-01-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多