【发布时间】:2026-02-04 20:35:01
【问题描述】:
我一直在使用以下脚本上传数据来对我的模块进行负载测试:
import json
import ast
import pandas as pd
import sys
import cloudant_connection as cloud
df = pd.read_csv("./deviceId_data/device_starts_"+ sys.argv[1] + ".csv")
print(" checkpoint 1 cleared ")
def push_data_to_cloudant(ID,Key,Value,Database):
Value = ast.literal_eval(Value)
temp_doc = {}
temp_doc["_id"] = ID
temp_doc["value"] = Value["value"]
temp_doc["devId"] = Value["devId"]
temp_doc["eDateTime"] = Key[0]
temp_doc["eDate"] = Value["eDate"]
temp_doc["cDateTime"] = Key[0]
temp_doc["cDate"] = Value["cDate"]
new_doc = Database.create_document(temp_doc)
if new_doc.exists():
#print("doc created")
return "Success"
else:
print("Failed in pushing document")
return "Failure"
with open("./connection_config_source.json") as f:
connect_conf = json.load(f)
print(" checkpoint 2 cleared ")
API_KEY = connect_conf['cloudant_api_key']
ACC_NAME = connect_conf['cloudant_account_name']
print(" checkpoint 3 cleared ")
try:
client = cloud.connecting_to_cloudant_via_api(ACC_NAME,API_KEY)
database_name = 'DB_NAME'
Database = client[database_name]
print(" checkpoint 4 cleared ")
if Database.exists():
print("Connected")
status = [push_data_to_cloudant(ID,Key,Value,Database) for (ID,Key,Value) in zip(df['id'],df['key'],df['value'])]
print(" last checkpoint cleared ")
except Exception as e:
print("Failed:" + str(e))
我知道有比使用列表推导更快的方法。但我不知道如何在这种情况下使用它们。
我知道 df.apply() 比这更快,但我想知道我是否可以在这个用例中使用 Pandas Vectorization 或 Numpy Vectorization。
【问题讨论】:
-
Numpy 向量化和列表解析对于 cpu-bound 进程来说非常快。但是,您正在写入数据库,并且您是否使用结果列表是有争议的。这是一个 I/O 绑定过程,列表创建本身不是瓶颈。我会说这是对副作用使用理解语法
-
那么如何将 23GB 的文件传输到 cloudant?任何想法我将不胜感激。顺便说一句,我已经想到了多线程和多处理。这段代码需要 2 天时间才能将 1.5GB 传输到 cloudant,我认为它很慢。
-
我同意,这很慢。也许this answer 是您要找的?
标签: python pandas python-cloudant