【问题标题】:Batch and delay API calls with nodejs/async使用 nodejs/async 批量和延迟 API 调用
【发布时间】:2015-01-03 00:17:28
【问题描述】:

我正在研究一个社交网络图,我想根据从 API 获得的邻接列表构建一个“六度分离”树。

对于每个人,API 将以 [id1, id2, id3...] 的形式返回一组朋友,这正是我想要的。但问题是人数众多,API 仅允许 400 次调用/15 分钟。我可以将数据保存在本地数据库中,但我不想让 API 泛滥成灾。

我正在做的伪代码是这样的:

requestCharacter = function(id) {
    is this person in my db already? if true, return;
    else make api call(error, function(){loopFriends(character)}) {
       save character in database
    }
}

loopFriends(character){
   foreach(friend in character.friends) requestCharacter(friend);
}

我已经或多或少地编写了代码,它工作正常,但由于它不断遍历树,并且由于人们在彼此的朋友列表中重复出现,它的效率非常低,并且不断破坏 API 限制

因此,我想做的是将请求排队,在添加之前检查队列中是否还没有某些内容,然后一次以 400 个或更少的请求批量运行队列。 (因此,如果队列中有 1200 个,它将运行 400,等待 15 分钟,运行 400,等待 15 分钟,运行 400...)

我尝试将 async.js 与它的队列一起使用,并且能够将大量内容加载到队列中,但我认为它实际上并没有运行过。对于这种情况,最好的方法是什么?

我的实际非排队代码如下:

var lookupAndInsertCharacter = function(id){
  Character.findOne({ 'id': id }, function (err, person) {
    if (err) console.log(err);
    else {
      if(person!=null) {console.log('%s already exists in database, not saved', person.name); getCharacterFriends(id);}
      else insertCharacter(id, function(){getCharacterFriends(id)});
    };
  })
}

var insertCharacter = function(id, callback){
    var url = getCharacterURL(id);
    request(url, function (error, response, body) {
    if (!error && response.statusCode == 200) {
      var result = JSON.parse(body);
      if(result.status_code != 1 ) {console.log("ERROR status_code: %s. Please wait 15 minutes", result.status_code); return;}
      else {
        var me = new Character(processCharacter(result));
        me.save(function(err){
          if (err) return handleError(err);
        });
        console.log("Saved character "+me.name);
      }  
    } 
    else {
      console.log(error);
    }
  }); 
}

var getCharacterFriends = function(id) {
  Character.findOne({ 'id': id }, function (err, person) {
    if (err) console.log(err);
    else {
      console.log("Getting friends for %s",person.name);
      _.each(person.character_friends, function(d){
        lookupAndInsertCharacter(d);
      });
      console.log("Getting enemies for %s",person.name);
      _.each(person.character_enemies, function(d){
        lookupAndInsertCharacter(d);
      })
    };
  })
}

【问题讨论】:

    标签: javascript node.js mean-stack


    【解决方案1】:

    最终对我有用的是限制 API 调用的速率。我用过

    https://github.com/wankdanker/node-function-rate-limit

    然后我做了一个限定版的insertCharacter:

    var rateLimit = require('function-rate-limit');
    
    var insertLimited = rateLimit(400, 900000, function (id) {
      insertCharacter(id);
    });
    

    【讨论】:

      【解决方案2】:

      在下面的示例中,我在 FaceBook 上获取了我的所有群组、其中的帖子以及其作者的公开个人资料。

      为了减慢这个过程,我创建了一个有限的“scrapers”池并将每个scraper 保留一段时间,所以我“不能超载 FaceBook 服务器:)”

      对于上面的例子,你可以

      • 将您的池大小限制为 400 max : 400 并保留您的爬虫 15 分钟setTimeout(function(){pool.release(scraper);}, 15*60*1000);
      • 或将您的池大小限制为 1 max : 1 并保留您的爬虫 3.75 秒 setTimeout(function(){pool.release(scraper);}, 3750);

      代码来了

      function saveData (anyJson) {
          // put your Db communication here.
          // console.log(anyJson);
      }
      
      function now() {
          instant = new Date();
          return instant.getHours() +':'+ instant.getMinutes() +':'+ instant.getSeconds() +'.'+ instant.getMilliseconds();
      }
      var graph = require('fbgraph');
      console.log(process.argv[2]);
      graph.setAccessToken(process.argv[2]);
      
      var poolModule = require('generic-pool');
      var pool = poolModule.Pool({
          name     : 'scraper',
          create   : function(callback) {
              console.log(now() +' created scraper');
              // parameter order: err, resource
              callback(null, {created:now()});
          },
          destroy  : function(scraper) { 
              console.log(now() +' released scraper created '+ scraper.created); 
          },
          max      : 10,
          min      : 1, 
          idleTimeoutMillis : 60*60*1000,
          log : false
      });
      
      function pooledGraphGet(path,analyse) {
          pool.acquire(function(err,scraper) {
              if (err) {
                  console.log(now() +' Could not get a scraper for '+ path);
                  throw err;
              }
              graph.get(path,function(err,res) {
                  if (err) {
                      console.log(now() +' Could not get '+ path +' using scraper created '+ scraper.created);
                      throw err;
                  } else {
                      console.log(now() +' Got '+ path +' using scraper created '+ scraper.created);
                      setTimeout(function(){pool.release(scraper);}, 60*1000);
                      analyse(res);
                  }
              });
          });
      }
      
      pooledGraphGet('me?fields=friends,groups', function(res) {
          res.groups.data.forEach(function(group) {
              saveData (group);
              pooledGraphGet(group.id +'?fields=id,name,members,feed', function(res) {
                  if (res.feed) res.feed.data.forEach(function(feed){
                      saveData (feed);
                      pooledGraphGet(feed.from.id +'?fields=id,name', function(res) {
                          saveData (res);
                      });
                  });
              });
          });
      });
      

      【讨论】:

      • 请原谅我对节点的完全菜鸟...如果我要使用通用池的想法,我会将 API 调用函数作为“客户端”放在池中吗?是否通过将 idleTimeout 设置为一分钟来完成“释放前等待一分钟”?
      猜你喜欢
      • 2015-08-21
      • 2021-02-23
      • 1970-01-01
      • 2021-10-01
      • 2013-02-14
      • 2019-02-17
      • 1970-01-01
      • 2021-10-18
      • 2011-06-19
      相关资源
      最近更新 更多