【问题标题】:Cassandra execute_async request lose dataCassandra execute_async 请求丢失数据
【发布时间】:2019-02-07 10:27:11
【问题描述】:

我需要使用 Cassandra 的 Python DataStax 驱动程序插入大量数据。结果我不能使用 execute() 请求。 execute_async( ) 要快得多。

但是我在调​​用 execute_async() 时遇到了丢失数据的问题。如果我使用 execute(),一切正常。但是,如果我使用 execute_async()(对于 SAME 插入查询),我的请求中只有大约 5-7% 正确执行(并且没有发生任何错误)。如果我在每个 1000 个插入请求之后添加 time.sleep(0.01)(通过使用 execute_async()),就可以了。

没有任何数据丢失(案例 1):

for query in queries:
    session.execute( query )

没有任何数据丢失(案例2):

counter = 0
for query in queries:
    session.execute_async( query )
    counter += 1
    if counter % 1000 == 0:
        time.sleep( 0.01 )

数据丢失:

for query in queries:
    session.execute_async( query )

有什么理由吗?

集群有 2 个节点

[cqlsh 5.0.1 |卡桑德拉 3.11.2 | CQL 规范 3.4.4 |原生协议 v4]

DataStax Python 驱动程序版本 3.14.0

Python 3.6

【问题讨论】:

  • 集群不接受数据时,节点状态如何?您通过哪个 RF 写入数据?
  • 我的RF等于2。每个节点的状态都是UN。因此,集群每次都处于活动状态。

标签: python-3.x cassandra datastax-python-driver


【解决方案1】:

由于execute_async 是一个非阻塞查询,您的代码在继续之前不会等待请求完成。每次执行后添加 10 毫秒 sleep 时您可能没有观察到数据丢失的原因是,在您读回数据之前,这为处理请求提供了足够的时间。

您需要在代码中等待请求完成后再读回数据,即:

futures = []
for query in queries:
    futures.push(session.execute(query))

for f in futures:
    f.result() # blocks until query is complete

您可能希望使用execute_concurrent 进行评估以提交许多查询并让驱动程序为您管理并发级别。

【讨论】:

    猜你喜欢
    • 2014-03-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-23
    • 2016-03-21
    相关资源
    最近更新 更多