【问题标题】:How to fetch/scan all items from `AWS dynamodb` using node.js如何使用 node.js 从`AWS dynamodb` 获取/扫描所有项目
【发布时间】:2017-11-19 06:34:00
【问题描述】:

如何使用node.jsAWS dynamodb 获取/扫描所有项目。我在这里发布我的代码。

var docClient = new aws.DynamoDB.DocumentClient();
    var params = {
    TableName:"users",
    KeyConditionExpression:"user_status=:status",
    ExpressionAttributeValues: {
        ":status": "Y"
    }
    };

    var queryExecute = function(callback) {
        docClient.query(params,function(err,result) {
            if(err) {
                console.log(err)
                callback(err);
                } else {
                console.log(result);

                if(result.LastEvaluatedKey) {
                    params.ExclusiveStartKey = result.LastEvaluatedKey;
                    queryExecute(callback);
                    } else {
                        callback(err,items);
                    }
                }
            });
        }
        queryExecute(callback); 

这给了我以下错误。

ValidationException: Query condition missed key schema element: `user_id`.

这里的主键是user_id。我不想在我的查询条件中使用它,因为如果我在KeyConditionExpression 中提到主键,我需要设置一个值。可能是我错了。但是请建议我一个从dynamodb 获取所有项目的好方法,它有user_status = "Y"

【问题讨论】:

标签: node.js amazon-dynamodb aws-lambda


【解决方案1】:

这对我有用:

export const scanTable = async (tableName) => {
    const params = {
        TableName: tableName,
    };

    const scanResults = [];
    const items;
    do{
        items =  await documentClient.scan(params).promise();
        items.Items.forEach((item) => scanResults.push(item));
        params.ExclusiveStartKey  = items.LastEvaluatedKey;
    }while(typeof items.LastEvaluatedKey !== "undefined");
    
    return scanResults;

};

【讨论】:

  • 这可行,但有一些最佳实践改进:将scanResults 改成const,将!= 更改为!==,将items 改成const 并在循环中声明它用于存储 LastEvaluatedKey 的 do 循环之外的单独变量。
  • 您不能在循环内分配给常量“项目”,而是改为“让”。您还可以使用更好的语法在数组中添加多个项目:scanResults.push(...items.Items),这里您还必须在推送之前检查 items.Items 是否未定义。
  • 请问params.ExclusiveStartKey = items.LastEvaluateKey 是干什么用的?
  • @Jarrett 用于对结果进行分页,并允许您跟踪下一次扫描操作的起点。在 dDB 中扫描将返回最多 1 MB 的数据,或 Limit 参数指定的记录数。如果表包含更多记录,那么您将需要获取 ExclusiveStartKey 并将其作为 LastEvaluatedKey 传递,以便下一个查询从该位置开始扫描。 docs.aws.amazon.com/amazondynamodb/latest/APIReference/…
【解决方案2】:

如果您想在不使用 Hash 键值的情况下从 DynamoDB 中获取数据,则需要使用Scan API

注意: Scan API 会读取表中的所有项目以获取结果。因此,在 DynamoDB 中这是一项代价高昂的操作。

替代方法:使用 GSI

以上场景扫码:-

var docClient = new AWS.DynamoDB.DocumentClient();

var params = {
    TableName: "users",
    FilterExpression: "#user_status = :user_status_val",
    ExpressionAttributeNames: {
        "#user_status": "user_status",
    },
    ExpressionAttributeValues: { ":user_status_val": 'somestatus' }

};

docClient.scan(params, onScan);
var count = 0;

function onScan(err, data) {
    if (err) {
        console.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
    } else {        
        console.log("Scan succeeded.");
        data.Items.forEach(function(itemdata) {
           console.log("Item :", ++count,JSON.stringify(itemdata));
        });

        // continue scanning if we have more items
        if (typeof data.LastEvaluatedKey != "undefined") {
            console.log("Scanning for more...");
            params.ExclusiveStartKey = data.LastEvaluatedKey;
            docClient.scan(params, onScan);
        }
    }
}

【讨论】:

  • Scan 最多只能扫描 1MB 的数据,因此此代码不适用于大于 1MB 的数据库。
  • 代码有一个递归调用,直到 LastEvaluatedKey 未定义。所以,它应该可以工作。
  • 对不起,我错过了!
  • 虽然这似乎可行,但我认为如果这个答案实际上会返回数据,而不是仅仅将其打印到控制台,那会好得多。对于刚接触 JS 或函数式编程的人来说,从这里想出一个实际将数据(通过 callbak)返回给调用函数的解决方案并不一定容易。
  • 同意@JuhaKervinen,最好返回数据。汉克的回答做到了这一点,而且更加简洁。
【解决方案3】:

AWS 文档示例对我不起作用。 @Hank 方法成功了。

在 lambda 中使用处理程序:

const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient({
    // optional tuning - 50% faster(cold) / 20% faster(hot)
    apiVersion: '2012-08-10',
    sslEnabled: false,
    paramValidation: false,
    convertResponseTypes: false
});

const tableName = 'series';

exports.handler = async (event, context, callback) => {
    let params = { TableName: tableName };

    let scanResults = [];
    let items;

    do {
        items = await docClient.scan(params).promise();
        items.Items.forEach((item) => scanResults.push(item));
        params.ExclusiveStartKey = items.LastEvaluatedKey;
    } while (typeof items.LastEvaluatedKey != "undefined");

    callback(null, scanResults);
};

【讨论】:

    【解决方案4】:

    使用 Promise 和异步

    const aws = require('aws-sdk');
    aws.config.update({ region: 'us-east-1' });
    const documentClient = new aws.DynamoDB.DocumentClient();
    
    const scanAll = async (params) => {
      let lastEvaluatedKey = 'dummy'; // string must not be empty
      const itemsAll = [];
      while (lastEvaluatedKey) {
        const data = await documentClient.scan(params).promise();
        itemsAll.push(...data.Items);
        lastEvaluatedKey = data.LastEvaluatedKey;
        if (lastEvaluatedKey) {
          params.ExclusiveStartKey = lastEvaluatedKey;
        }
      }
      return itemsAll;
    }
    

    这样使用

    const itemsAll = scanAll(params);
    

    查询的代码相同(只需将扫描替换为查询)

    【讨论】:

      【解决方案5】:

      我使用这样的承诺:

      let AWS = require('aws-sdk');
      let docClient = new AWS.DynamoDB.DocumentClient();
      
      async function dbRead(params) {
          let promise = docClient.scan(params).promise();
          let result = await promise;
          let data = result.Items;
          if (result.LastEvaluatedKey) {
              params.ExclusiveStartKey = result.LastEvaluatedKey;
              data = data.concat(await dbRead(params));
          }
          return data;
      }
      

      并使用它:

      let params = {
        TableName: 'Table'
      };
      let data = await dbRead(params);
      

      【讨论】:

      • 要了解递归,首先要了解递归。
      【解决方案6】:
      const AWS = require('aws-sdk');
      const docClient = new AWS.DynamoDB.DocumentClient({
          // optional tuning - 50% faster(cold) / 20% faster(hot)
          apiVersion: '2012-08-10',
          sslEnabled: false,
          paramValidation: false,
          convertResponseTypes: false,
          region: 'us-east-2' // put your region
      });
      const tableName = 'tableName'; // put your tablename
      
      exports.handler = async (event, context, callback) => {
          let params = { TableName: tableName };
      
          let scanResults = [];
          let items;
      
          do {
              items = await docClient.scan(params).promise();
              items.Items.forEach((item) => scanResults.push(item));
              params.ExclusiveStartKey = items.LastEvaluatedKey;
          } while (typeof items.LastEvaluatedKey != "undefined");
      
          callback(null, scanResults);
      };
      

      【讨论】:

      • 我喜欢这种方法。谢谢!
      【解决方案7】:

      以 JSON 格式返回数据的 node express 解决方案:

      let datapack=[];
      item = {
              TableName: ddbTable,
              FilterExpression: "aws = :e AND begins_with ( Id, :t )",
              ExpressionAttributeValues: {
                  ":t"    :   "contact",
                  ":e"    :   aws
              },
              ProjectionExpression: "Id,FirstName,LastName,cEmail",
          };
          docClient.scan(item, onScan);
          function onScan(err, data) {
              if (err) {
                  console.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
              } else {        
                  datapack = datapack.concat(data.Items);
                  });
                  if (typeof data.LastEvaluatedKey != "undefined") {
                      item.ExclusiveStartKey = data.LastEvaluatedKey;
                      docClient.scan(item, onScan);
                  } else {
                      res.json(datapack);
                  }
              }
          }
      

      【讨论】:

        【解决方案8】:

        这是一个提供索引结果的答案,而不是使用昂贵的扫描,也是 JSON 格式的节点/快递。注意 docClient.query 的使用:

         datapack=[];
            item = {
                TableName: ddbTable,
                IndexName: "cEmailIndex",
                KeyConditionExpression : "aws = :e AND begins_with ( cEmail, :t )",
                ExpressionAttributeValues: {
                    ":t"    :   search,
                    ":e"    :   aws
                },
                ProjectionExpression: "Id,FirstName,LastName,cEmail",
            };
            docClient.query(item, onScan);
            function onScan(err, data) {
                if (err) {
                    console.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
                } else {
                    datapack = datapack.concat(data.Items);
                    if (typeof data.LastEvaluatedKey != "undefined") {
                        item.ExclusiveStartKey = data.LastEvaluatedKey;
                        docClient.query(item, onScan);
                    } else {
                        // console.log(JSON.stringify(datapack));
                        res.json(datapack);
                    }
                }
            }
        

        【讨论】:

        • 注意:如果您提供了更多数据,则此示例和下一个示例仍然有效,然后单次扫描可以处理。请注意 dynamodb 内存限制。
        • 嗨,我关注了这个,我得到了结果。唯一的问题是查询中返回的 LastEvaluatedKey 对我来说总是相同的。
        【解决方案9】:

        对于不使用 AWS.DynamoDB.DocumentClient 的用户,此解决方案将有效。 我已将功能拆分为多个模块,以便于阅读并使用 async/await。

        const AWS = require("aws-sdk");
        AWS.config.update({
            // update table region here
            region: "us-west-2"
        });
        var dynamodb = new AWS.DynamoDB();
        const performAsynScanOperation = (scanParams) => {
            return new Promise((resolve, reject) => {
                dynamodb.scan(scanParams, function (err, responseData) {
                    if (err) {
                        reject(err)
                    } else {
                        resolve(responseData)
                    }
                })
            })
        }
        
        const getAllRecords = async (tableName) => {
            let allItems = [];
            let LastEvaluatedKeyFlag = true;
            let scanParams = { TableName: tableName }
            while (LastEvaluatedKeyFlag) {
                let responseData = await performAsynScanOperation(scanParams)
                let batchItems = responseData.Items;
                allItems = allItems.concat(batchItems);
                if (responseData.LastEvaluatedKey) {
                    LastEvaluatedKeyFlag = true;
                    console.log('LastEvaluatedKey', responseData.LastEvaluatedKey)
                    scanParams.ExclusiveStartKey = responseData.LastEvaluatedKey
                } else {
                    LastEvaluatedKeyFlag = false;
                }
            }
            return allItems;
        }
        getAllRecords('<Name of table>').then((allItems)=>{
          console.log(allItems)
        })
        

        【讨论】:

          【解决方案10】:

          您可以使用来自@aws/dynamodb-query-iterator 的 ScanPaginator:

          import { ScanPaginator } from '@aws/dynamodb-query-iterator';
          import DynamoDB = require('aws-sdk/clients/dynamodb');
          
          const paginator = new ScanPaginator(
            new DynamoDB.DocumentClient(),
            {
              TableName: "users",
              FilterExpression: "#user_status = :user_status_val",
              ExpressionAttributeNames: {
                "#user_status": "user_status",
              },
              ExpressionAttributeValues: { ":user_status_val": 'somestatus' }
            }
          );
          
          for await (const page of paginator) {
              // do something with `page`, e.g. myFunction(page.Items)
          }
          

          【讨论】:

            【解决方案11】:

            这是扫描所有记录的插入式替换:

            const scanAll = async (params) => {
                let all = [];
                while (true) {
                    let data = await new Promise((resolve, reject) => {
                        db.scan(params, function (err, data) {
                            if (err)
                                reject(err);
                            else
                                resolve(data);
                        });
                    });
                    all = all.concat(data.Items);
                    if (data.LastEvaluatedKey)
                        params.ExclusiveStartKey = data.LastEvaluatedKey;
                    else
                        break;
                }
                return all;
            };
            

            用法:

            scanAll(query)
                .catch((err) => {
            
                })
                .then((records) => {
            
                });
            }
            

            【讨论】:

              【解决方案12】:

              scan 方法读取表中的每一项,并返回表中的所有数据。您可以提供一个可选的 filter_expression,以便只返回符合您的条件的项目。但是,仅在扫描整个表后才应用过滤器。 ref

              我正在分享重构的 onScan 函数,希望对您有所帮助。

              var AWS = require("aws-sdk");
              
              AWS.config.update({
                  region: "us-west-2",
                  endpoint: "http://localhost:8000"
              });
              
              var docClient = new AWS.DynamoDB.DocumentClient();
              
              async function read() {
                      const params = {
                          TableName: "tableName"
                          // options can be passed here e.g.
                          // FilterExpression: "#yr between :start_yr and :end_yr",
                      };
              
                      let items = [];
                      return new Promise((resolve, reject) => {
                          function onScan(err, data) {
                              if (err) {
                                  console.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
                                  reject();
                              } else {
                                  items = items.concat(data.Items);
              
                                  // continue scanning if we have more items, because
                                  // scan can retrieve a maximum of 1MB of data
                                  if (typeof data.LastEvaluatedKey !== "undefined") {
                                      params.ExclusiveStartKey = data.LastEvaluatedKey;
                                      docClient.scan(params, onScan);
                                  } else {
                                      resolve(items);
                                  }
                              }
                          }
                          docClient.scan(params, onScan);
                      });
                  }
              

              【讨论】:

                猜你喜欢
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 2016-10-15
                • 1970-01-01
                • 2018-04-06
                • 1970-01-01
                相关资源
                最近更新 更多