【问题标题】:Event Sourcing - Apache Kafka + Kafka Streams - How to assure atomicity / transactionality事件溯源 - Apache Kafka + Kafka Streams - 如何确保原子性/事务性
【发布时间】:2017-12-22 00:03:16
【问题描述】:

我正在使用 Apache Kafka Streams 评估事件溯源,以了解它在复杂场景中的可行性。与关系数据库一样,我遇到过一些情况是原子性/事务性是必不可少的:

具有两种服务的购物应用:

  • OrderService:有一个带有订单的 Kafka Streams 存储 (OrdersStore)
  • ProductService:拥有一个 Kafka Streams 商店 (ProductStockStore),其中包含产品及其库存。

流程:

  1. OrderService 发布 OrderCreated 事件(带有 productId、orderId、userId 信息)

  2. ProductService 获取 OrderCreated 事件并查询其 KafkaStreams 存储 (ProductStockStore) 以检查产品是否有库存。如果有库存,它会发布一个 OrderUpdated 事件(还有 productId、orderId、userId 信息)

关键是这个事件会被 ProductService Kafka Stream 监听,它会处理它以减少库存,到目前为止一切都很好。

但是,想象一下:

  1. 客户 1 下订单,order1(产品有 1 个库存)
  2. 客户 2 为同一产品同时下另一个订单 order2(库存仍为 1)
  3. ProductService 处理 order1 并发送消息 OrderUpdated 以减少库存。此消息位于 order2 -> OrderCreated
  4. 之后的主题中
  5. ProductService 处理 order2-OrderCreated 并发送消息 OrderUpdated 以再次减少库存。这是不正确的,因为它会引入不一致(库存现在应该是 0)。

明显的问题是我们的物化视图(商店)应该在我们处理第一个 OrderUpdated 事件时直接更新。然而,更新 Kafka Stream Store 的唯一方法(我知道)是发布另一个事件(OrderUpdated)以由 Kafka Stream 处理。这样我们就无法以事务方式执行此更新。

我会很感激处理这种情况的想法。

更新:我将尝试澄清问题的问题:

ProductService 有一个 Kafka Streams Store,ProductStock 有这个股票(productId=1, quantity=1)

OrderServiceorders 主题 上发布两个 OrderPlaced 事件:

  • Event1 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced")

  • Event2 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced")

ProductService 在订单主题上有一个消费者。为简单起见,让我们假设一个单个分区来确保消息按顺序消费。该消费者执行以下逻辑:

if("OrderPlaced".equals(event.get("eventType"))){

    Order order = new Order();
    order.setId((String)event.get("orderId"));
    order.setProductId((Integer)(event.get("productId")));
    order.setUid(event.get("uid").toString());

    // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
    Integer productStock = getProductStock(order.getProductId());

    if(productStock > 0) {
        Map<String, Object> event = new HashMap<>();
        event.put("name", "ProductReserved");
        event.put("orderId", order.getId());
        event.put("productId", order.getProductId());

        // WRITES A PRODUCT RESERVED EVENT TO orders topic
        orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
    }else{
        //XXX CANCEL ORDER
    }
}

ProductService 还有一个负责更新库存的 Kafka Streams 处理器:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
stream.xxx().yyy(() -> {...}, "ProductsStock");

Event1 将被首先处理,由于仍有 1 个可用产品,它会生成 ProductReserved 事件。

现在,轮到Event2了。如果 ProductService 消费者ProductService Kafka 流处理器 处理 Event1 生成的 ProductReseved 事件之前使用它,消费者仍会看到 ProductStore product1 的库存为 1,为 Event2 生成 ProductReserved 事件,然后在系统中产生不一致。

【问题讨论】:

    标签: apache-kafka event-sourcing apache-kafka-streams


    【解决方案1】:

    在确保任何分布式系统的一致性方面,同样的问题很常见。通常使用流程管理器/传奇模式,而不是追求强一致性。这有点类似于分布式事务中的两阶段提交,但在应用程序代码中显式实现。它是这样的:

    Order Service 要求 Product Service 保留 N 件商品。产品服务要么接受命令并减少库存,要么在没有足够可用项目时拒绝命令。在对命令做出肯定答复后,Order Service 现在可以发出 OrderCreated 事件(尽管我将其称为 OrderPlaced,因为“放置”听起来是该领域的惯用模式,而“创建”更通用,但这是一个细节)。产品服务要么侦听 OrderPlaced 事件,要么向其发送明确的 ConfirmResevation 命令。或者,如果发生其他事情(例如未能清除资金),可以发出适当的事件或将 CancelReservation 命令明确发送到 ProductService。为了应对特殊情况,ProductService 还可能有一个调度程序(在 KafkaStreams 中,标点符号可以派上用场)来取消在超时期限内未确认或中止的预订。

    两个服务的编排以及处理错误条件和补偿操作(在这种情况下取​​消预订)的技术细节可以在服务中直接处理,或者在显式流程管理器组件中处理以分离此责任。就我个人而言,我会选择一个可以使用 Kafka Streams Processor API 实现的显式流程管理器。

    【讨论】:

    • 感谢您的详尽解释。如果我做对了,Kafka Streams Processor API 允许我们写入远程存储,不是吗?因此,您建议使用另一个服务 ProcessManagerService,该服务将使用处理器 API 与其他服务的存储进行交互,我的疑问是:服务产生的所有消息是否必须由该管理器处理,或者只处理那些必要的消息传奇提交/回滚?如果您对此有任何进一步的阅读,那就太好了。
    • 流程管理器不会直接与远程存储交互,但可以通过 Kafka 发送适当的消息,并且相应的服务会将它们作为合同的一部分进行处理。流程管理器可以是它自己的服务,但不是必须的。它可能只是开始流程的服务的一部分,在本例中是订单服务。这取决于您是希望整体解决方案更简洁还是更解耦。
    • 嗨,Michal,经过一段时间权衡您的答案后,我一直在努力解决这个问题,所以我不接受这个答案。让我证明一下:我认为您使用 Saga 模式的建议是完全正确的,但实现仍然不清楚。您说:The Order Service asks the Product Service to reserve N items. The Product Service either accepts the command and reduces stock or rejects the command if it doesn't have enough items available. 这是有问题的部分:ProductService 必须检查库存以接受/拒绝命令。两个并发命令将检查库存..
    • ...如果库存中有 1 个产品,两个并发的保留产品命令可以成功。我正在更新问题以更清楚地描述场景。
    • 如果你使用 Kafka Streams 来实现 Product Service 就不会出现这个问题。您只需要按产品 ID 对这些消息进行分区。分区内的消息处理是顺序的。保留相同产品的所有请求将转到相同的任务,它将处理消息并按顺序更新其状态存储。从 Kafka v0.11 开始,您甚至可以启用 Exactly-once-Semantics 以事务方式完成此操作,以避免在失败后重试的情况下重复保留。否则你必须确保你的状态存储更新是幂等的。
    【解决方案2】:

    这个答案对于您的原始问题来说有点晚了,但为了完整起见,还是让我回答一下。

    有很多方法可以解决这个问题,但我鼓励解决这个问题是一种事件驱动的方式。这意味着您 (a) 验证有足够的库存来处理订单并 (b) 将库存保留为单个,所有这些都在单个 KStreams 操作中。诀窍是通过 productId 重新设置密钥,这样您就知道同一产品的订单将在同一线程上按顺序执行(因此您不会遇到 Order1 和 Order2 两次保留同一产品的库存的情况)。

    有一篇文章讨论了如何做到这一点:https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/

    也许更有用的是一些示例代码也显示了它是如何完成的: https://github.com/confluentinc/kafka-streams-examples/blob/1cbcaddd85457b39ee6e9050164dc619b08e9e7d/src/main/java/io/confluent/examples/streams/microservices/InventoryService.java#L76

    请注意,在此 KStreams 代码中,第一行如何将第一行重新设置为 productId,然后使用 Transformer (a) 验证是否有足够的库存来处理订单 (b) 保留所需的库存更新状态存储。这是使用 Kafka 的事务功能以原子方式完成的。

    【讨论】:

    • 嗨,Ben,感谢您的回答,我终于有时间再处理这个问题了。它工作正常,但有一点我不明白:KStream 是从订单主题创建的,那么为什么选择新键(产品)会更改处理为产品的元素的顺序?我的意思是原来的来源是一个订单主题分区,怎么现在又重新分配给产品了?有没有关于这种行为的文档?
    猜你喜欢
    • 2022-08-19
    • 2019-02-01
    • 2019-09-16
    • 2019-02-03
    • 2016-06-02
    • 2021-10-01
    • 1970-01-01
    • 2019-06-13
    • 2016-06-16
    相关资源
    最近更新 更多