【问题标题】:Bulk upload data to cloudant or any efficient way of uploading data to cloudant using python?将数据批量上传到 cloudant 或使用 python 将数据上传到 cloudant 的任何有效方式?
【发布时间】:2026-02-04 20:35:01
【问题描述】:

我一直在使用以下脚本上传数据来对我的模块进行负载测试:

import json
import ast
import pandas as pd
import sys
import cloudant_connection as cloud

df = pd.read_csv("./deviceId_data/device_starts_"+ sys.argv[1] + ".csv")
print(" checkpoint 1 cleared ")
def push_data_to_cloudant(ID,Key,Value,Database):
    Value = ast.literal_eval(Value)
    temp_doc = {}
    temp_doc["_id"] = ID
    temp_doc["value"] = Value["value"]
    temp_doc["devId"] = Value["devId"]
    temp_doc["eDateTime"] = Key[0]
    temp_doc["eDate"] = Value["eDate"]
    temp_doc["cDateTime"] = Key[0]
    temp_doc["cDate"] = Value["cDate"]
    
    new_doc = Database.create_document(temp_doc)
    
    if new_doc.exists():
        #print("doc created")
        return "Success"
    else:
        print("Failed in pushing document")
        return "Failure"

with open("./connection_config_source.json") as f:
    connect_conf = json.load(f)
print(" checkpoint 2 cleared ")
API_KEY = connect_conf['cloudant_api_key']
ACC_NAME = connect_conf['cloudant_account_name']
print(" checkpoint 3 cleared ")
try:
    client = cloud.connecting_to_cloudant_via_api(ACC_NAME,API_KEY)

    database_name = 'DB_NAME'
    
    Database = client[database_name]
    print(" checkpoint 4 cleared ")
    if Database.exists():
        print("Connected")
      
    status = [push_data_to_cloudant(ID,Key,Value,Database) for (ID,Key,Value) in zip(df['id'],df['key'],df['value'])]
    print(" last checkpoint cleared ")
except Exception as e:
    print("Failed:" + str(e))

我知道有比使用列表推导更快的方法。但我不知道如何在这种情况下使用它们。

我知道 df.apply() 比这更快,但我想知道我是否可以在这个用例中使用 Pandas Vectorization 或 Numpy Vectorization。

【问题讨论】:

  • Numpy 向量化和列表解析对于 cpu-bound 进程来说非常快。但是,您正在写入数据库,并且您是否使用结果列表是有争议的。这是一个 I/O 绑定过程,列表创建本身不是瓶颈。我会说这是对副作用使用理解语法
  • 那么如何将 23GB 的文件传输到 cloudant?任何想法我将不胜感激。顺便说一句,我已经想到了多线程和多处理。这段代码需要 2 天时间才能将 1.5GB 传输到 cloudant,我认为它很慢。
  • 我同意,这很慢。也许this answer 是您要找的?

标签: python pandas python-cloudant


【解决方案1】:

python-cloudant 文档:

bulk_docs(docs) 通过单个请求执行多个文档插入和/或更新。每个文档必须是或扩展一个字典,就像 Document 和 DesignDocument 对象一样。如果要更新文档,则文档必须包含 _id 和 _rev 字段。

参数:docs (list) – 要创建/更新的文档列表。 返回:JSON 格式的批量文档创建/更新状态

只需使用:

Database = client['DB_name']

Database.bulk_docs(*argv)

这里的参数可以是字典列表或者json对象。

【讨论】: