【发布时间】:2015-11-14 19:07:20
【问题描述】:
我有一个应用程序,它读取一系列 XML 文件,其中包含道路上车辆通行的日志。然后应用程序处理每条记录,转换一些信息以匹配数据库列并将其插入到 cassandra 数据库中(在远程服务器中运行单个节点[它在内部网络中,因此连接不是真正的问题]) .在数据库中插入数据后,每个文件的进程会继续读取这些数据并为汇总表生成信息,从而为在应用程序的不相关部分进行深入分析做好准备。
我正在使用多处理并行处理许多 XML 文件,而我遇到的问题是与 cassandra 服务器通信。大致流程如下:
- 从 XML 文件中读取记录
- 处理记录的数据
- 将处理后的数据插入数据库(使用
.execute_async(query)) - 重复 1 到 3 直到 XML 文件结束
- 等待我所做的所有插入查询的响应
- 从数据库中读取数据
- 处理读取的数据
- 将处理后的数据插入汇总表中
现在,它在多个并行进程中顺利运行,直到一个进程进入第 6 步时,它的请求(使用.execute(query) 发出,意味着我将等待响应)总是面临超时。我收到的错误是:
Process ProcessoImportacaoPNCT-1:
Traceback (most recent call last):
File "C:\Users\Lucas\Miniconda\lib\multiprocessing\process.py", line 258, in _bootstrap
self.run()
File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\ImportacaoArquivosPNCT.py", line 231, in run
core.CalculoIndicadoresPNCT.processa_equipamento(sessao_cassandra, equipamento, data, sentido, faixa)
File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 336, in processa_equipamento
desvio_medias(sessao_cassandra, equipamento, data_referencia, sentido, faixa)
File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 206, in desvio_medias
veiculos = sessao_cassandra.execute(sql_pronto)
File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 1594, in execute
result = future.result(timeout)
File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 3296, in result
raise self._final_exception
ReadTimeout: code=1200 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'ONE'}
我已将服务器中的超时更改为荒谬的时间(例如 500000000 毫秒),并且我还尝试在客户端中设置超时限制,使用 .execute(query, timeout=3000),但仍然没有成功。
现在,当更多进程遇到相同的问题并且多个进程中步骤 1-3 的密集写入停止时,到达步骤 6 的最后一个进程已成功执行该过程,这让我认为问题在于 cassandra优先处理我每秒请求的数万个插入请求,要么忽略我的读取请求,要么将其放回队列中。
在我看来,解决这个问题的一种方法是,如果我可以以任何方式要求 cassandra 优先处理我的读取请求,以便我可以继续处理,即使这意味着减慢其他进程。
现在,作为旁注,您可能会认为我的流程建模不是最佳的,我很想听听对此的意见,但是对于这个应用程序的实际情况,在我们看来,这是最好的方法.所以我们实际上已经对优化流程进行了广泛的思考,但是(如果 cassandra 服务器可以处理它)这对于我们的现实来说是最佳的。
那么,TL;DR:在执行数以万计的异步查询时,有没有一种方法可以优先考虑某个查询?如果没有,有没有办法以请求不超时的方式每秒执行数万个插入查询和读取查询?另外,你会建议我做什么来解决这个问题?并行运行更少的进程显然是一种解决方案,但我试图避免。所以,很想听听大家的想法。
在插入时存储数据,因此我不需要再次阅读它以进行摘要是不可能的,因为 XML 文件很大并且内存是一个问题。
【问题讨论】:
标签: python database cassandra multiprocessing cql