【发布时间】:2020-08-13 17:15:03
【问题描述】:
我在下面有这段代码,它从 pubsub 源主题获取消息 -> 根据模板对其进行转换 -> 然后将转换后的消息发布到目标主题。
但为了提高性能,我需要并行执行此任务。即我需要轮询 500 条消息,然后将其并行转换,然后将它们发布到目标主题。
从骆驼 gcp 组件文档中,我相信 maxMessagesPerPoll 和 concurrentConsumers 参数可以完成这项工作。由于缺乏文档,我不确定它在内部是如何工作的。
我的意思是 a)如果我轮询说 500 条消息,那么它会创建 500 条并行路由来处理消息并将其发布到目标主题 b)消息的排序如何 c)我应该查看并行处理EIP 作为替代方案
等等
这个概念我不清楚
去
// my route
private void addRouteToContext(final PubSub pubSub) throws Exception {
this.camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("google-pubsub:{{gcp_project_id}}:{{pubsub.dead.letter.topic}}")
.useOriginalMessage().onPrepareFailure(new FailureProcessor()));
/*
* from topic
*/
from("google-pubsub:{{gcp_project_id}}:" + pubSub.getFromSubscription() + "?"
+ "maxMessagesPerPoll={{consumer.maxMessagesPerPoll}}&"
+ "concurrentConsumers={{consumer.concurrentConsumers}}").
/*
* transform using the velocity
*/
to("velocity:" + pubSub.getToTemplate() + "?contentCache=true").
/*
* attach header to the transform message
*/
setHeader("Header ", simple("${date:now:yyyyMMdd}")).routeId(pubSub.getRouteId()).
/*
* log the transformed event
*/
log("${body}").
/*
* publish the transformed event to the target topic
*/
to("google-pubsub:{{gcp_project_id}}:" + pubSub.getToTopic());
}
});
}
【问题讨论】:
标签: apache-camel google-cloud-pubsub