【问题标题】:Django bulk_create with ignore rows that cause IntegrityError?Django bulk_create 忽略导致 IntegrityError 的行?
【发布时间】:2012-09-09 04:26:18
【问题描述】:

我正在使用 bulk_create 将数千行或行加载到 postgresql 数据库中。不幸的是,有些行导致 IntegrityError 并停止了 bulk_create 进程。我想知道是否有办法告诉 django 忽略这些行并尽可能多地保存批处理?

【问题讨论】:

  • 这可能是不可能的,因为 PostgreSQL 在第一个错误时中止事务。 Django 需要(a)在每次插入之前创建一个 SAVEPOINT,这会减慢速度并消耗资源;或 (b) 仅当行不存在时才使用过程或查询插入。就个人而言,我会批量插入到一个新的单独表中,可能是UNLOGGEDTEMPORARY,然后是INSERT INTO realtable SELECT * FROM temptable WHERE NOT EXISTS (SELECT 1 FROM realtable WHERE temptable.id = realtable.id) 或类似的。
  • @CraigRinger 好主意,但如果你有更大的模型,那就有点工作

标签: django postgresql bulk-load


【解决方案1】:

这对我有用
我在线程中使用这个函数。
我的 csv 文件包含 120907 行。

def products_create():
    full_path = os.path.join(settings.MEDIA_ROOT,'productcsv')
    filename = os.listdir(full_path)[0]
    logger.debug(filename)
    logger.debug(len(Product.objects.all()))
    if len(Product.objects.all()) > 0:
        logger.debug("Products Data Erasing")
        Product.objects.all().delete()
        logger.debug("Products Erasing Done")

    csvfile = os.path.join(full_path,filename)
    csv_df = pd.read_csv(csvfile,sep=',')
    csv_df['HSN Code'] = csv_df['HSN Code'].fillna(0)
    row_iter = csv_df.iterrows()
    logger.debug(row_iter)

    logger.debug("New Products Creating")

    for index, row in row_iter:
        Product.objects.create(part_number = row[0],
                               part_description = row[1],
                               mrp = row[2],
                               hsn_code = row[3],
                               gst = row[4],
                               )

    # products_list = [
    #           Product(
    #               part_number = row[0] ,
    #               part_description = row[1],
    #               mrp = row[2],
    #               hsn_code = row[3],
    #               gst = row[4],
    #           )
    #           for index, row in row_iter

    #       ]
    # logger.debug(products_list)

    # Product.objects.bulk_create(products_list)
    logger.debug("Products uploading done")```

【讨论】:

    【解决方案2】:

    Django 2.2 之前的项目的最新答案:

    我最近遇到了这种情况,我找到了一个辅助列表数组来检查唯一性的方法。

    在我的情况下,模型具有唯一的共同检查,并且由于批量创建的数组中有重复数据,因此批量创建会引发完整性错误异常。

    所以我决定在批量创建对象列表之外创建清单。这是示例代码;唯一键是 ownerbrand,在这个例子中,owner 是一个用户对象实例,brand 是一个字符串实例:

    create_list = []
    create_list_check = []
    for brand in brands:
        if (owner.id, brand) not in create_list_check:
            create_list_check.append((owner.id, brand))
            create_list.append(ProductBrand(owner=owner, name=brand))
    
    if create_list:
        ProductBrand.objects.bulk_create(create_list)
    

    【讨论】:

      【解决方案3】:

      这在 Django 2.2 上现在是可能的

      Django 2.2 为bulk_create 方法添加了一个新的ignore_conflicts 选项,来自documentation

      在支持它的数据库上(除 PostgreSQL

      例子:

      Entry.objects.bulk_create([
          Entry(headline='This is a test'),
          Entry(headline='This is only a test'),
      ], ignore_conflicts=True)
      

      【讨论】:

      • 看起来,如果打开该选项,几乎没有性能影响。干得好姜戈!并感谢您的回答...
      • 可悲的是,创建的模型上不会有PK:(
      • @int_32 你在使用特殊的PK字段吗?至少在默认情况下,PK 可以很好地创建,除了返回的对象仍然有 None 作为它们的 PK,但在数据库中它很好。
      • @HenryWoody 正确,我的意思是返回的对象没有 PK,在 DB 中当然可以。
      • 当使用这种技术时,你是否得到任何关于哪些行无法插入的返回信息?
      【解决方案4】:

      即使在 Django 1.11 中也无法做到这一点。我找到了比使用 Raw SQL 更好的选择。它使用djnago-query-builder。它有一个upsert 方法

      from querybuilder.query import Query
      q = Query().from_table(YourModel)
      # replace with your real objects
      rows = [YourModel() for i in range(10)] 
      q.upsert(rows, ['unique_fld1', 'unique_fld2'], ['fld1_to_update', 'fld2_to_update'])
      

      注意:该库仅支持 postgreSQL

      这是我用于批量插入的gist,它支持忽略 IntegrityErrors 并返回插入的记录。

      【讨论】:

        【解决方案5】:

        一个不涉及手动 SQL 和临时表的快速而简单的解决方法是尝试批量插入数据。如果失败,则恢复串行插入。

        objs = [(Event), (Event), (Event)...]
        
        try:
            Event.objects.bulk_create(objs)
        
        except IntegrityError:
            for obj in objs:
                try:
                    obj.save()
                except IntegrityError:
                    continue
        

        如果您有很多错误,这可能不是那么有效(您将花费更多时间连续插入而不是批量插入),但我正在处理一个重复很少的高基数数据集,所以这解决了我的大部分问题。

        【讨论】:

        • 在我个人看来,这确实是最好的,因为它允许您捕获最终作为“忽略冲突”的一部分而遗漏的错误。
        【解决方案6】:

        或者 5. 分而治之

        我没有对此进行彻底的测试或基准测试,但它对我来说表现相当不错。 YMMV,具体取决于您预计在批量操作中会出现多少错误。

        def psql_copy(records):
            count = len(records)
            if count < 1:
                return True
            try:
                pg.copy_bin_values(records)
                return True
            except IntegrityError:
                if count == 1:
                    # found culprit!
                    msg = "Integrity error copying record:\n%r"
                    logger.error(msg % records[0], exc_info=True)
                    return False
            finally:
                connection.commit()
        
            # There was an integrity error but we had more than one record.
            # Divide and conquer.
            mid = count / 2
            return psql_copy(records[:mid]) and psql_copy(records[mid:])
            # or just return False
        

        【讨论】:

          【解决方案7】:

          (注意:我不使用Django,所以可能有更合适的框架特定的答案)

          Django 不可能通过简单地忽略 INSERT 失败来做到这一点,因为 PostgreSQL 在第一个错误时中止整个事务。

          Django 需要以下方法之一:

          1. INSERT 每一行在一个单独的事务中并忽略错误(非常慢);
          2. 在每次插入之前创建一个SAVEPOINT(可能存在缩放问题);
          3. 仅当行不存在时才使用过程或查询插入(复杂且缓慢);或
          4. 将数据批量插入或(更好)COPYTEMPORARY 表中,然后将其合并到服务器端的主表中。

          类似 upsert 的方法 (3) 似乎是个好主意,但 upsert and insert-if-not-exists are surprisingly complicated

          就我个人而言,我会采取 (4):我会批量插入到一个新的单独表中,可能是 UNLOGGEDTEMPORARY,然后我会运行一些手动 SQL 来:

          LOCK TABLE realtable IN EXCLUSIVE MODE;
          
          INSERT INTO realtable 
          SELECT * FROM temptable WHERE NOT EXISTS (
              SELECT 1 FROM realtable WHERE temptable.id = realtable.id
          );
          

          LOCK TABLE ... IN EXCLUSIVE MODE 防止创建行的并发插入导致与上述语句完成的插入发生冲突并失败。它确实阻止并发SELECTs,只有SELECT ... FOR UPDATEINSERTUPDATEDELETE,所以从表中读取正常进行。

          如果您无法长时间阻止并发写入,您可以改为使用可写 CTE 将行范围从 temptable 复制到 realtable,如果失败则重试每个块。

          【讨论】:

          • 谢谢@craig-ringer 我最终在将它们插入数据库之前清除了我的python对象列表,类似于你的方法#3,但在纯python中。
          • rodmtech.net/docs/django/…有(4)的详细例子
          猜你喜欢
          • 1970-01-01
          • 2015-05-18
          • 1970-01-01
          • 1970-01-01
          • 2020-04-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2011-09-16
          相关资源
          最近更新 更多