【问题标题】:Cassandra assynchronous execution in multiple processes blocking synchronous requestsCassandra在多个进程中异步执行阻塞同步请求
【发布时间】:2015-11-14 19:07:20
【问题描述】:

我有一个应用程序,它读取一系列 XML 文件,其中包含道路上车辆通行的日志。然后应用程序处理每条记录,转换一些信息以匹配数据库列并将其插入到 cassandra 数据库中(在远程服务器中运行单个节点[它在内部网络中,因此连接不是真正的问题]) .在数据库中插入数据后,每个文件的进程会继续读取这些数据并为汇总表生成信息,从而为在应用程序的不相关部分进行深入分析做好准备。

我正在使用多处理并行处理许多 XML 文件,而我遇到的问题是与 cassandra 服务器通信。大致流程如下:

  1. 从 XML 文件中读取记录
  2. 处理记录的数据
  3. 将处理后的数据插入数据库(使用.execute_async(query)
  4. 重复 1 到 3 直到 XML 文件结束
  5. 等待我所做的所有插入查询的响应
  6. 从数据库中读取数据
  7. 处理读取的数据
  8. 将处理后的数据插入汇总表中

现在,它在多个并行进程中顺利运行,直到一个进程进入第 6 步时,它的请求(使用.execute(query) 发出,意味着我将等待响应)总是面临超时。我收到的错误是:

 Process ProcessoImportacaoPNCT-1:
Traceback (most recent call last):
  File "C:\Users\Lucas\Miniconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\ImportacaoArquivosPNCT.py", line 231, in run
    core.CalculoIndicadoresPNCT.processa_equipamento(sessao_cassandra, equipamento, data, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 336, in processa_equipamento
    desvio_medias(sessao_cassandra, equipamento, data_referencia, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 206, in desvio_medias
    veiculos = sessao_cassandra.execute(sql_pronto)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 1594, in execute
    result = future.result(timeout)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 3296, in result
    raise self._final_exception
ReadTimeout: code=1200 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'ONE'}

我已将服务器中的超时更改为荒谬的时间(例如 500000000 毫秒),并且我还尝试在客户端中设置超时限制,使用 .execute(query, timeout=3000),但仍然没有成功。

现在,当更多进程遇到相同的问题并且多个进程中步骤 1-3 的密集写入停止时,到达步骤 6 的最后一个进程已成功执行该过程,这让我认为问题在于 cassandra优先处理我每秒请求的数万个插入请求,要么忽略我的读取请求,要么将其放回队列中。

在我看来,解决这个问题的一种方法是,如果我可以以任何方式要求 cassandra 优先处理我的读取请求,以便我可以继续处理,即使这意味着减慢其他进程。

现在,作为旁注,您可能会认为我的流程建模不是最佳的,我很想听听对此的意见,但是对于这个应用程序的实际情况,在我们看来,这是最好的方法.所以我们实际上已经对优化流程进行了广泛的思考,但是(如果 cassandra 服务器可以处理它)这对于我们的现实来说是最佳的。

那么,TL;DR:在执行数以万计的异步查询时,有没有一种方法可以优先考虑某个查询?如果没有,有没有办法以请求不超时的方式每秒执行数万个插入查询和读取查询?另外,你会建议我做什么来解决这个问题?并行运行更少的进程显然是一种解决方案,但我试图避免。所以,很想听听大家的想法。

在插入时存储数据,因此我不需要再次阅读它以进行摘要是不可能的,因为 XML 文件很大并且内存是一个问题。

【问题讨论】:

    标签: python database cassandra multiprocessing cql


    【解决方案1】:

    我不知道有什么方法可以优先读取查询。我相信 Cassandra 内部有单独的线程池用于读取和写入操作,因此它们是并行运行的。如果没有看到您正在执行的架构和查询,很难说您是否正在执行非常昂贵的读取操作,或者系统是否被写入过多以至于无法跟上读取。

    您可能想尝试在应用程序运行时监控 Cassandra 中发生的情况。您可以使用多种工具来监控正在发生的事情。例如,如果您 ssh 到您的 Cassandra 节点并运行:

    watch -n 1 nodetool tpstats
    

    这将显示线程池统计信息(每秒更新一次)。您将能够查看队列是否已满或操作是否被阻止。如果任何“丢弃”计数器增加,这表明您没有足够的容量来完成您正在尝试做的事情。如果是这种情况,请通过添加更多节点来增加容量,或者更改架构和方法,以减少节点要做的工作。

    其他有用的监控(在linux上使用watch -n 1持续监控):

    nodetool compactionstats
    nodetool netstats
    nodetool cfstats <keyspace.table name>
    nodetool cfhistograms <keyspace> <table name>
    

    也可以使用 linux 命令(如 top 和 iostat)监控节点以检查 CPU 利用率和磁盘利用率。

    你所说的我的印象是,你的单个节点没有足够的容量来完成你给它的所有工作,所以你要么需要在单位时间内处理更少的数据,要么添加更多的 Cassandra 节点分散工作量。

    由于分区的行数过多,我目前面临自己的超时错误,因此我可能必须向分区键添加基数以使每个分区的内容更小。

    【讨论】:

    • 感谢您的建议!我将尝试使用您建议的内容来监视我的节点的活动,并决定每次是否需要更多的结构或更少的查询。很高兴知道这可能是一个结构问题,因为使用 cassandra 添加更多节点很容易。感谢 Jim 抽出宝贵时间,一旦我们解决了问题,我会再次发表评论。
    • 顺便问一下,关于为什么提高服务器和客户端请求超时不起作用的任何想法?
    • 我对Cassandra内部的了解还不够多,但超时错误似乎有很多不同的含义。很可能我猜这将是一种饥饿情况,即某些操作在队列中的等待时间过长并且被毫不客气地丢弃。您愿意等待漫长的操作完成当然令人沮丧,但 Cassandra 决定放弃。
    • 感谢您的反馈,我们有更多的论据来确定这是一个基础设施问题,而不是我们做错了什么。我们已经对过程进行了划分,这样我们就不会用太多的查询来淹没 cassandra。感谢您的反馈!
    猜你喜欢
    • 2012-10-14
    • 1970-01-01
    • 1970-01-01
    • 2017-11-01
    • 2018-04-06
    • 2014-07-29
    • 2019-01-05
    • 1970-01-01
    • 2017-09-17
    相关资源
    最近更新 更多