【问题标题】:Update DynamoDB Atomic Counter with Python / Boto使用 Python / Boto 更新 DynamoDB 原子计数器
【发布时间】:2012-05-07 06:02:06
【问题描述】:

我正在尝试使用 Python Boto 2.3.0 更新原子计数计数器,但找不到该操作的文档。

似乎没有直接的接口,所以我尝试使用layer1接口进行“原始”更新,但即使是简单的更新也无法完成。

我尝试了以下变体,但都没有运气

dynoConn.update_item(INFLUENCER_DATA_TABLE, 
                     {'HashKeyElement': "9f08b4f5-d25a-4950-a948-0381c34aed1c"}, 
                     {'new': {'Value': {'N':"1"}, 'Action': "ADD"}})    

dynoConn.update_item('influencer_data', 
                     {'HashKeyElement': "9f08b4f5-d25a-4950-a948-0381c34aed1c"}, 
                     {'new': {'S' :'hello'}})                                 

dynoConn.update_item("influencer_data", 
                     {"HashKeyElement": "9f08b4f5-d25a-4950-a948-0381c34aed1c"},
                     {"AttributesToPut" : {"new": {"S" :"hello"}}})      

它们都产生相同的错误:

  File "/usr/local/lib/python2.6/dist-packages/boto-2.3.0-py2.6.egg/boto/dynamodb/layer1.py", line 164, in _retry_handler
    data)
boto.exception.DynamoDBResponseError: DynamoDBResponseError: 400 Bad Request
{u'Message': u'Expected null', u'__type': u'com.amazon.coral.service#SerializationException'}

我还研究了 API 文档 here,但它们非常简陋。

我已经做了很多搜索和摆弄,我唯一剩下的就是使用 PHP API 并深入研究代码以找到它“格式化”JSON 主体的位置,但这有点痛苦.请救我脱离痛苦!

【问题讨论】:

    标签: python counter atomic boto amazon-dynamodb


    【解决方案1】:

    DynamoDB 中没有用于原子计数器的高级函数。但是,您可以使用条件写入功能实现原子计数器。例如,假设您的表具有这样的字符串哈希键。

    >>> import boto
    >>> c = boto.connect_dynamodb()
    >>> schema = s.create_schema('id', 's')
    >>> counter_table = c.create_table('counter', schema, 5, 5)
    

    您现在将一个项目写入该表,其中包含一个名为“n”的属性,其值为 0。

    >>> n = 0
    >>> item = counter_table.new_item('counter', {'n': n})
    >>> item.put()
    

    现在,如果我想更新我的计数器的值,我将执行一个条件写入操作,如果它的当前值与我对它的当前值的想法一致,我将把“n”的值增加到 1。

    >>> n += 1
    >>> item['n'] = n
    >>> item.put(expected_value={'n': n-1})
    

    这会将项目中的“n”值设置为 1,但前提是 DynamoDB 中的当前值为零。如果该值已被其他人递增,则写入将失败,然后我需要按本地计数器递增并重试。

    这有点复杂,但所有这些都可以包含在一些代码中,以使其更易于使用。我为 SimpleDB 做了类似的事情,你可以在这里找到:

    http://www.elastician.com/2010/02/stupid-boto-tricks-2-reliable-counters.html

    我可能应该尝试更新该示例以使用 DynamoDB

    【讨论】:

    • Gamaat,非常感谢您的回复,但这并不是真正的原子计数器。这是一个标准的更新操作,并且与之相关的成本非常高。费用支付给客户“确保”原子,原子计数器“免费”提供此功能。
    • DynamoDB 实际上确实有原子计数器——它们只需要通过较低级别的 API 访问。
    【解决方案2】:

    对于那些寻找答案的人,我已经找到了。 首先要注意的是,我目前不知道发生了什么,但目前,要获得 layer1 实例,我必须执行以下操作:

    import boto
    AWS_ACCESS_KEY=XXXXX
    AWS_SECRET_KEY=YYYYY
    dynoConn = boto.connect_dynamodb(AWS_ACCESS_KEY, AWS_SECRET_KEY)
    dynoConnLayer1 = boto.dynamodb.layer1.Layer1(AWS_ACCESS_KEY, AWS_SECRET_KEY) 
    

    本质上是先实例化第 2 层,然后再实例化第 1 层。 也许我在做一些愚蠢的事情,但在这一点上我很高兴让它工作...... 稍后我会整理细节。那么...实际执行原子更新调用:

    dynoConnLayer1.update_item("influencer_data", 
                        {"HashKeyElement":{"S":"9f08b4f5-d25a-4950-a948-0381c34aed1c"}},
                        {"direct_influence":
                            {"Action":"ADD","Value":{"N":"20"}}
                        }
                    );
    

    请注意,在上面的示例中,Dynamo 会将当前值加 20,并且此操作将是原子操作,这意味着在“同时”发生的其他操作将被正确“调度”在新值建立后发生在执行此操作之前为 +20 OR。无论哪种方式都会达到预期的效果。

    一定要在 layer1 连接的实例上执行此操作,因为 layer2 会抛出错误,因为它需要一组不同的参数类型。

    这就是它的全部!!!!众所周知,我使用 PHP SDK 解决了这个问题。安装和设置需要很短的时间,然后当您进行调用时,调试数据实际上会向您显示 HTTP 请求正文的格式,因此您将能够在示例之后复制/建模您的 layer1 参数。这是我用来在 PHP 中进行原子更新的代码:

    <?php 
        // Instantiate the class
        $dynamodb = new AmazonDynamoDB();
    
        $update_response = $dynamodb->update_item(array(
            'TableName' => 'influencer_data',
                'Key' => array(
                    'HashKeyElement' => array(
                        AmazonDynamoDB::TYPE_STRING=> '9f08b4f5-d25a-4950-a948-0381c34aed1c'
                    )
                ),
                'AttributeUpdates' => array(
                    'direct_influence' => array(
                        'Action' => AmazonDynamoDB::ACTION_ADD,
                        'Value' => array(
                            AmazonDynamoDB::TYPE_NUMBER => '20'
                        )
                    )
                )
        ));
    
        // status code 200 indicates success
        print_r($update_response);
    
    ?>
    

    希望这将有助于其他人,直到 Boto layer2 接口赶上......或者有人只是想出如何在 level2 中做到这一点:-)

    【讨论】:

      【解决方案3】:

      抱歉,我误解了您要查找的内容。尽管有一个小错误需要解决,但您可以通过 layer2 完成此操作。这是一些 Layer2 代码:

      >>> import boto
      >>> c = boto.connect_dynamodb()
      >>> t = c.get_table('counter')
      >>> item = t.get_item('counter')
      >>> item
      {u'id': 'counter', u'n': 1}
      >>> item.add_attribute('n', 20)
      >>> item.save()
      {u'ConsumedCapacityUnits': 1.0}
      >>> item  # Here's the bug, local Item is not updated
      {u'id': 'counter', u'n': 1}
      >>> item = t.get_item('counter')  # Refetch item just to verify change occurred
      >>> item
      {u'id': 'counter', u'n': 21}
      

      这会产生与您在第 1 层代码中执行的相同的在线请求,如以下调试输出所示。

      2012-04-27 04:17:59,170 foo [DEBUG]:StringToSign:
      POST
      /
      
      host:dynamodb.us-east-1.amazonaws.com
      x-amz-date:Fri, 27 Apr 2012 11:17:59 GMT
      x-amz-security-    token:<removed> ==
      x-amz-target:DynamoDB_20111205.UpdateItem
      
      {"AttributeUpdates": {"n": {"Action": "ADD", "Value": {"N": "20"}}}, "TableName": "counter", "Key": {"HashKeyElement": {"S": "counter"}}}
      

      如果你想避免最初的 GetItem 调用,你可以这样做:

      >>> import boto
      >>> c = boto.connect_dynamodb()
      >>> t = c.get_table('counter')
      >>> item = t.new_item('counter')
      >>> item.add_attribute('n', 20)
      >>> item.save()
      {u'ConsumedCapacityUnits': 1.0}
      

      如果项目已存在则更新该项目,如果项目尚不存在则创建它。

      【讨论】:

      • gamaat,谢谢!我看到了这些操作,但不认为他们会以这种方式操作。再次感谢!
      • 当多线程同时做这个操作时,由于 save() 方法中的expect 选项,它可能不起作用。
      【解决方案4】:

      我不确定这是否真的是一个原子计数器,因为当您增加 1 的值时,另一个调用调用可能会将该数字增加 1,因此当您“获取”该值时,它不是你会期待的。

      比如,把代码放在garnaat上,标记为接受的答案,我看到你把它放在一个线程中,它不起作用:

      class ThreadClass(threading.Thread):
          def run(self):
              conn = boto.dynamodb.connect_to_region(aws_access_key_id=os.environ['AWS_ACCESS_KEY'], aws_secret_access_key=os.environ['AWS_SECRET_KEY'], region_name='us-east-1')
              t = conn.get_table('zoo_keeper_ids')
              item = t.new_item('counter')
              item.add_attribute('n', 1)
              r = item.save() #- Item has been atomically updated!
              # Uh-Oh! The value may have changed by the time "get_item" is called!
              item = t.get_item('counter') 
              self.counter = item['n']
              logging.critical('Thread has counter: ' + str(self.counter))
      
      tcount = 3
      threads = []
      for i in range(tcount):
          threads.append(ThreadClass())
      
      # Start running the threads:
      for t in threads:
          t.start()
      
      # Wait for all threads to complete:
      for t in threads:
          t.join()
      
      #- Now verify all threads have unique numbers:
      results = set()
      for t in threads:
          results.add(t.counter)
      
      print len(results)
      print tcount
      if len(results) != tcount:
          print '***Error: All threads do not have unique values!'
      else:
          print 'Success!  All threads have unique values!'
      

      注意:如果您希望它真正起作用,请将代码更改为:

      def run(self):
          conn = boto.dynamodb.connect_to_region(aws_access_key_id=os.environ['AWS_ACCESS_KEY'], aws_secret_access_key=os.environ['AWS_SECRET_KEY'], region_name='us-east-1')
          t = conn.get_table('zoo_keeper_ids')
          item = t.new_item('counter')
          item.add_attribute('n', 1)
          r = item.save(return_values='ALL_NEW') #- Item has been atomically updated, and you have the correct value without having to do a "get"!
          self.counter = str(r['Attributes']['n'])
          logging.critical('Thread has counter: ' + str(self.counter))
      

      希望这会有所帮助!

      【讨论】:

      • 对不起,我发布了为什么它不发布解决方案就无法工作。我已经修改了我的原始帖子以包含解决方案。
      【解决方案5】:

      您想在 dynamodb 中增加一个值,然后您可以通过以下方式实现:

      import boto3
      import json
      import decimal
      
      class DecimalEncoder(json.JSONEncoder):
          def default(self, o):
              if isinstance(o, decimal.Decimal):
                  if o % 1 > 0:
                      return float(o)
                  else:
                      return int(o)
              return super(DecimalEncoder, self).default(o)
      
      ddb = boto3.resource('dynamodb') 
      def get_counter():
          table = ddb.Table(TableName)
          try:
                  response = table.update_item(                                                             
                  Key={
                      'haskey' : 'counterName'
                  },
                  UpdateExpression="set currentValue = currentValue +  :val",
                  ExpressionAttributeValues={
                      ':val': decimal.Decimal(1)
                  }, 
                  ReturnValues="UPDATED_NEW"
              )
              print("UpdateItem succeeded:")
          except Exception as e:
              raise e
          print(response["Attributes"]["currentValue" ])
      

      这个实现需要一个额外的计数器表来为你保留最后使用的值。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-02-21
        • 2016-04-25
        • 2012-03-11
        • 1970-01-01
        • 1970-01-01
        • 2014-05-02
        • 2015-07-08
        • 2013-09-28
        相关资源
        最近更新 更多