【问题标题】:Is there a way to manage concurrency with RxJS?有没有办法用 RxJS 管理并发?
【发布时间】:2017-03-26 09:41:30
【问题描述】:

TL;DR - 在我使用 RxJS 时,我正在寻找一种方法来控制与 REST API 的 HTTP 请求并发连接数。

我的 Node.js 应用程序将对第三方提供商进行数千次 REST API 调用。但是,我知道如果我一次发出所有这些请求,服务可能会因为 DDoS 攻击而关闭或拒绝我的请求。所以,我想在任何给定时间设置最大并发连接数。我曾经通过 Throat Package 使用 Promises 实现并发控制,但我还没有找到类似的方法来实现。

我尝试使用 merge 和 1 进行并发,正如这篇文章 How to limit the concurrency of flatMap? 中所建议的那样,但所有请求都是一次发送的。

这是我的代码:

var Rx = require('rx'),
  rp = require('request-promise');

var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3'
];

var source = Rx.Observable.fromArray(array).map(httpGet).merge(1);

function httpGet(url) {
  return rp.get(url);
}

var results = [];
var subscription = source.subscribe(
  function (x) {
    console.log('=====', x, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });

【问题讨论】:

标签: node.js rxjs


【解决方案1】:

您可以使用mergeMap 运算符来执行 HTTP 请求并将响应扁平化为组合的 observable。 mergeMap 采用可选的 concurrent 参数,您可以使用该参数指定并发订阅的 observables(即 HTTP 请求)的最大数量:

let source = Rx.Observable
  .fromArray(array)
  .mergeMap(httpGet, 1);

请注意,mergeMap 指定为1mergeMap 等效于concatMap

您问题中的代码一次发送所有请求的原因归结为在 map 运算符中调用您的 httpGet 函数。 httpGet 返回一个 Promise 并且 Promise 不是惰性的 - 只要调用 httpGet,就会发送请求。

使用上面的代码,httpGet 只会在并发请求少于指定数量的情况下在mergeMap 实现中被调用。

上面的代码将分别从组合的 observable 发出每个响应。如果您希望将响应组合成一个在所有请求完成后发出的数组,您可以使用toArray 运算符:

let source = Rx.Observable
  .fromArray(array)
  .mergeMap(httpGet, 1)
  .toArray();

您还应该查看 Martin 在评论中引用的食谱。

【讨论】:

  • 谢谢。你的回答给了我一些提示,让我知道我的样本需要一些调整。还有 rx != rxjs。 rx 似乎已经过时并且并发不起作用。检查上面的例子。
【解决方案2】:

Rx.Observable.fromPromise 可能对您有用。扩展卡坦特的答案,试试这个,其中concurrent 指定为1

Rx.Observable.from(array)
  .mergeMap(url => Rx.Observable.fromPromise(rp.get(url)), 1)
  .subscribe(x => console.log(x))

对于基于时间的控制,这是我能想到的:

Rx.Observable.from(array)
  .bufferCount(2)
  .zip(Rx.Observable.timer(0, 1000), x => x)
  .mergeMap(x => Rx.Observable.from(x)
    .mergeMap(url => Rx.Observable.fromPromise(rp.get(url)))
  .subscribe(x => console.log(x))

【讨论】:

    【解决方案3】:

    感谢以上回复。我的问题与使用 rx 而不是 rxjs NPM 模块有关。在我卸载 rx 并安装 rxjs 之后,所有示例都开始按预期使用并发。因此,带有 Promises、Callbacks 和 Native Observables 的 http 并发调用运行良好。

    我将它们发布在这里,以防万一有人遇到类似问题并进行故障排除。

    基于 HTTP 请求回调的示例:

    var Rx = require('rxjs'),
      request = require('request'),
      request_rx = Rx.Observable.bindCallback(request.get);
    
    var array = [
      'https://httpbin.org/ip', 
      'https://httpbin.org/user-agent',
      'https://httpbin.org/delay/3',
      'https://httpbin.org/delay/3',
      'https://httpbin.org/delay/3'
    ];
    
    var source = Rx.Observable.from(array).mergeMap(httpGet, 1);
    
    function httpGet(url) {
      return request_rx(url);
    }
    
    var subscription = source.subscribe(
      function (x, body) {
        console.log('=====', x[1].body, '======');
      },
      function (err) {
        console.log('Error: ' + err);
      },
      function () {
        console.log('Completed');
      });
    

    基于承诺的样本:

    var Rx = require('rxjs'),
      rp = require('request-promise');
    
    var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent',
      'https://httpbin.org/delay/3',
      'https://httpbin.org/delay/3',
      'https://httpbin.org/delay/3'
    ];
    
    var source = Rx.Observable.from(array).mergeMap(httpGet, 1);
    
    function httpGet(url) {
      return rp.get(url);
    }
    
    var results = [];
    var subscription = source.subscribe(
      function (x) {
        console.log('=====', x, '======');
      },
      function (err) {
        console.log('Error: ' + err);
      },
      function () {
        console.log('Completed');
      });
    

    原生 RxJS 示例:

    var Rx = require('rxjs'),
      superagent = require('superagent'),
      Observable = require('rxjs').Observable;
    
    var array = [
      'https://httpbin.org/ip', 
      'https://httpbin.org/user-agent',
      'https://httpbin.org/delay/10',
      'https://httpbin.org/delay/2',
      'https://httpbin.org/delay/2',
      'https://httpbin.org/delay/1',
    ];
    
    let start = (new Date()).getTime();
    
    var source = Rx.Observable.from(array)
        .mergeMap(httpGet, null, 1)
        .timestamp()
        .map(stamp => [stamp.timestamp - start, stamp.value]);
    
    function httpGet(apiUrl) {
      return Observable.create((observer) => {
        superagent
            .get(apiUrl)
            .end((err, res) => {
                if (err) {
                    return observer.onError(err);
                }
                let data,
                    inspiration;
                data = JSON.parse(res.text);
                inspiration = data;
                observer.next(inspiration);
                observer.complete();
            });
        });
    }
    
    var subscription = source.subscribe(
      function (x) {
        console.log('=====', x, '======');
      });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-10-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-10
      • 1970-01-01
      • 2023-03-18
      相关资源
      最近更新 更多