【发布时间】:2018-05-03 08:01:05
【问题描述】:
我们已经用 Javascript 实现了一个队列系统。
消费者记录项目并在内部将其插入队列(这是存储在会话存储中的数组)。
时间间隔用于出列一些项目并将这些项目发送到后端。
我们如何使用 Rxjs 和流来实现这一点?
【问题讨论】:
我们已经用 Javascript 实现了一个队列系统。
消费者记录项目并在内部将其插入队列(这是存储在会话存储中的数组)。
时间间隔用于出列一些项目并将这些项目发送到后端。
我们如何使用 Rxjs 和流来实现这一点?
【问题讨论】:
const queue = new Rx.Subject();
const queueProcessing = queue
.mergeMap(i => Rx.Observable.of(i)
.do(val => console.log('processing item: ' + val))
.delay(2000) /* stub processing time*/
, 2) /* concurrency */
.subscribe();
queue.next('asdf');
queue.next('as');
queue.next('zxvc');
queue.next('`1`');
queue.next('zx');
queue.next('234');
queue.next('5');
queue.next('645');
queue.next('asdf');
queue.next('3');
queue.next('2');
queue.next('34');
queue.next('asdf');
queue.next('5');
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.1/Rx.js"></script>
.mergeMap 包含一个演示“处理”函数,它只是在延迟后发出值。每次您想将项目添加到队列中时,您 .next() 将其添加到主题中。
【讨论】:
n
你要找的接线员是bufferTime
import { Subject } from 'rxjs/Subject';
import { bufferTime } from 'rxjs/operators';
const queue$ = new Subject();
const interval = 2000; // 2 seconds
queue$.pipe(bufferTime(interval))
.subscribe(queueArray => {
// Send to backend
console.log(queueArray);
});
queue$.next('hello');
queue$.next('world');
// After 2 seconds logs: ['hello', 'world']
【讨论】: