【问题标题】:Batching using RxJS?使用 RxJS 进行批处理?
【发布时间】:2016-12-27 05:41:36
【问题描述】:

我猜这应该很容易实现,但我在弄清楚如何解决它时遇到了麻烦(我猜是概念上的)。

我拥有的是一个返回 JSON 对象数组的 API。我需要逐步检查这些对象,并为每个对象进行另一个 AJAX 调用。问题是处理每个 AJAX 调用的系统一次只能处理两个活动调用(因为它是一个与桌面应用程序挂钩的 CPU 密集型任务)。

我想知道如何使用 RxJS(使用版本 5 或 4)实现这一点?

编辑:此外,是否可以同时运行一系列步骤。即

Downloading File: 1 Processing File: 1 Converting File: 1 Uploading File: 1 Downloading File: 2 Processing File: 2 Converting File: 2 Uploading File: 2 Downloading File: 3 Processing File: 3 Converting File: 3 Uploading File: 3

我试过做类似的事情:

Rx.Observable.fromPromise(start())
    .concatMap(arr => Rx.Observable.from(arr))
    .concatMap(x => downloadFile(x))
    .concatMap((entry) => processFile(entry))
    .concatMap((entry) => convertFile(entry))
    .concatMap((entry) => UploadFile(entry))
    .subscribe(
        data => console.log('data', new Date().getTime(), data),
        error => logger.warn('err', error),
        complete => logger.info('complete')
    );

但这似乎不起作用。例如,downloadFile 不会等待 processFile、convertFile 和 uploadFile 全部完成,而是在前一个完成后立即再次运行下一个。

【问题讨论】:

标签: javascript rxjs rxjs5


【解决方案1】:

这样的事情怎么样?您可以使用from 将数组分解成一口大小的块,然后使用concatMap 一个一个地处理它们。

function getArr() {
    return Rx.Observable.of([1, 2, 3, 4, 5, 6, 7, 8]);
}


function processElement(element) {
    return Rx.Observable.of(element)
        .delay(500);
}


getArr()
    .concatMap(arr => {
        return Rx.Observable.from(arr);
    })
    .concatMap(element => {
        return processElement(element);
    })
    .subscribe(res => {
        console.log(res);
    });

【讨论】:

  • 这看起来很有希望,但是可以扩展它吗?即,如果我还想添加另一个步骤?
  • 我相信你可以扩展它,是的。您可以继续添加 concatMaps 或您需要的任何其他运算符。这里有很多好资料reactivex.io/rxjs/class/es6/Observable.js~Observable.html
【解决方案2】:

您可以将merge 运算符与maxConcurrency 重载(Rxjs v4)一起使用,例如:

Rx.Observable.fromArray(aJSONs)
  .map (function (JSONObject) {
    return ajaxRequest(JSONObject) // that would be an observable (or a promise)
  })
  .merge(2)

您可以在以下位置查看其他使用示例:

官方文档:

【讨论】:

    【解决方案3】:

    如果您想要完全像这样的请求序列,这里有 2 种方法

    Downloading File: 1
    Processing File: 1
    Converting File: 1
    Uploading File: 1
    Downloading File: 2
    Processing File: 2
    ...
    

    您需要解决单个 concatMap 方法中的所有承诺,像这样

    Rx.Observable.fromPromise(getJSONOfAjaxRequests())
      .flatMap(function(x) { return x;})
      .concatMap(function(item) {
        return downloadFile(item)
          .then(processFile)
          .then(convertFile);
      })
      .subscribe(function(data) {
        console.log(data);
      });
    

    在这里查看工作 plunkr:https://plnkr.co/edit/iugdlC2PpW3NeNF2yLzS?p=preview 这样,新的 ajax 调用只会在前一个调用完成时发送。

    另一种方法是允许文件并行发送请求,但“下载、处理、转换、上传”操作将按顺序进行。为此,您可以通过

    Rx.Observable.fromPromise(getJSONOfAjaxRequests())
      .flatMap(function(x) { return x;})
      .merge(2)  // in case maximum concurrency required is 2
      .concatMap(function(item) {
        return downloadFile(item);
      })
      .concatMap(function(item) {
        return processFile(item);
      })
      .concatMap(function(item) {
        return convertFile(item)
      })
      .subscribe(function(data) {
        //console.log(data);
      });
    

    在此处查看 plunkr:https://plnkr.co/edit/mkDj6Q7lt72jZKQk8r0p?p=preview

    【讨论】:

    • 谢谢,第一个是我已经使用的,虽然第二个是理想的,如果我可以限制并行操作的数量(即一次最多 2 个)。
    • 顺便说一句,第一个解决方案似乎只适用于 rxjs5。使用版本 4 的行为并行运行它们。
    • 奇怪!理论上,rxjs4 和 rxjs5 的行为应该保持不变。
    【解决方案4】:

    旧帖子,但我相信这可以工作,对于控制台日志,我们可以使用 tap。注释编辑器会通过智能感知错误,因为 from 需要一个数组,但代码应该可以工作。

     from(start()).pipe(
        switchMap(files => from(files).pipe(
           switchMap(file => from(downloadFile(file)).pipe(
             map(_ => ({file: file, downloaded: true}))
           )),
           switchMap(attr => from(processFile(attr.file)).pipe(
             map(_ => ({...attr, downloaded: true}))
           )),
           switchMap(attr => from(convertFile(attr.file)).pipe(
             map(_ => ({...attr, converted: true}))
           )),
           switchMap(attr => from(uploadFile(attr.file)).pipe(
             map(_ => ({...attr, uploaded: true}))
           ))
        ))
     ).subscribe(_ => {})
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-12-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-05-26
      • 2015-11-21
      • 2017-01-12
      相关资源
      最近更新 更多