【问题标题】:How many requests can Node-Express fire off at once?Node-Express 一次可以触发多少个请求?
【发布时间】:2018-01-11 18:04:49
【问题描述】:

我有一个脚本,它从 AWS Athena 中提取 25,000 条记录,这基本上是一个 PrestoDB 关系 SQL 数据库。假设我为这些记录中的每一个生成一个请求,这意味着我必须向 Athena 发出 25,000 个请求,然后当数据返回时,我必须向我的 Redis 集群发出 25,000 个请求。

从节点到 Athena 的理想请求数量是多少?

我问的原因是因为我试图通过创建一个包含 25,000 个承诺的数组然后在其上调用 Promise.all(promiseArray) 来做到这一点,但应用程序只是永远挂起。

所以我决定一次触发 1 个并使用递归将第一个索引拼接出来,然后在 promise 解决后将剩余的记录传递给调用函数。

问题在于它需要很长时间。我休息了大约一个小时,回来后发现还剩下 23,000 条记录。

我试图用谷歌搜索 Node 和 Athena 一次可以处理多少个请求,但我什么也没找到。我希望有人可能对此有所了解并能够与我分享。

谢谢。

这是我的代码仅供参考:

作为旁注,我想做的不同之处在于,我可以一次发送 4、5、6、7 或 8 个请求,而不是一次发送一个请求,具体取决于它的执行速度。

另外,Node 集群如何影响这样的性能?

exports.storeDomainTrends = () => {
return new Promise((resolve, reject)=>{
    athenaClient.execute(`SELECT DISTINCT the_column from "the_db"."the_table"`,
    (err, data) =>  {
        var getAndStoreDomainData = (records) => {
            if(records.length){
                return new promise((resolve, reject) => {
                    var subrecords = records.splice(0, )[0]
                    athenaClient.execute(`
                    SELECT 
                    field,
                    field,
                    field,
                    SUM(field) as field
                    FROM "the_db"."the_table"
                    WHERE the_field IN ('Month') AND the_field = '`+ record.domain_name +`'
                    GROUP BY the_field, the_field, the_field
                    `, (err, domainTrend) => {

                        if(err) {
                            console.log(err)
                            reject(err)
                        }

                        redisClient.set(('Some String' + domainTrend[0].domain_name), JSON.stringify(domainTrend))
                        resolve(domainTrend);
                    })
                })
                .then(res => {
                    getAndStoreDomainData(records);
                })
            }
        }

        getAndStoreDomainData(data);

    })
})

}

【问题讨论】:

  • 看看stackoverflow.com/questions/47967232/…,您可能想要分块您的请求并一次处理一个块。块大小取决于您的系统,因此我们无法为您估计,您可能需要自己测试。
  • 这取决于很多因素。有些是你无法控制的。例如,AWS Athena 有请求限制。 docs.aws.amazon.com/athena/latest/ug/service-limits.html
  • @Kevin,谢谢,我认为这解决了我的问题,即我的请求为何停滞不前。如果您想提交它作为您的答案,我很乐意接受。
  • 您可以以 1000 个为一组使用 Promise.all,并限制每个时间段发出的请求(例如每秒最多 20 个)或限制活动请求(最多 100 个)。限制不仅是您的硬件处理的内容和硬件上的软件允许的内容,还包括您的请求的接收者可以处理或允许的内容。 Here 是一个限制承诺的例子。

标签: javascript node.js express node-redis amazon-athena


【解决方案1】:

使用lib,您的代码可能如下所示:

const Fail = function(reason){this.reason=reason;};
const isFail = x=>(x&&x.constructor)===Fail;
const distinctDomains = () =>
  new Promise(
    (resolve,reject)=>
      athenaClient.execute(
        `SELECT DISTINCT domain_name from "endpoint_dm"."bd_mb3_global_endpoints"`,
        (err,data)=>
          (err)
            ? reject(err)
            : resolve(data)
      )
  );
const domainDetails = domain_name =>
  new Promise(
    (resolve,reject)=>
      athenaClient.execute(
        `SELECT 
        timeframe_end_date,
        agg_type,
        domain_name,
        SUM(endpoint_count) as endpoint_count
        FROM "endpoint_dm"."bd_mb3_global_endpoints"
        WHERE agg_type IN ('Month') AND domain_name = '${domain_name}'
        GROUP BY timeframe_end_date, agg_type, domain_name`,
        (err, domainTrend) =>
            (err)
              ? reject(err)
              : resolve(domainTrend)
        )
  );
const redisSet = keyValue =>
  new Promise(
    (resolve,reject)=>
      redisClient.set(
        keyValue,
        (err,res)=>
          (err)
            ? reject(err)
            : resolve(res)
      )
  );
const process = batchSize => limitFn => resolveValue => domains => 
  Promise.all(
    domains.slice(0,batchSize)
    .map(//map domains to promises
      domain=>
        //maximum 5 active connections
        limitFn(domainName=>domainDetails(domainName))(domain.domain_name)
        .then(
          domainTrend=>
            //the redis client documentation makes no sense whatsoever
            //https://redis.io/commands/set
            //no mention of a callback
            //https://github.com/NodeRedis/node_redis
            //mentions a callback, since we need the return value
            //and best to do it async we will use callback to promise
            redisSet([
              `Endpoint Profiles - Checkin Trend by Domain - Monthly - ${domainTrend[0].domain_name}`,
              JSON.stringify(domainTrend)
            ])
        )
        .then(
          redisReply=>{
            //here is where things get unpredictable, set is documented as 
            //  a synchronous function returning "OK" or a function that
            //  takes a callback but no mention of what that callback recieves
            //  as response, you should try with one or two records to
            //  finish this on reverse engineering because documentation
            //  fails 100% here and can not be relied uppon.
            console.log("bad documentation of redis client... reply is:",redisReply);
            (redisReply==="OK")
              ? domain
              : Promise.reject(`Redis reply not OK:${redisReply}`)
          }
        )
        .catch(//catch failed, save error and domain of failed item
          e=>
            new Fail([e,domain])
        )
    )
  ).then(
    results=>{
      console.log(`got ${batchSize} results`);
      const left = domains.slice(batchSize);
      if(left.length===0){//nothing left
        return resolveValue.conat(results);
      }
      //recursively call process untill done
      return process(batchSize)(limitFn)(resolveValue.concat(results))(left)
    }
  );
const max5 = lib.throttle(5);//max 5 active connections to athena
distinctDomains()//you may want to limit the results to 50 for testing
//you may want to limit batch size to 10 for testing
.then(process(1000)(max5)([]))//we have 25000 domains here
.then(
  results=>{//have 25000 results
    const successes = results.filter(x=>!isFail(x));
    //array of failed items, a failed item has a .reason property
    //  that is an array of 2 items: [the error, domain]
    const failed = results.filter(isFail);
  }
)

你应该弄清楚redis客户端是做什么的,我尝试使用文档来弄清楚,但不妨问问我的金鱼。一旦您对客户端行为进行了逆向工程,最好尝试使用小批量来查看是否有任何错误。需要导入lib才能使用,可以找到here

【讨论】:

  • 我要投赞成票,因为看起来你花了很多时间在它上面,这很有趣:)
【解决方案2】:

我能够按照 Kevin B 所说的找到一种更快的方法来查询数据。我所做的是更改查询,以便我可以从 Athena 获得所有域的趋势。我按 domain_name 对其进行排序,然后将其作为 Node 流发送,以便在数据进入时将每个域名分离成它自己的 JSON。

不管怎样,这就是我最终的结果。

exports.storeDomainTrends = () => {
return new Promise((resolve, reject)=>{
    var streamObj = athenaClient.execute(`
    SELECT field,
            field,
            field,
            SUM(field) AS field
    FROM "db"."table"
    WHERE field IN ('Month')
    GROUP BY  field, field, field
    ORDER BY  field desc`).toStream();

    var data = [];

    streamObj.on('data', (record)=>{
        if (!data.length || record.field === data[0].field){
            data.push(record)
        } else if (data[0].field !== record.field){
            redisClient.set(('Key'), JSON.stringify(data))
            data = [record]
        }
    })

    streamObj.on('end', resolve);

    streamObj.on('error', reject);

})
.then()

}

【讨论】:

  • 如果每个域有多个timeframe_end_date,则您的分组有问题。您总是保存前一个域,因此最后一个域不会保存在 redis 中。您可以将最后一个保存在 end 处理程序中。
  • 感谢您指出这一点!我会相应地更新!
猜你喜欢
  • 2012-03-18
  • 2023-03-24
  • 2014-11-10
  • 1970-01-01
  • 1970-01-01
  • 2016-07-01
  • 2021-08-04
  • 1970-01-01
  • 2014-12-29
相关资源
最近更新 更多