【问题标题】:Replacing a scheduled task with Spring Events用 Spring Events 替换计划任务
【发布时间】:2021-04-02 06:14:21
【问题描述】:

在我的 Spring Boot 应用中,客户可以提交文件。每个客户的文件都通过每分钟运行的计划任务合并在一起。合并由调度程序执行的事实有许多缺点,例如编写端到端测试很困难,因为在测试中您必须等待调度程序运行才能检索合并结果。

因此,我想改用event-based approach,即

  1. 客户提交文件
  2. 已发布包含此客户 ID 的事件
  3. 合并服务侦听这些事件并在事件对象中为客户执行合并操作

这样做的好处是在有文件可合并后立即触发合并操作。

但是,这种方法存在许多问题,我希望得到一些帮助

并发

合并是一项相当昂贵的操作。最多可能需要 20 秒,具体取决于所涉及的文件数量。因此,合并必须异步发生,即不作为发布合并事件的同一线程的一部分。另外,我不想同时为同一个客户执行多个合并操作,以避免出现以下情况

  1. 客户 1 保存文件 2 触发文件 1 和文件 2 的合并操作 2
  2. 不久之后,客户 1 保存了文件 3,触发了文件 1、文件 2 和文件 3 的合并操作 3
  3. 合并操作3完成保存合并文件3
  4. Merge operation2 完成用 merge-file2 覆盖 merge-file3

为避免这种情况,我计划使用事件侦听器中的锁按顺序处理同一客户的合并操作,例如

@Component
public class MergeEventListener implements ApplicationListener<MergeEvent> {

    private final ConcurrentMap<String, Lock> customerLocks = new ConcurrentHashMap<>();

    @Override
    public void onApplicationEvent(MergeEvent event) {
        var customerId = event.getCustomerId();
        var customerLock = customerLocks.computeIfAbsent(customerId, key -> new ReentrantLock());
        customerLock.lock();
        mergeFileForCustomer(customerId);
        customerLock.unlock();
    }

    private void mergeFileForCustomer(String customerId) {
        // implementation omitted
    }
}

容错

如果应用程序在合并操作过程中关闭或在合并操作期间发生错误,我该如何恢复?

计划方法的优点之一是它包含隐式重试机制,因为每次运行时它都会查找具有未合并文件的客户。

总结

我怀疑我提出的解决方案可能会(严重)重新实施解决此类问题的现有技术,例如JMS。我建议的解决方案是可取的,还是应该改用 JMS 之类的东西?该应用程序托管在 Azure 上,因此我可以使用它提供的任何服务。

如果我的解决方案可取的,我应该如何处理容错?

【问题讨论】:

  • 看看 temporal.io,它是一个分布式编排平台,可以轻松支持此类用例。
  • 如果您仍然使用 azure,您可以简单地使用主题来传播您的消费者订阅的事件。在生产者方面,您可以将事件作为正常实体持久化,因此在任何故障情况下,您都可以通过获取未标记为在您的消费者中处理的事件轻松恢复。显然,这并不能解决您的并发要求。但我确实看到你的方法存在问题,如果你打算异步执行并且顺序不是不重要的。确实很重要。

标签: java spring-boot events architecture scheduled-tasks


【解决方案1】:

关于并发部分,如果每个客户(在给定时间范围内)提交的文件数量足够少,我认为使用锁的方法可以正常工作。

随着时间的推移,您最终可以监控等待锁的线程数,以查看是否存在大量争用。如果有,那么也许您可以累积一些合并事件(在特定时间范围内),然后运行并行合并操作,这实际上会导致类似于使用调度程序的解决方案。

在容错方面,基于消息队列的方法可以工作(没有使用 JMS,但我看到它是消息队列的实现)。

出于可靠性目的,我会使用基于云的消息队列(例如SQS)。方法是:

  • 将合并事件推送到队列中
  • 合并服务一次扫描一个事件并启动合并作业
  • 合并作业完成后,消息将从队列中删除

这样,如果在合并过程中出现问题,消息会留在队列中,并在应用重启时再次读取。

【讨论】:

    【解决方案2】:

    使用 Kafka 的 Spring-boot 可以解决您的容错问题。

    Kafka 支持生产者-消费者模型。让客户事件发布到 Kafka 生产者。

    为 Kafka 配置复制功能,以免丢失任何事件。

    使用可以为每个事件调用合并服务的消费者。

    1. 一旦消费者读取 customerId 的事件并合并然后提交偏移量。

    2. 如果在合并事件之间发生任何故障,则不会提交偏移量,以便在应用程序再次启动时可以再次读取。

    3. 如果合并服务可以检测到给定数据的重复事件,那么重新处理相同的消息应该不会导致任何问题(Kafka 承诺事件的单一传递)。重复事件检测是对已处理完毕但未能提交到 Kafka 的事件的安全检查。

    【讨论】:

      【解决方案3】:

      首先,基于事件的方法对于这种情况是正确的。您应该为发布-订阅事件消息使用外部代理。

      注意,默认情况下,Spring 发布事件是同步的

      假设,您有 3 个服务:

      1. 应用服务
      2. 合并服务
      3. CDC 服务(变更数据捕获)
      4. 代理服务(Kafka、RabbitMQ、...)

      基于“发件箱模式”的主流:

      1. 应用服务将事件消息保存到发件箱消息表
      2. CDC Service 监视发件箱表并将事件消息从发件箱表发布到 Broker Servie
      3. 合并Service订阅Broker Server并接收事件消息(消息有序)
      4. Merge Servcie 执行合并操作

      您可以为此流程使用eventuate lib。

      此外,您还可以将 DDD 应用到您的架构中。使用 Axon 框架进行 CQRS 模式、公共领域事件并对其进行处理。

      参考:

      1. 发件箱模式:https://microservices.io/patterns/data/transactional-outbox.html

      【讨论】:

      • Spring 也支持异步事件,它所需要的(假设在 Spring Boot 配置中启用了异步支持)就是用 @Async 注释监听器
      • 是的,带有发件箱模式,使用 ACID 事务同步
      【解决方案4】:

      听起来您确实可以使用StreamETL 工具来完成这项工作。当您开发应用程序时,您有一些优先级/排队/批处理要求,很容易看出如何使用Cron + SQL Database 构建解决方案,可能需要一个队列来将工作与生产工作分离.

      这很可能是最容易构建的东西,因为您对这种方法有很多粒度和控制权。如果您认为您实际上可以通过这种方式快速且低风险地满足您的要求,那么您可以这样做。

      有些软件组件更适合这些任务,但它们确实有一些学习曲线,并且取决于您可能使用的 PAAS 或云。您将获得开箱即用的监控、可扩展性和可用性弹性。开源或云服务将减轻您的管理负担。

      使用什么还取决于您的优先级和要求。如果您想采用擅长存储工作的 ETL 方法,您可能需要使用 Glue t 之类的东西。如果您想要优先级功能,您可能想要使用多个队列,这真的取决于。您还需要使用仪表板进行监控,以查看无论采用何种方法,您的合并都应等待多长时间。

      【讨论】:

        【解决方案5】:

        经过一些考虑,我对这个问题的想法。

        根据 OP 的规范,我将可能的解决方案限制在 Azure 托管服务中可用的内容。

        Azure Blob 存储函数触发器

        因为这个问题是关于存储文件的,所以让我们开始探索具有在文件创建时触发的触发器功能的 Blob 存储。根据文档,Azure 函数最多可以运行 230 秒,并且默认重试次数为 5。

        但是,此解决方案将要求来自单个客户的文件以不会导致并发问题的方式到达,因此让我们暂时离开此解决方案。

        Azure 队列存储

        不保证先进先出(FIFO)有序交付,因此不符合要求。

        存储队列和服务总线队列 - 比较和对比:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted

        Azure 服务总线

        Azure Service Bus 是一个 FIFO 队列,似乎满足要求。

        https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted#compare-storage-queues-and-service-bus-queues

        从上面的文档中,我们看到大文件不适合作为消息负载。为了解决这个问题,可以将文件存储在 Azure Blob 存储中,并且消息将包含在哪里找到文件的信息。


        选择了 Azure 服务总线Azure Blob 存储,让我们讨论一下实现注意事项。

        队列生产者

        在 AWS 上,生产者方的解决方案是这样的:

        1. 专用端点为客户应用提供预签名 URL
        2. 客户应用将文件上传到 S3
        3. S3 对象创建触发的 Lambda 将消息插入队列

        不幸的是,Azure 还没有等效的预签名 URL(它们具有不相等的共享访问签名),因此文件上传必须通过端点完成,该端点又将文件存储到 Azure Blob 存储.当需要文件上传端点时,让文件上传端点也负责将消息插入队列似乎是合适的。

        队列消费者

        因为文件合并需要大量时间(约 20 秒),所以应该可以横向扩展消费者端。对于多个消费者,我们必须确保一个客户只被一个消费者实例处理。 这可以通过使用消息会话来解决:https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions

        为了实现容错,消费者应该在文件合并期间使用 peek-lock(而不是接收和删除),并在文件合并完成时将消息标记为已完成。当消息被标记为完成时,消费者可能负责 删除 Blob 存储中的多余文件。

        现有解决方案和未来解决方案可能存在的问题

        如果客户A开始上传一个巨大的文件#1,然后立即开始上传一个小的文件#2,文件上传file #2 可能在 file #1 之前完成并导致出现乱序的情况。

        我认为这是通过使用某种锁定机制或文件名约定在现有解决方案中解决的问题。

        【讨论】:

        • 非常感谢,非常有帮助。我最初对 Azure 队列存储非常感兴趣,直到我发现订阅者在等待消息时无法阻止。我简直不敢相信,在 2021 年,发布-订阅技术需要订阅者反复轮询消息,但似乎确实如此。
        猜你喜欢
        • 1970-01-01
        • 2014-06-21
        • 1970-01-01
        • 1970-01-01
        • 2013-07-11
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-05-01
        相关资源
        最近更新 更多