【发布时间】:2021-08-19 18:43:52
【问题描述】:
我正在开发一项服务(spring-boot),它接受一个 id 列表,一个接一个地从数据库中获取对象,将这些对象聚合成批次,然后将它们保存在其他地方。目前,聚合后的批量大小约为 50 个对象,大约每秒有 10 个对象被发送到聚合路由。现在我正在尝试了解如何加快路线速度,或者至少了解瓶颈在哪里。
这是同时向 direct:findObject
发送请求的 bean@Produce(uri = "direct:findObject")
private ProducerTemplate producerTemplate;
public void send() {
List<Long> listOfIDs = someService.getIDList(); //1000000 ids in the list
listOfIDs.parallelStream().forEach(producerTemplate::sendBody);
}
我的骆驼路线:
from(direct:findObject)
.bean(objectDaoBean)
.marshal().json(JsonLibrary.Gson, MyObject.class)
.to("direct:aggregator");
from("direct:aggregator")
.unmarshal().json(JsonLibrary.Gson, MyObject.class)
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.parallelProcessing()
.completionInterval(TimeUnit.SECONDS.toMillis(5))
.completionSize(1000)
.marshal(new GsonDataFormat(new TypeToken<List<MyObject>>() {}.getType()))
.to("direct:bulkSave");
from("direct:bulkSave")
.unmarshal(new GsonDataFormat(new TypeToken<List<MyObject>>() {}.getType()))
.bean(bulkSaveBean);
非常感谢任何有关如何加快整个过程的帮助或见解。
更新: 通过@Fábio Carmo Santos 提出的更改,我设法提高了总体性能(至少 x10)
application.properties:
camel.threadpool.pool-size=10
camel.threadpool.max-pool-size=400
camel.threadpool.max-queue-size=-1
更新的生产者:
@Produce("direct:splitter")
private ProducerTemplate producerTemplate;
public void sendList() {
List<Long> listOfIDs = getIDList(); //1000000 ids in the list
producerTemplate.sendBody(listOfIDs);
}
修改骆驼路线:
from("direct:splitter")
.split(body())
.parallelProcessing()
.bean(new DaoBean()) //fetch an object by id from db here
.to("direct:aggregator");
from("direct:aggregator")
.aggregate(body(), new GroupedBodyAggregationStrategy())
.parallelProcessing()
.completionInterval(TimeUnit.SECONDS.toMillis(5))
.completionSize(1000)
.to("direct:bulkSave");
from("direct:bulkSave")
.bean(new SaveBean());
在我进入日志之前:
Saving... Batch size 30
Saving... Batch size 45
Saving... Batch size 41
Saving... Batch size 53
Saving... Batch size 47
Saving... Batch size 50
并且总体执行时间至少是几个小时(从不等到它完成)
现在更好更快了
Saving... Batch size 499
Saving... Batch size 1000
Saving... Batch size 401
Saving... Batch size 1000
Saving... Batch size 257
Saving... Batch size 1000
【问题讨论】:
-
通过阅读您的代码,我想知道为什么在向/从您自己的路由转发消息时要编组/解组。有点贵;看来你不需要那个。
-
由于我对骆驼的了解有限,如果没有编组/解组对象,我无法使其工作。
标签: java spring multithreading spring-boot apache-camel