【问题标题】:Asynchronous message processing with Camel使用 Camel 进行异步消息处理
【发布时间】:2015-02-09 15:18:39
【问题描述】:

我需要帮助来找出以下设计问题的最佳解决方案。

我有一个供我们的用户使用的前端服务器。他们提交应该异步运行的任务,并在完成时发送电子邮件。该任务执行调用外部 API 并在那里更新某些内容。有与 API 关联的帐户(不要被用户帐户混淆)。用户将为特定帐户运行任务,一旦该帐户在进行 api 更新时被占用,其他想要使用相同帐户更新 API 的任务必须等待。请注意,用户不必等待,他们可能会为同一个 (api) 帐户提交许多任务。系统将负责将任务放入队列中。 队列将被一堆服务器监听,一旦帐户被释放,下一个任务将被其他服务器接收。因此,我们提交了多个任务,更新了多个帐户的 api。 我正在将 Apache Camel 作为我的解决方案,前端将在总线上提交任务,其中一台服务器将接收它。但是,如果同一帐户有多个任务,则只能执行一个,如上所述,我想将同一帐户的其他任务保留在队列中的某个位置(我不知道该怎么做)和一旦帐户被释放,下一条消息应该由集群中的一个服务器选择。 任何关于更好解决方案的建议,甚至不确定 Camel 是否是此类问题的最佳解决方案。我是这种异步设计问题的新手。

【问题讨论】:

    标签: java design-patterns apache-camel esb mq


    【解决方案1】:

    骆驼非常适合这种设计。我们在非常相似的场景中使用了它,它提供了您需要的一切开箱即用的东西(转换、异步调用等)。

    我的建议:

    1. 使用 JMS 服务器,每个 API 帐户有 1 个队列
    2. 创建前端服务以获取用户调用(例如 SOAP/HTTP)并将任务放入正确的队列中(使用 http://camel.apache.org/recipient-list.html 模式)
    3. 为队列创建监听器(1 个监听器,每个队列有 1 个线程 ?maxConcurrentConsumers=1)。例如,您可以在 setApplicationContext 事件上动态创建它们

    【讨论】:

    • 感谢 Surgey,我们有大约 3000 个帐户。我不确定配置 3000 个队列和 3000 个侦听器的效率如何。想法?
    【解决方案2】:

    我认为您可以使用 2 个队列和两种企业集成模式来完成此设计:aggregatorconcurrent consumers

    第一个队列是提交作业的地方。我们称之为submit。这里的工作是通过accountId聚合作业并将它们批量发送到另一个队列1

    我们只需要在 one 节点上运行路由(为此我们使用master: 组件)

    from("master:myyapp:mq:queue:submit)
        .aggregate(header("accountId"), new SimpleAggregator()).completionSize(1)
        .inOut("mq:queue:process");
    

    现在,您只需将作业发送到process 队列,集群中的任何服务器都可以处理该队列。这里的关键是我们使用inOut模式,这是一个请求-回复模式(你的jms必须支持这个)。

    我认为这应该解决在任何时候对于给定帐户 ID 只应运行一个作业的要求(但您最好先在某些测试中模拟它)。

    【讨论】:

      【解决方案3】:

      @vikingsteve 有一个很好的答案,但我认为 idempotent consumer 值得一看,它可能更适合用例。

      主要原因是aggregator 用于将许多交换一起批量处理,而这将确保它只处理给定上下文的单个项目(这使用 JpaRepo 来存储状态)

      RouteBuilder 看起来像这样:

      // api endpoint
      from("jetty:http://localhost:8080/api/submit")
          .inOnly("jms:queue:submit")
          .process(new Processor() {
              // set the response message
          });
      
      // this implements the async processing, but will only process 
      // a single accountId at a time
      from("jms:queue:submit")
          .idempotentConsumer(
              header("accountId"),
              jpaMessageIdRepository(lookup(JpaTemplate.class), PROCESSOR_NAME))
          .inOut("jms:queue:process");
      

      并使用这个骆驼配置来拥有 10 个 jms 消费者(这将使您能够并行运行 10 个操作)

      <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
          <property name="autoStartup" value="true" />
          <property name="concurrentConsumers" value="10" />
          <property name="connectionFactory" ref="pooledConnectionFactory" />
      </bean>
      

      【讨论】:

      • 您好 stringy,感谢您向我们展示这种模式。问题 - 这是否允许多个节点同时处理任务(针对不同的帐户 ID)?
      • 是的。如果您说 JMS 组件上有 10 个使用者,那么他们每个人都将并行运行。如果他们点击已经在幂等存储中注册的键,它将停止交换并继续前进
      • 好的,但它应该在端点中有master: 吗?这将导致上述逻辑仅在一个节点上运行。
      • 我只是从其他人的例子中复制的,我会更新答案
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-24
      • 2011-10-20
      • 2017-02-12
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多