【问题标题】:Lambda - Import CSV from S3 to RDS MySQLLambda - 将 CSV 从 S3 导入 RDS MySQL
【发布时间】:2020-05-11 00:03:27
【问题描述】:

我有一个 Lambda 函数,可以将特定的 CSV 文件从 S3 导入 MySQL。但是,CSV 的文件大小约为 1 GB。当我运行此代码时,它不会处理并超时。

//s3 to rds
const fs = require("fs");
const AWS = require('aws-sdk');
var mysql = require('mysql');
var config = require('./config.json');
const s3 = new AWS.S3({
  accessKeyId: 'XXXXXXXXXXXXXXX',
  secretAccessKey: 'XXXXXXXXXXXXXXXXXXXXXXXXXXxx'
});
var filePath = `localfilepath`;

var pool = mysql.createPool({
  host: config.dbhost,
  user: config.dbuser,
  password: config.dbpassword,
  database: config.dbname
});
pool.getConnection((err, connection) => {
  if (err) throw err;
  console.log("Connected!" + connection);

  var s3Params = {
    Bucket: '<your_bucket_name>',
    Key: '<your_key>'
  };
  s3.getObject(s3Params, function(err, result) {
    if (err) {
      throw new Error(err);
    } else {
      console.log('file stored successfully', result);
      fs.createWriteStream(filePath).write(result.Body);
      connection.query('TRUNCATE TABLE <table_name>', (err, result) => {
        if (err) {
         throw new Error(err);
        } else {
          console.log('table truncated');
          var query = `LOAD DATA LOCAL INFILE '<file_name>' INTO table <table_name> FIELDS TERMINATED BY ','  ENCLOSED BY '"' IGNORE 1 LINES `;
          connection.query(query, function(err, result) {
            if (err) throw err;
            console.log("Result: " + result);
            connection.release();
            fs.unlinkSync(filePath);
            console.log('file deleted');
          });
        }
      });
    }

  });
})

我怎样才能使它工作?

【问题讨论】:

  • 1.我没有看到您实际上将文件保存在任何地方。它似乎正在将其加载到内存中。 2. 在 Lambda 运行时环境中可用于保存文件的总空间为半 Gig,因此您的文件太大而无法与 AWS Lambda 一起使用。
  • 执行此操作的选项有哪些?
  • 在上传到 S3 之前将文件拆分为更小的文件,或者使用 ECS、EKS 或 EC2 来运行导入而不是 Lambda。
  • 是否可以使用 Lambda 拆分文件?恐怕我不能使用 EC2 或任何其他服务。只是 Lambda。
  • 您可以尝试流式传输对象并一次写入一部分。请参阅此处的“检索对象的字节范围”示例:docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/…

标签: node.js amazon-web-services aws-lambda amazon-rds


【解决方案1】:

您基本上有 2 个障碍需要克服:1) Lambda 上的本地存储只有 512mb 和 2) Lambda 的执行时间限制为 15 分钟(您必须在函数上明确配置)

要解决问题1,您可以使用S3 Select。它允许您对 S3 中的对象(CSV 和 JSON 文件)执行 SQL 查询。对您的 CSV 文件执行 S3 选择查询,对于您检索的每条记录,您都可以将其插入队列并让其他工作人员将它们插入数据库。您也可以直接插入您的 RDS,但它可能会更慢。

这是一个代码示例:

const AWS = require('aws-sdk');
var fs = require('fs');

const S3 = new AWS.S3();

exports.handler = async (event, context) => {
    try {
        const query = "SELECT * FROM s3object s WHERE s.id > '0'";
        const bucket = 'my-bucket';
        const key = 'data.csv';

        const params = {
            Bucket: bucket,
            Key: key,
            ExpressionType: 'SQL',
            Expression: query,
            InputSerialization: { CSV: { FileHeaderInfo: 'USE' } },
            OutputSerialization: { CSV: {} }
        }

        const data = await getDataUsingS3Select(params);
        context.succeed(data);
    } catch (error) {
        context.fail(error);
    }
};

const getDataUsingS3Select = async (params) => {
    return new Promise((resolve, reject) => {
        S3.selectObjectContent(params, (err, data) => {
            if (err) { reject(err); }

            // This is a stream of events
            data.Payload.on('data', (event) => {
                // event, there is data inside it
                if (event.Records) {
                    // do what you want with payload: send to a queue or direct to db
                    console.log('Row:', event.Records.Payload.toString('utf8'));
                }
            }).on('end', () => {
                // we arrive here after processing everything
                resolve();
            });
        });
    })
}

如果您仍然超过 15 分钟的时间限制,那就是问题 2。首先在 SQL 中添加一个limit 子句。然后您可以在 Lambda 的/tmp 目录中创建一个“检查点”文件。您可以保存您在那里处理的最后一条记录的id,这样当您重新运行您的 Lambda 函数时,它可以读取该文件,获取id 并在查询的where 子句中使用它,例如:

select * from s3object s where s.id > '99' limit 50000

【讨论】:

  • 限时15分钟,你有可以使用的代码吗?
【解决方案2】:

如果您的主要目标是将数据从 S3 上的 CSV 文件导入 RDS MySQL,请查看AWS Data Pipeline。它已经在Load S3 Data into Amazon RDS MySQL Table 中拥有此常见任务所需的所有已定义资源,但它使用 EC2 实例。但同时它更容易扩展和维护解决方案。

【讨论】:

    【解决方案3】:

    根据this 线程,他们确实希望在某个时候实现,但是什么时候是最好的猜测方案。

    AWS Lambda 当前在 /tmp 目录中有 512mb 磁盘空间的“硬限制”(如 here 所述),因此由于文件大小为 1GB,fs.createWriteStream(filePath).write(result.Body); 行不应该在这里工作。该错误类似于"no space left on device"(来自审查现有线程)。

    但是,在这种情况下,从 S3 加载文件应该可以工作。 Lambda 会按比例缩放内存和 CPU 大小,因此它可能会由于这里的内存不足而超时(取决于您设置的内容)。 This link 很好地指示了您需要为此设置什么(与您加载到内存和磁盘空间的内容有关)。

    我建议在此阶段将流拆分为 512mb 块(this 包可能会有所帮助)并将它们分别存储在 S3 中,这样您就可以将此操作拆分为 2 个函数:

    1. 获取数据并拆分为单独的 s3 文件(同时截断您的表)。
    2. 将 CSV 数据从 S3 加载回您的 RDS

    (您可以为此使用Cloudwatch Events

    【讨论】:

    • 你有我可以参考的代码示例吗?
    猜你喜欢
    • 2022-08-16
    • 2022-01-14
    • 1970-01-01
    • 2017-11-14
    • 2019-04-29
    • 1970-01-01
    • 2015-02-07
    • 2011-10-09
    • 2018-10-29
    相关资源
    最近更新 更多