【问题标题】:How to speed up camel routes如何加快骆驼路线
【发布时间】:2021-08-19 18:43:52
【问题描述】:

我正在开发一项服务(spring-boot),它接受一个 id 列表,一个接一个地从数据库中获取对象,将这些对象聚合成批次,然后将它们保存在其他地方。目前,聚合后的批量大小约为 50 个对象,大约每秒有 10 个对象被发送到聚合路由。现在我正在尝试了解如何加快路线速度,或者至少了解瓶颈在哪里。

这是同时向 direct:findObject

发送请求的 bean
@Produce(uri = "direct:findObject")
private ProducerTemplate producerTemplate;

public void send() {
    List<Long> listOfIDs = someService.getIDList(); //1000000 ids in the list
    listOfIDs.parallelStream().forEach(producerTemplate::sendBody);
}

我的骆驼路线:

    from(direct:findObject)
            .bean(objectDaoBean)
            .marshal().json(JsonLibrary.Gson, MyObject.class)
            .to("direct:aggregator");

    from("direct:aggregator")
            .unmarshal().json(JsonLibrary.Gson, MyObject.class)
            .aggregate(constant(true), new GroupedBodyAggregationStrategy())
            .parallelProcessing()
            .completionInterval(TimeUnit.SECONDS.toMillis(5))
            .completionSize(1000)
            .marshal(new GsonDataFormat(new TypeToken<List<MyObject>>() {}.getType()))
            .to("direct:bulkSave");

    from("direct:bulkSave")
            .unmarshal(new GsonDataFormat(new TypeToken<List<MyObject>>() {}.getType()))
            .bean(bulkSaveBean);

非常感谢任何有关如何加快整个过程的帮助或见解。

更新: 通过@Fábio Carmo Santos 提出的更改,我设法提高了总体性能(至少 x10)

application.properties:

camel.threadpool.pool-size=10
camel.threadpool.max-pool-size=400
camel.threadpool.max-queue-size=-1

更新的生产者:

@Produce("direct:splitter")
private ProducerTemplate producerTemplate;

public void sendList() {
    List<Long> listOfIDs = getIDList(); //1000000 ids in the list
    producerTemplate.sendBody(listOfIDs);
}

修改骆驼路线:

   from("direct:splitter")
            .split(body())
            .parallelProcessing()
            .bean(new DaoBean()) //fetch an object by id from db here
            .to("direct:aggregator");

    from("direct:aggregator")
            .aggregate(body(), new GroupedBodyAggregationStrategy())
            .parallelProcessing()
            .completionInterval(TimeUnit.SECONDS.toMillis(5))
            .completionSize(1000)
            .to("direct:bulkSave");

    from("direct:bulkSave")
            .bean(new SaveBean());

在我进入日志之前:

Saving... Batch size 30
Saving... Batch size 45
Saving... Batch size 41
Saving... Batch size 53
Saving... Batch size 47
Saving... Batch size 50

并且总体执行时间至少是几个小时(从不等到它完成)

现在更好更快了

Saving... Batch size 499
Saving... Batch size 1000
Saving... Batch size 401
Saving... Batch size 1000
Saving... Batch size 257
Saving... Batch size 1000

【问题讨论】:

  • 通过阅读您的代码,我想知道为什么在向/从您自己的路由转发消息时要编组/解组。有点贵;看来你不需要那个。
  • 由于我对骆驼的了解有限,如果没有编组/解组对象,我无法使其工作。

标签: java spring multithreading spring-boot apache-camel


【解决方案1】:

getIDList() 时的 ID 是否不同?如果没有,您可以区分它们以消除重复的 ID。只有你能决定它是否有意义。

你为什么不把路由列表发送给 Camel 给你拆分呢? Split 中的流式传输选项非常有用。 PS.:仅在最近的一个项目中使用它,我设法将处理时间从 1m30s 减少到 15s!

也请查看Threading Model。也许,您会想编辑线程池配置文件。默认最大为 20。

实际上有很多方法可以处理它。您还可以通过对集合进行分区来对搜索进行分页,然后通过一次从数据库获取多条记录(假设每次轮询 200 条)来丰富您的 Exchange,而不是逐个获取,如果需要使用 AggregationStrategy 或仅拆分搜索结果。

【讨论】:

  • 列表中的所有 ID 始终是唯一的。因此,您建议不要将 ID 逐个发送到 direct:findObject 路由,而是将整个列表发送给骆驼(拆分器?)来处理它?我会试试的
  • 没错!一开始,你可以试试。我想您可以通过对整个 ID 列表进行分区并从 DB 中一次检索多条记录来加快速度。
猜你喜欢
  • 2018-09-05
  • 2011-05-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-12-05
  • 1970-01-01
  • 1970-01-01
  • 2023-03-03
相关资源
最近更新 更多