【问题标题】:How to lookup dynamically in a KTable?如何在 KTable 中动态查找?
【发布时间】:2020-08-10 15:56:58
【问题描述】:

我目前正在开发一个 Kafka Streams 应用程序,该应用程序使用我们数据库中的数据丰富传入事件。丰富的数据存储在使用 Debezium 不断更新的主题中。 一些丰富很容易实现,因为它们只是来自事件 id 的 equi-join/left-join。 但其他丰富需要从传入的事件时间戳计算一个值:

假设我的传入事件主题具有此架构:

user_id: Long
timestamp: Instant

然后我需要将此事件映射到以下输出:

user_id: Long
has_planned_meetings_in_the_future: Boolean

会议表存储在单独的主题中,具有以下记录结构:

user_id: Long
meeting_date: Instant

因此,对于每个事件,如果它们是此特定用户的记录并且会议日期大于当前时间戳,我将需要在会议主题中查找。

怎么做?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    一种可行的方法是使用应用程序中的会议主题并将会议存储在状态存储中。

    然后,您可以使用您描述的条件有效地查询状态存储。

    这是一个存储会议的简单示例:

    public class MyMeetingsProcessor implements Processor<Object, Meeting> {
    
        private String meetingsKeyStore = "meetings-key-store";
        private KeyValueStore<Object, Meeting> meetings;
    
        public void init(ProcessorContext context) {
            meetings = (KeyValueStore<Object, List<String>>) context.getStateStore(meetingsKeyStore);
        }
    
        public void process(Object key, Meeting value) {
            meetings.put(key, value);
        }
    }
    

    要在消费事件时查询状态存储,您可以这样做:

    public class MyEventsProcessor implements Processor<Object, Meeting> {
    
        private String meetingsKeyStore = "meetings-key-store";
        private KeyValueStore<Object, Meeting> meetings;
    
        public void init(ProcessorContext context) {
            meetings = (KeyValueStore<Object, List<String>>) context.getStateStore(meetingsKeyStore);
        }
    
        public void process(Object key, Event value) {
            Meeting meeting = meetings.get(key);
            if (meeting != null) {
                // do something fun
            }
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多