【Question Title】:Getting Http Response from boto3 table.batch_writer object
【Posted】:2019-08-12 15:54:42
【Question】:

I want to put a list of data from a csv into a dynamodb table on aws. See the sample list below.

    Mary,F,7065
    Anna,F,2604
    Emma,F,2003
    Elizabeth,F,1939
    Minnie,F,1746
    Margaret,F,1578
    Ida,F,1472
    Alice,F,1414
    Bertha,F,1320
    Sarah,F,1288
    Annie,F,1258
    Clara,F,1226
    Ella,F,1156
    Florence,F,1063
    Cora,F,1045
    Martha,F,1040
    Laura,F,1012
    Nellie,F,995
    Grace,F,982
    Carrie,F,949
    Maude,F,858
    Mabel,F,808
    Bessie,F,796
    Jennie,F,793
    Gertrude,F,787
    Julia,F,783
    Hattie,F,769
    Edith,F,768
    Mattie,F,704
    Rose,F,700
    Catherine,F,688
    Lillian,F,672
    Ada,F,652
    Lillie,F,647
    Helen,F,636
    Jessie,F,635
    Louise,F,635
    Ethel,F,633
    Lula,F,621
    Myrtle,F,615
    Eva,F,614
    Frances,F,605
    Lena,F,603
    Lucy,F,590
    Edna,F,588
    Maggie,F,582
    Pearl,F,569
    Daisy,F,564
    Fannie,F,560
    Josephine,F,544
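
For context, here is a minimal sketch of turning the CSV above into the `items` list used below. The column names `name`/`sex`/`count` are my assumption; the original does not state the table's attribute names.

```python
import csv

def load_items(path):
    """Parse the name,sex,count CSV into a list of DynamoDB-ready items."""
    items = []
    with open(path, newline="") as f:
        for name, sex, count in csv.reader(f):
            # DynamoDB accepts int for number attributes via boto3's resource layer.
            items.append({"name": name, "sex": sex, "count": int(count)})
    return items
```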

To write more than 25 items to a dynamodb table, the documentation uses the batch_writer object.

    resource = boto3.resource('dynamodb')
    table = resource.Table('Names')
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)

Is there a way to return an http response to indicate the successful completion of the batch_write? I know it is asynchronous. Is there a wait, a get, or something to call?

【Discussion】:

    Tags: python-3.x amazon-web-services amazon-dynamodb boto3


    【Solution 1】:

    The documentation for the BatchWriter object that batch_writer instantiates is (here). Looking at the BatchWriter class, the _flush method does generate a response; it just doesn't store it anywhere.

    class BatchWriter(object):
        """Automatically handle batch writes to DynamoDB for a single table."""
        def __init__(self, table_name, client, flush_amount=25,
                     overwrite_by_pkeys=None):
            """
            :type table_name: str
            :param table_name: The name of the table.  The class handles
                batch writes to a single table.
            :type client: ``botocore.client.Client``
            :param client: A botocore client.  Note this client
                **must** have the dynamodb customizations applied
                to it for transforming AttributeValues into the
                wire protocol.  What this means in practice is that
                you need to use a client that comes from a DynamoDB
                resource if you're going to instantiate this class
                directly, i.e
                ``boto3.resource('dynamodb').Table('foo').meta.client``.
            :type flush_amount: int
            :param flush_amount: The number of items to keep in
                a local buffer before sending a batch_write_item
                request to DynamoDB.
            :type overwrite_by_pkeys: list(string)
            :param overwrite_by_pkeys: De-duplicate request items in buffer
                if match new request item on specified primary keys. i.e
                ``["partition_key1", "sort_key2", "sort_key3"]``
            """
            self._table_name = table_name
            self._client = client
            self._items_buffer = []
            self._flush_amount = flush_amount
            self._overwrite_by_pkeys = overwrite_by_pkeys
    
        def put_item(self, Item):
            self._add_request_and_process({'PutRequest': {'Item': Item}})
    
        def delete_item(self, Key):
            self._add_request_and_process({'DeleteRequest': {'Key': Key}})
    
        def _add_request_and_process(self, request):
            if self._overwrite_by_pkeys:
                self._remove_dup_pkeys_request_if_any(request)
            self._items_buffer.append(request)
            self._flush_if_needed()
    
        def _remove_dup_pkeys_request_if_any(self, request):
            pkey_values_new = self._extract_pkey_values(request)
            for item in self._items_buffer:
                if self._extract_pkey_values(item) == pkey_values_new:
                    self._items_buffer.remove(item)
                    logger.debug("With overwrite_by_pkeys enabled, skipping "
                                 "request:%s", item)
    
        def _extract_pkey_values(self, request):
            if request.get('PutRequest'):
                return [request['PutRequest']['Item'][key]
                        for key in self._overwrite_by_pkeys]
            elif request.get('DeleteRequest'):
                return [request['DeleteRequest']['Key'][key]
                        for key in self._overwrite_by_pkeys]
            return None
    
        def _flush_if_needed(self):
            if len(self._items_buffer) >= self._flush_amount:
                self._flush()
    
        def _flush(self):
            items_to_send = self._items_buffer[:self._flush_amount]
            self._items_buffer = self._items_buffer[self._flush_amount:]
            response = self._client.batch_write_item(
                RequestItems={self._table_name: items_to_send})
            unprocessed_items = response['UnprocessedItems']
    
            if unprocessed_items and unprocessed_items[self._table_name]:
                # Any unprocessed_items are immediately added to the
                # next batch we send.
                self._items_buffer.extend(unprocessed_items[self._table_name])
            else:
                self._items_buffer = []
            logger.debug("Batch write sent %s, unprocessed: %s",
                         len(items_to_send), len(self._items_buffer))
    
        def __enter__(self):
            return self
    
        def __exit__(self, exc_type, exc_value, tb):
            # When we exit, we need to keep flushing whatever's left
            # until there's nothing left in our items buffer.
            while self._items_buffer:
                self._flush()
    

    How I solved it:

    I built on the responses to this question about overriding class methods. They all work, but the best fit for my use case was overriding the class instance with this version of _flush.

    First I built a new version of _flush.

    import logging
    import types

    # The replacement _flush below calls logger.debug, so define a logger here.
    logger = logging.getLogger('boto3.dynamodb.table')
    
    ## New Flush
    
    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        self._response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        unprocessed_items = self._response['UnprocessedItems']
    
        if unprocessed_items and unprocessed_items[self._table_name]:
            # Any unprocessed_items are immediately added to the
            # next batch we send.
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        logger.debug("Batch write sent %s, unprocessed: %s",
                     len(items_to_send), len(self._items_buffer))
    
    
    

    Then I overrode the instance method like this.

    with table.batch_writer() as batch:
        batch._flush=types.MethodType(_flush, batch)
        for item in items:
            batch.put_item(Item=item)
    print(batch._response)
    

    This produces output like this.

    {'UnprocessedItems': {},
     'ResponseMetadata': {'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
      'HTTPStatusCode': 200,
      'HTTPHeaders': {'server': 'Server',
       'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
       'content-type': 'application/x-amz-json-1.0',
       'content-length': '23',
       'connection': 'keep-alive',
       'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
       'x-amz-crc32': '4185382645'},
      'RetryAttempts': 0}}
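
    Note that types.MethodType rebinds _flush on that one batch instance only; other BatchWriter instances keep the original method. A self-contained toy illustration of the pattern (the Greeter class is purely hypothetical, just to show the mechanism):

```python
import types

class Greeter:
    def greet(self):
        return "hello"

def shouting_greet(self):
    # Replacement method: once bound, it receives the instance as `self`
    # exactly like a normally defined method.
    return "HELLO"

patched = Greeter()
patched.greet = types.MethodType(shouting_greet, patched)  # this instance only
untouched = Greeter()
```

    patched.greet() now returns "HELLO" while untouched.greet() still returns "hello", which is why the override in this answer only affects the writer created inside the with block.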
    

    【Discussion】:

    • This is a really interesting method override; I didn't realize you could inject a method like that. Thanks for sharing! Are there any plans to open a PR to merge this behavior into the boto3 codebase, replacing the _flush() method? It's very nice to have the response available.
    • Look, if you have the time and want to take my work and submit a PR to boto3, go for it. It would make this, my most popular question/answer, obsolete... so you might mention me.
    【Solution 2】:

    There doesn't seem to be a built-in way to do this. The _flush method on BatchWriter does log a debug message when it finishes a batch, though. If you just want to see what's happening, you can enable debug logging before your put_item loop:

    import logging
    logger = logging.getLogger('boto3.dynamodb.table')
    logger.setLevel(logging.DEBUG)
    

    If you want to take some action on it, you can create a custom logging.Handler, something like this:

    import logging
    import sys
    
    class CatchBatchWrites(logging.Handler):
        def handle(self, record):
            if record.msg.startswith('Batch write sent'):
                processed, unprocessed = record.args
                # do something with these numbers
    
    
    logger = logging.getLogger('boto3.dynamodb.table')
    logger.setLevel(logging.DEBUG) # still necessary
    logger.addHandler(CatchBatchWrites())
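
    To sketch how that handler fires without touching AWS, we can emit the same debug record that _flush logs. The message format comes from the boto3 source quoted in Solution 1; the simulated logger.debug call and the batches list are my additions for illustration:

```python
import logging

class CatchBatchWrites(logging.Handler):
    """Collect (sent, unprocessed) counts from BatchWriter's debug records."""
    def __init__(self):
        super().__init__()
        self.batches = []

    def handle(self, record):
        if record.msg.startswith('Batch write sent'):
            self.batches.append(record.args)  # (items sent, items unprocessed)

logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG)
catcher = CatchBatchWrites()
logger.addHandler(catcher)

# Simulate the record BatchWriter._flush would emit after a full 25-item batch:
logger.debug("Batch write sent %s, unprocessed: %s", 25, 0)
```

    After a real run inside the with block, catcher.batches would hold one tuple per flushed batch.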
    

    【Discussion】:

    • Let me know if this should be a separate question, but what does the batch._client.describe_endpoints function do? It has an http response, but I don't know what it refers to or whether it relates to the situation above.
    • I think it's unrelated, but I'm not sure what it does! Yes, if you do want to know more, I'd suggest asking it as a separate question.
    • OK. Asked a new question here, @nathan-vērzemnieks
    • I'm trying to implement your solution, but I get this ValueError: ValueError: not enough values to unpack (expected 2, got 0)
    • Running the logging gives a record.msg of Batch write sent %s, unprocessed: %s.