Kafka Streams 提供 2 种语义:emit-on-update 和 emit-on-window-close。
KIP-557 是关于基于数据的字节数组比较添加emit-on-change语义。它已经在 Kafka Streams 2.6 和 removed due to "potential data loss" 中实现。
不过,我使用 Kafka Streams DSL 开发了一个变化时发出语义的实现。
我们的想法是将具有 emit-on-update 语义的 KStream 转换为具有 emit-on-change 语义的 KStream。您可以在您提供的源 Kstream 上使用此实现来创建 KTable,也可以在应用 .toStream() 后在 KTable 上使用此实现。
这个实现隐式地创建了一个状态存储,其中的值包含 KStream 数据和一个标志,指示是否应该发出更新。此标志在聚合操作中设置,并基于Object#equals 进行比较。但您可以更改实现以使用Comparator。
这是改变 KStream 语义的withEmitOnChange 方法。您可能必须为 EmitOnChangeState 数据结构指定一个 serde(见下文)。
public static <K, V> KStream<K, V> withEmitOnChange(KStream<K, V> streams) {
return streams
.groupByKey()
.aggregate(
() -> (EmitOnChangeState<V>) null,
(k, data, state) -> {
if (state == null) {
return new EmitOnChangeState<>(data, true);
} else {
return state.merge(data);
}
}
)
.toStream()
.filter((k, state) -> state.shouldEmit)
.mapValues(state -> (V) state.data);
}
这是存储在状态存储中的数据结构,用于检查是否应该发出更新。
public static class EmitOnChangeState<T> {
public final T data;
public final boolean shouldEmit;
public EmitOnChangeState(T data, boolean shouldEmit) {
this.data = data;
this.shouldEmit = shouldEmit;
}
public EmitOnChangeState<T> merge(T newData) {
return new EmitOnChangeState<>(newData, Objects.equals(data, newData));
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EmitOnChangeState<?> that = (EmitOnChangeState<?>) o;
return shouldEmit == that.shouldEmit && Objects.equals(data, that.data);
}
@Override
public int hashCode() {
return Objects.hash(data, shouldEmit);
}
}
用法:
KStream<ProductKey, Product> products = builder.stream("product-topic");
withEmitOnChange(products)
.to("out-product-topic"); // output topic with emit-on-change semantic