【发布时间】:2017-12-22 00:03:16
【问题描述】:
我正在使用 Apache Kafka Streams 评估事件溯源,以了解它在复杂场景中的可行性。与关系数据库一样,我遇到过一些情况是原子性/事务性是必不可少的:
具有两种服务的购物应用:
- OrderService:有一个带有订单的 Kafka Streams 存储 (OrdersStore)
- ProductService:拥有一个 Kafka Streams 商店 (ProductStockStore),其中包含产品及其库存。
流程:
OrderService 发布 OrderCreated 事件(带有 productId、orderId、userId 信息)
ProductService 获取 OrderCreated 事件并查询其 KafkaStreams 存储 (ProductStockStore) 以检查产品是否有库存。如果有库存,它会发布一个 OrderUpdated 事件(还有 productId、orderId、userId 信息)
关键是这个事件会被 ProductService Kafka Stream 监听,它会处理它以减少库存,到目前为止一切都很好。
但是,想象一下:
- 客户 1 下订单,order1(产品有 1 个库存)
- 客户 2 为同一产品同时下另一个订单 order2(库存仍为 1)
- ProductService 处理 order1 并发送消息 OrderUpdated 以减少库存。此消息位于 order2 -> OrderCreated 之后的主题中
- ProductService 处理 order2-OrderCreated 并发送消息 OrderUpdated 以再次减少库存。这是不正确的,因为它会引入不一致(库存现在应该是 0)。
明显的问题是我们的物化视图(商店)应该在我们处理第一个 OrderUpdated 事件时直接更新。然而,更新 Kafka Stream Store 的唯一方法(我知道)是发布另一个事件(OrderUpdated)以由 Kafka Stream 处理。这样我们就无法以事务方式执行此更新。
我会很感激处理这种情况的想法。
更新:我将尝试澄清问题的问题:
ProductService 有一个 Kafka Streams Store,ProductStock 有这个股票(productId=1, quantity=1)
OrderService 在 orders 主题 上发布两个 OrderPlaced 事件:
Event1 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced")Event2 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced")
ProductService 在订单主题上有一个消费者。为简单起见,让我们假设一个单个分区来确保消息按顺序消费。该消费者执行以下逻辑:
if("OrderPlaced".equals(event.get("eventType"))){
Order order = new Order();
order.setId((String)event.get("orderId"));
order.setProductId((Integer)(event.get("productId")));
order.setUid(event.get("uid").toString());
// QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
Integer productStock = getProductStock(order.getProductId());
if(productStock > 0) {
Map<String, Object> event = new HashMap<>();
event.put("name", "ProductReserved");
event.put("orderId", order.getId());
event.put("productId", order.getProductId());
// WRITES A PRODUCT RESERVED EVENT TO orders topic
orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
}else{
//XXX CANCEL ORDER
}
}
ProductService 还有一个负责更新库存的 Kafka Streams 处理器:
KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
stream.xxx().yyy(() -> {...}, "ProductsStock");
Event1 将被首先处理,由于仍有 1 个可用产品,它会生成 ProductReserved 事件。
现在,轮到Event2了。如果 ProductService 消费者 在 ProductService Kafka 流处理器 处理 Event1 生成的 ProductReseved 事件之前使用它,消费者仍会看到 ProductStore product1 的库存为 1,为 Event2 生成 ProductReserved 事件,然后在系统中产生不一致。
【问题讨论】:
标签: apache-kafka event-sourcing apache-kafka-streams