【问题标题】:RxJS emit array items over time?RxJS 随着时间的推移发出数组项?
【发布时间】:2021-08-06 17:29:14
【问题描述】:

我正在尝试以 500 毫秒的间隔一个接一个地发出简单的数组值:

var a = Rx.Observable.from([1,2,3]);
a.interval(500).subscribe(function(b) { console.log(b); });

但是,这会引发异常:

Uncaught TypeError: a.interval is not a function.

【问题讨论】:

标签: rxjs


【解决方案1】:

使用 RxJS 版本 6 的三种方法:

1。使用concatMap

import { from, of, pipe } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

const array = [1, 2, 3, 4, 5];

from(array)
 .pipe(
   concatMap(val => of(val).pipe(delay(1000))),
 )
 .subscribe(console.log);

2。使用zipinterval

import { from, pipe, interval } from 'rxjs';
import { delay, zip} from 'rxjs/operators';

const array = [1, 2, 3, 4, 5];

from(array)
 .pipe(
   zip(interval(1000), (a, b) => a),
 )
 .subscribe(console.log);

3。使用interval 作为源

import { interval, pipe } from 'rxjs';
import { map, take } from 'rxjs/operators';

const array = [1, 2, 3, 4, 5];

interval(1000)
.pipe(
  take(array.length),
  map(i => array[i])
)
.subscribe(console.log);

【讨论】:

  • 当心! zip 运算符已被弃用,取而代之的是静态 zip!来源:rxjs-dev.firebaseapp.com/api/operators/zip 所以变成了:import { zip } from 'rxjs'; zip(interval(1000), from(array)).map((a, b) => a).subscribe(...)
【解决方案2】:

正如 xgrommx 已经指出的,interval 不是 observable 的实例成员,而是Rx.Observable 的静态成员。

Rx.Observable.fromArray([1,2,3]).zip(
  Rx.Observable.interval(500), function(a, b) { return a; })
.subscribe(
  function(x) { document.write(x + '<br \>'); },  
  null,  
  function() { document.write("complete"); });
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"&gt;&lt;/script&gt;

【讨论】:

    【解决方案3】:

    我会这样做:

    var fruits = ['apple', 'orange', 'banana', 'apple'];
    var observable = Rx.Observable.interval(1000).take(fruits.length).map(t => fruits[t]);
    observable.subscribe(t => {
      console.log(t);
      document.body.appendChild(document.createTextNode(t + ', '));
    });
    &lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"&gt;&lt;/script&gt;

    【讨论】:

    • 很有趣,但这不能满足希望以固定时间间隔输出来自事件流的项目的一般情况(根据示例,OP 似乎想要)
    【解决方案4】:
    var arrayList = [1,2,3,4,5];
    
    var source = Rx.Observable
          .interval(500/* ms */)
          .timeInterval()
          .take(arrayList.length);
    
    source.subscribe(function(idx){
        console.log(arrayList[idx]);
       //or document.write or whatever needed
    });
    

    【讨论】:

    • 很有趣,但这不能满足希望以固定时间间隔输出来自一个事件流的项目的一般情况
    【解决方案5】:

    很晚了,但更简单的解决方案是:

    const arr = ["Hi,", "how", "may", "I", "help", "you?"];
    
    Rx.Observable.interval(500)
                 .takeWhile(_ => _ < arr.length)
                 .map(_ => arr[_])
                 .subscribe(_ => console.log(_))    
    

    【讨论】:

      【解决方案6】:

      我发现 Weichhold 技术是最好的,但它可以通过在 zip 之外提取压缩值来获得清晰的意图:

      // assume some input stream of values:
      var inputs = Obs.of(1.2, 2.3, 3.4, 4.5, 5.6, 6.7, 7.8);
      // emit each value from stream at a given interval:
      var events = Obs.zip(inputs, Obs.interval(1000))
          .map(val => val[0])
          .forEach(console.log);
      

      【讨论】:

        【解决方案7】:

        如果你想随着时间的推移发布样本,你可以这样做

        const observable = interval(100).pipe(
          scan((acc, value) => [value, ...acc], []),
          sampleTime(10000),
          map((acc) => acc[0])
        );
        

        【讨论】:

          【解决方案8】:

          我有一点不同的要求,我的阵列也会随着时间的推移而不断更新。所以基本上我必须实现一个可以定期出列的队列,但我不想使用间隔。

          如果有人需要这样的东西,那么这个解决方案可能会有所帮助:

          我有一个函数createQueue(),它将数组作为输入并返回一个 Observable,我们订阅它以定期侦听来自数组的事件。 该函数还修改了 pass 数组的 'push()' 方法,以便每当有任何项目被推入数组时,Observable 都会发出。

          createQueue(queue: string[]) {
            return Observable.create((obs: Observer<void>) => {
              const arrayPush = queue.push;
              queue.push = (data: string) => {
                const returnVal = arrayPush.call(queue, data);
                obs.next();
                return returnVal;
              }
            }).pipe(switchMap(() => {
              return from([...queue])
                .pipe(
                  concatMap(val => of(val)
                  .pipe(delay(1000)))
                );
            }), tap(_ => queue.shift()))
          }
          

          假设数组是:taskQueue = [];

          所以,我们需要将它传递给上面的函数并订阅它。

            createQueue(taskQueue).subscribe((data) => {
              console.log('Data from queue => ', data);
            });
          

          现在,每次我们taskQueue.push('&lt;something here&gt;'),订阅都会在延迟“1000ms”后触发。

          请注意:在调用createQueue() 之后,我们不应将新数组分配给taskQueue,否则我们将丢失修改后的push()

          这是上述实现的一个虚拟示例:Test Example

          【讨论】:

            【解决方案9】:

            Rx.Observable 实例没有interval 方法http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_instance_methods/index.html。你可以这样使用。

            Rx.Observable.interval(500)
                         .map(function(v) { return [1,2,3];})
                         .subscribe(console.log.bind(console));
            

            【讨论】:

            • 该序列不会产生您期望的结果。
            • 你需要有return [1,2,3][v] 才能工作
            猜你喜欢
            • 1970-01-01
            • 2020-04-21
            • 1970-01-01
            • 2012-03-09
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多