【发布时间】:2019-03-28 22:39:54
【问题描述】:
我每天尝试使用 Apache Camel 从 Informix 表中处理大约 700 万行,但我不知道如何完成。
我第一次尝试处理非常少的数据集(大约 50k 行)是使用 .split(body()).parallelProcessing(),如下所示:
from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryData").split(body()).parallelProcessing() // Essentially executes a query on my table and returns a list of MyTable.class
.bean(ProcessTable.class, "processData") // Converts each MyTable object into another type of object (NewData.class) for later processing, storing in them in a synchronized list
.end().to("direct:transform-data");
from("direct:transform-data")
.bean(ProcessNewData.class, "processNewData").split(body()).parallelProcessing() // Obtains list
.bean(AnalyzeData.class, "analyze") // Analyzes the data
.bean(PersistData.class, "persist") // Persists the new data on other tables
.end();
当我在.bean(QueryTable.class, "queryData").split(body()).parallelProcessing() 上尝试使用 500k 行时,这当然会导致“OutOfMemory”错误,因为它首先尝试在解析查询之前缓存查询中的所有数据。我尝试将 fetchSize 设置为 100 之类的值,但我得到了同样的错误,使用 maxRows 只会让我得到我指定的行数,而忽略其余的。
我的下一次尝试是使用 Camel 的组件之一,例如 sql-component 和 jdbc,并尝试使用 Splitter 在单独的线程中处理每一行,但我遇到了同样的问题。
sql:
from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryDataParams") // Gets the params for my query
.to("sql:SELECT * FROM my_table WHERE date_received BETWEEN :#startDate AND :#endDate?dataSource=dataSourceInformix").split(body()).parallelProcessing()
// The rest would be essentially the same
jdbc:
from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryString") // Gets the query to execute
.to("jdbc:dataSourceInformix").split(body()).parallelProcessing()
我的最后一次尝试是将maxMessagesPerPoll 用于sql,将outputType=StreamList 用于jdbc 组件,但不幸的是,前者一次只处理一行(而且它必须是消费者才能使用),并且后者给了我一个java.sql.SQLException: Cursor not open 异常。
sql:
from("sql:" + query +"?dataSource=dataSourceInformix&maxMessagesPerPoll=100") // I need to be able to use the quartz2 component
jdbc:
.to("jdbc:dataSourceInformix?outputType=StreamList").split(body()).streaming() // Throws exception
最终目标是能够在不消耗太多内存的情况下处理数百万行,从而防止“OutOfMemory”错误。如果可能的话,我的想法是执行以下操作:
- 在石英 cron-trigger 上创建我的查询
- 获得并分组N个结果
- 发送一组结果以进行处理(在另一个线程中),同时获取另一组结果
- 重复直到处理完所有数据
我知道这个问题类似于this one,但答案并没有真正帮助我的情况。我还注意到,在 sql 组件的文档中,它有一个用于生产者的 outputType=StreamList 选项,但它是在 2.18 及更高版本上实现的,而我有 2.14.1 版本。
任何帮助和提示都会非常有帮助!
谢谢。
其他一些信息: 阿帕奇骆驼版本:2.14.1 数据库:Informix
【问题讨论】:
-
从文档 (camel.apache.org/sql-component.html) 看来,StreamList 工具可能是实现 Camel-native 的唯一方法。或者,您可以创建一个手动进行 SQL 查询的类,获取一个游标 (docs.oracle.com/javase/7/docs/api/java/sql/…),然后将批量(例如,一次 10k 行)交给另一个路由进行处理。您可以使用 seda 队列 (camel.apache.org/seda.html) 并设置最大大小并让您的自定义类休眠直到可用,这样您就不会阻塞下游。
-
@NotaJD 感谢您对 seda 队列的建议,我会调查的。至于光标,我尝试在 QueryTable 类中使用
JdbcTemplate.fetchSize=100,但由于某种原因,该选项被忽略了。我还尝试通过查询限制结果,虽然它在前 2 次迭代中有效,但随后的迭代减慢了进程并出错。 -
是否为您的 JdbcTemplate 关闭了自动提交功能?我记得有些驱动程序在自动提交和流式传输结果方面存在问题。另外,您如何验证 fetchSize 被忽略?当您打开游标时,此参数会尝试修复返回的行数(例如,如果您的查询返回 1000 万行,它将一次从在线数据库中获取 100 行,无论如何这是理论上的)。
-
@NotaJD 当使用 jdbc 端点选项的
resetAutoCommit=false时,它不会给我一个错误。至于 fetchSize 的验证,我设置了一个非常小的大小并尝试使用它,但它从未通过执行并抛出错误。我尝试使用 RowCallbackHandler 并且这似乎有效,但不幸的是它给了我一个GC overhead limit exceeded异常,在我的列表中大约有 280k 条记录。 -
您能否使用“原始”JdbcTemplate 设置独立/隔离测试,执行查询并成功读取记录集中的每一行(即只需调用 while (rs.hasNext()) { rs.下一个(); })。如果它是流式传输和游标(等),它不应该耗尽内存。
标签: java apache-camel informix