【问题标题】:how to perform parallel processing of gcp pubsub messages in apache camel如何在 apache camel 中执行 gcp pubsub 消息的并行处理
【发布时间】:2020-08-13 17:15:03
【问题描述】:

我在下面有这段代码,它从 pubsub 源主题获取消息 -> 根据模板对其进行转换 -> 然后将转换后的消息发布到目标主题。

但为了提高性能,我需要并行执行此任务。即我需要轮询 500 条消息,然后将其并行转换,然后将它们发布到目标主题。

从骆驼 gcp 组件文档中,我相信 maxMessagesPerPoll 和 concurrentConsumers 参数可以完成这项工作。由于缺乏文档,我不确定它在内部是如何工作的。

我的意思是 a)如果我轮询说 500 条消息,那么它会创建 500 条并行路由来处理消息并将其发布到目标主题 b)消息的排序如何 c)我应该查看并行处理EIP 作为替代方案

等等

这个概念我不清楚

// my route
private void addRouteToContext(final PubSub pubSub) throws Exception {

    this.camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {

            errorHandler(deadLetterChannel("google-pubsub:{{gcp_project_id}}:{{pubsub.dead.letter.topic}}")
                    .useOriginalMessage().onPrepareFailure(new FailureProcessor()));




            /*
             * from topic
             */
            from("google-pubsub:{{gcp_project_id}}:" + pubSub.getFromSubscription() + "?"
                    + "maxMessagesPerPoll={{consumer.maxMessagesPerPoll}}&"
                    + "concurrentConsumers={{consumer.concurrentConsumers}}").
            /*
             * transform using the velocity
             */
            to("velocity:" + pubSub.getToTemplate() + "?contentCache=true").
            /*
             * attach header to the transform message
             */
            setHeader("Header ", simple("${date:now:yyyyMMdd}")).routeId(pubSub.getRouteId()).
            /*
             * log the transformed event
             */
            log("${body}").
            /*
             * publish the transformed event to the target topic
             */
            to("google-pubsub:{{gcp_project_id}}:" + pubSub.getToTopic());
        }
    });
}

【问题讨论】:

    标签: apache-camel google-cloud-pubsub


    【解决方案1】:

    当您提到concurrentConsumers 选项(比如说concurrentConsumers=10)时,您是在要求 Camel 创建一个由 10 个线程组成的线程池,这 10 个线程中的每一个都会从 pub-sub 队列中获取不同的消息并处理它们。

    这里要注意的是,当您指定 concurrentConsumers 选项时,线程池使用固定大小,这意味着始终有固定数量的活动线程在等待处理传入的消息。所以 10 个线程(因为我指定 concurrentConsumers=10)将等待处理我的消息,即使没有 10 条消息同时进入。

    显然,这并不能保证传入消息将以相同的顺序进行处理。如果您希望消息以相同的顺序排列,您可以查看Resequencer EIP 来为您的消息排序。

    至于您的第三个问题,我认为 google-pubsub 组件不允许并行处理选项。您可以使用Threads EIP 自己制作。这肯定会更好地控制您的并发性。

    使用线程,您的代码将如下所示:

    from("google-pubsub:project-id:destinationName?maxMessagesPerPoll=20")
    // the 2 parameters are 'pool size' and 'max pool size'
    .threads(5, 20)
    .to("direct:out");
    

    【讨论】:

    • 谢谢你 Sneharghya,你已经非常清楚地解释了我正在寻找的概念。非常感谢
    【解决方案2】:

    a) 如果我轮询说 500 条消息,那么它会创建 500 条并行路由来处理消息并将其发布到目标主题

    不,Camel 在这种情况下不会创建 500 个并行线程。正如您所怀疑的,并发消费者线程的数量是用concurrentConsumers 设置的。因此,如果您定义 5 个concurrentConsumersmaxMessagesPerPoll 为 500,则每个消费者将获取多达 500 条消息并在单个线程中一个接一个地处理它们。也就是说,您有 5 条消息并行处理。

    消息的顺序如何

    一旦您并行处理消息,消息的顺序就会混乱。但是,当您遇到处理错误时,1 个消费者已经发生了这种情况,它们被绕道到您的 deadLetterChannel 并在稍后重新处理。

    我是否应该考虑将并行处理 EIP 作为替代方案

    仅当concurrentConsumers 选项不足时。

    【讨论】:

    • 谢谢 burki,你已经清楚地解释了我难以理解的问题。我现在很好。非常感谢 -
    猜你喜欢
    • 1970-01-01
    • 2016-01-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-07-27
    • 2020-10-09
    • 1970-01-01
    相关资源
    最近更新 更多