【问题标题】:Bulk-insertion into couchbase via python通过python批量插入沙发底座
【发布时间】:2017-01-31 21:16:10
【问题描述】:

我正在尝试在沙发底座中进行一些批量插入。我试图通过 SO 和 google 搜索示例,但我找不到任何线索。这里有人提到这是不可能的。

How to insert a documents in bulk in Couchbase?

但我想这个问题是 3 年前提出的。我搜索,如果我从下面给定的链接中正确理解,则可以批量插入文档。

https://developer.couchbase.com/documentation/server/current/sdk/batching-operations.html

https://pythonhosted.org/couchbase/api/couchbase.html#batch-operation-pipeline

这是我想在 couchbase 中实现批量插入的代码

import time
import csv
from couchbase import Couchbase
from couchbase.bucket import Bucket
from couchbase.exceptions import CouchbaseError
c = Bucket('couchbase://localhost/bulk-load')
from couchbase.exceptions import CouchbaseTransientError
BYTES_PER_BATCH = 1024 * 256 # 256K

with open('/home/royshah/Desktop/bulk_try/roy.csv') as csvfile:
    lines = csvfile.readlines()[4:]
for k, line in enumerate(lines):
    data_tmp = line.strip().split(',')
    strDate = data_tmp[0].replace("\"", "")
    timerecord = datetime.datetime.strptime(strDate,
                                           '%Y-%m-%d %H:%M:%S.%f')
    microsecs = timerecord.microsecond
    strDate = "\"" + strDate + "\""
    ts = calendar.timegm(timerecord.timetuple())*1000000 + microsecs
    datastore = [ts] + data_tmp[1:]

    stre = {'col1 ': datastore[1],  # I am making key-values on the fly from csv file
            'col2': datastore[2],
            'col3': datastore[3],
            'col4': datastore[4],
            'col5': datastore[5],
            'col6': datastore[6]}
  cb.upsert(str(datastore[0]), (stre))    # datastore[0] is used as document
                                      id and (stre) is used as key-value to be
                                      inserted for respective id. 

cb.upsert(str(datastore[0]), (str)) 正在做单次插入,我想让它批量插入以使其更快。我不知道如何在沙发底座中批量插入。我找到了这个例子,但不确定如何实现。

https://developer.couchbase.com/documentation/server/current/sdk/batching-operations.html

如果有人指出一些在 couchbase 中批量加载的示例,或者帮助我弄清楚如何通过我的代码进行批量插入。我真的很感激。 .thanx 非常感谢任何想法或帮助。

【问题讨论】:

  • 您提供的最后一个链接底部的示例有什么问题?似乎正是您想要的。
  • @RobinEllerkmann 是的,这个例子适合我,但我无法在上面的代码中实现。我尝试了不同的方式,但我是 python 新手,因此我的实现目前非常薄弱。我将更新我的问题,即我如何尝试实现此示例。谢谢你的帮助。
  • 我有一个类似的问题,可能有用也可能没用:stackoverflow.com/questions/32866825/…
  • @Tommy 感谢您的帮助。我会看看,看看我能不能实现它。我在最后一个链接中找到了一个解决方案示例,但我的实现非常薄弱,以至于我无法做到。 .

标签: python json csv couchbase


【解决方案1】:

我尝试将docs 中的示例改编为您的用例。您可能需要更改一两个细节,但您应该明白这一点。

c = Bucket('couchbase://localhost/bulk-load')
from couchbase.exceptions import CouchbaseTransientError
BYTES_PER_BATCH = 1024 * 256 # 256K

batches = []
cur_batch = {}
cur_size = 0
batches.append(cur_batch)

with open('/home/royshah/Desktop/bulk_try/roy.csv') as csvfile:
    lines = csvfile.readlines()[4:]
for key, line in enumerate(lines):
    #Format your data
    data_tmp = line.strip().split(',')
    strDate = data_tmp[0].replace("\"", "")
    timerecord = datetime.datetime.strptime(strDate,
                                           '%Y-%m-%d %H:%M:%S.%f')
    microsecs = timerecord.microsecond
    strDate = "\"" + strDate + "\""
    timestamp = calendar.timegm(timerecord.timetuple())*1000000 + microsecs

    #Build kv
    datastore = [ts] + data_tmp[1:]
    value = {'col1 ': datastore[1],  # I am making key-values on the fly from csv file
            'col2': datastore[2],
            'col3': datastore[3],
            'col4': datastore[4],
            'col5': datastore[5],
            'col6': datastore[6]}

    key = str(datastore[0]
    cur_batch[key] = value
    cur_size += len(key) + len(value) + 24

    if cur_size > BYTES_PER_BATCH:
        cur_batch = {}
        batches.append(cur_batch)
        cur_size = 0

print "Have {} batches".format(len(batches))
num_completed = 0
while batches:
  batch = batches[-1]
  try:
      cb.upsert_multi(batch)
      num_completed += len(batch)
      batches.pop()
  except CouchbaseTransientError as e:
      print e
      ok, fail = e.split_results()
      new_batch = {}
      for key in fail:
          new_batch[key] = all_data[key]
      batches.pop()
      batches.append(new_batch)
      num_completed += len(ok)
      print "Retrying {}/{} items".format(len(new_batch), len(ok))

【讨论】:

  • 我有一个小问题。在最后一行,最重要的是代码中提到了 all_data,但它没有被使用或定义。我还查看了我的链接的示例代码,但我无法理解。我删除它,代码仍然正常工作。为此,我也删除了这两行。 ......对于失败的关键: new_batch[key] = all_data[key] 。 .这是正确的方法吗?
  • 我对这件事有点困惑。因为我很确定它正在做批量插入,但是如果你能帮助我消除这种困惑,我将非常感激。
  • 我接受您的回答,因为只需进行少量更改,我就能实现批量插入的目标。感谢您宝贵的时间和帮助。我删除了一些部分和这个变量 all_data 以使其可以根据我的需要工作。 .
  • 对不起,最后一行不应该在那里,我会删除它。我很高兴你有这个想法,我能够帮助你。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多