【问题标题】:Kafka streams return all records where fieldx = some valueKafka 流返回 fieldx = 某个值的所有记录
【发布时间】:2019-07-03 15:29:41
【问题描述】:

我有多个非唯一字段的记录进入 kafka,我们称它们为 Field1 ... Field n。

我想编写一个查询来返回 fieldx = 某个值的所有记录。让我们举一个简单的例子。假设订单进入系统,订单中的字段之一是 customerId。基本操作是获取特定客户的所有订单。如何使用 Kafka Streams 做到这一点?

我已经有一个 KTable 和所有记录的物化视图,所以我可以遍历视图中的所有记录并挑选出我想要的记录,但这似乎效率低下且成本高昂。

我真的很想创建一个物化视图,其中视图包含按 fieldx 分组的记录,但我看不到任何方法可以做到这一点。看起来您只能将 groupby 与聚合、计数、减少等一起使用。

关于如何做到这一点的任何想法?

【问题讨论】:

  • 您可以先重新映射流以使用您过滤的字段作为其键,这样您就可以在访问商店时查询它。
  • 不确定return all records 是什么意思,但是stream.filter(...).to(...) 呢?过滤器可以检查每条记录是否符合您想要的条件(或者可能是stream.filter(...).foreach(...)?)

标签: java apache-kafka-streams spring-kafka


【解决方案1】:

这是一个按客户 ID 过滤订单的示例。对于这个查询,不需要为分组或聚合创建 KTable。然而,由于 Kafka 主题是没有二级索引的仅附加日志,因此您确实需要遍历所有消息以找到与您的客户 ID 匹配的订单流。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orderStream = builder.stream("orders");
orderStream.filter((k,v) -> "customer-1".equals(v.customerId));

请注意,上面的代码假设您的订单流也有字符串类型的键,但这些键被忽略了。

另外请注意,您需要指定 Kafka Streams 如何将消息反序列化到您的 Order 类中。您可以使用Consumed.with(...) 指定反序列化程序。

有关完整示例,请参阅 github 上的 Kafka Streams 示例存储库:https://github.com/confluentinc/kafka-streams-examples

并不是说这些类型的查询也可以使用 KSQL 编写:https://www.confluent.io/stream-processing-cookbook/

【讨论】:

  • 这段代码何时/如何运行?我有一个 REST API,它获取对 customerID = 3636 的所有订单的请求。然后我会运行上面的代码吗?这会不止一次吗?
  • 好像我误解了你的问题。如果您收到针对不同客户 ID 的多个请求,那么分组确实是有意义的。然后,您可以使用交互式查询来访问您的状态存储中的数据。
【解决方案2】:

您应该在“customerID”上对您的订单流进行分组,并将所有订单汇总到一个列表中。结果 KTable 将有 &lt;CustomerId, [List of Order]&gt; 类型的事件。

使用交互式查询,您可以查询状态存储,

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orderStream = builder.stream("orders");
KTable<String,ArrayList<Order>> orderTable = orderStream
      .groupBy((key,value)-> value .get("customerId"))
      .aggregate(()-> new ArrayList<Order>(),
                 (key,val,agg)-> agg.add(val),
                  Materialized.as("customer-orders")
                  .withValueSerde(ArrayListSerde())          
       ); 

它将创建一个物化视图“customer-orders”,您可以通过rest端点查询。

您可以按照以下链接将 KTables 公开为 Rest Endpoint:

https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html

【讨论】:

  • 这种方法的问题可能是,List of Orders 可能会变得太大并超过最大消息大小。
  • 同意,为了处理该问题,OP 需要更改 max.message.bytes 或可能更改用于优化列表的键。我遇到了类似的问题,因此必须制作更多详细信息键以减少列表大小。
  • 我不得不稍微更改答案中的代码以使其正常工作。 1. 我必须为 groupby 添加 Serialized.with 以便它可以正确反序列化订单 2. ()-> new ArrayList() 不起作用。起作用的是 .aggregate(ArrayList::new, (newKey,val,agg) -> { agg.add(val); return agg; },
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-04-16
  • 1970-01-01
  • 2019-05-26
  • 1970-01-01
  • 2016-08-25
  • 2020-05-27
  • 1970-01-01
相关资源
最近更新 更多