【问题标题】:KSQL select delete eventKSQL 选择删除事件
【发布时间】:2020-05-14 00:38:54
【问题描述】:

我有一个包含用户 ID 的 kafka KTable。我将此 Ktable 称为“用户白名单”。此 Ktable 由“用户”主题制成。如果将新消息插入/更新到用户主题中,我会在 Ktable 中获得一条新记录并引发事件。如果我向用户主题发送带有墓碑的密钥,则从 Ktable 中删除记录但我没有收到任何事件给我的消费者。下面是代码,我如何收听 Ktable 事件。

using (var client = new HttpClient())
        {

            client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);

            var request = new HttpRequestMessage(HttpMethod.Post, url);
            request.Method = HttpMethod.Post;
            request.Content = new System.Net.Http.StringContent("{ \"ksql\": \"select * from UserWhitelist;\",\"streamsProperties\": { \"ksql.streams.auto.offset.reset\": \"earliest\"}}", Encoding.UTF8, "application/vnd.ksql.v1+json");
            //request.Content.Headers.Add("Accept", "application/vnd.ksql.v1+json");
            using (var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
            {
                using (var body = await response.Content.ReadAsStreamAsync())
                using (var reader = new StreamReader(body))
                    while (!reader.EndOfStream)
                        Console.WriteLine(reader.ReadLine());
            }
        }

我是否理解正确,不应该提出删除?如果是这样,您可以针对我的情况建议什么方法,我是否希望有一个白名单,并在 Ktable 上进行任何更新/删除/插入时始终通知每个消费者。

【问题讨论】:

  • 你为什么使用 KSQL REST API 而不是 Confluent dotnet 消费者?
  • 我不确定您是否可以通过消费者订阅 ktable...如果我从主题创建 ktable,我看不到主题...

标签: c# apache-kafka kafka-consumer-api ksqldb


【解决方案1】:

KSQL 选择语句将始终显示表的连续状态。如果您已经打印了一个事件,然后将其删除,那么它仍然会被打印,并且您不会看到任何新的键更新,直到它有一个值添加回表中。

在表本身中,该行将被删除,但获得有关该事件的通知的唯一方法是在表的基础主题上使用实际的 Kafka 消费者,然后检查记录的值何时为空。

您所要求的通常是通过来自 Java KTable 处理器的交互式查询公开辅助 REST API 来完成,并且可以在此处找到 KSQL 票证 - https://github.com/confluentinc/ksql/issues/530

但是,与消耗所有事件然后过滤掉非空事件相比,您仍然需要跟踪查询之间的所有值,然后扫描它们以了解哪些被删除了

【讨论】:

  • 正如我从票务讨论中了解到的,对 Ktables 的时间点查询的支持将很快实现......
猜你喜欢
  • 1970-01-01
  • 2018-05-19
  • 2016-12-07
  • 1970-01-01
  • 2012-08-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多