【问题标题】:RxJava Observable to smooth out bursts of eventsRxJava Observable 用于平滑突发事件
【发布时间】:2016-04-10 06:32:42
【问题描述】:

我正在编写一个流式 Twitter 客户端,它只是将流发送到电视上。我正在使用 RxJava 观察流。

当流突然出现时,我想对其进行缓冲并减慢速度,以便每条推文至少显示 6 秒。然后在安静的时间里,任何已经建立的缓冲区都会通过拉队列头逐渐清空自己,每 6 秒发一条推文。如果有一条新推文进入并面临一个空队列(但在显示最后一条之后 > 6 秒),我希望它立即显示。

我想象流看起来像 here 描述的那样:

Raw:      --oooo--------------ooooo-----oo----------------ooo|
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|

而且我知道那里提出的问题有解决方案。但我就是无法理解它的答案。这是我的解决方案:

myObservable
    .concatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long l) {
            return Observable.concat(
                Observable.just(l),
                Observable.<Long>empty().delay(6, TimeUnit.SECONDS)
            );
        }
    })
    .subscribe(...);

所以,我的问题是:这是不是太天真了?缓冲/背压发生在哪里?有没有更好的解决方案?

【问题讨论】:

    标签: twitter rx-java


    【解决方案1】:

    如果消息相对于上一条消息来得太早,您似乎想延迟消息。您必须跟踪最后一个目标排放时间并在此之后安排新的排放:

    public class SpanOutV2 {
        public static void main(String[] args) {
            Observable<Integer> source = Observable.just(0, 5, 13)
                    .concatMapEager(v -> Observable.just(v).delay(v, TimeUnit.SECONDS));
    
            long minSpan = 6;
            TimeUnit unit = TimeUnit.SECONDS;
            Scheduler scheduler = Schedulers.computation();
    
            long minSpanMillis = TimeUnit.MILLISECONDS.convert(minSpan, unit);
    
            Observable.defer(() -> {
                AtomicLong lastEmission = new AtomicLong();
    
                return source
                .concatMapEager(v -> {
                    long now = scheduler.now();
                    long emission = lastEmission.get();
    
                    if (emission + minSpanMillis > now) {
                        lastEmission.set(emission + minSpanMillis);
                        return Observable.just(v).delay(emission + minSpanMillis - now, TimeUnit.MILLISECONDS);
                    }
                    lastEmission.set(now);
                    return Observable.just(v);
                });
            })
            .timeInterval()
            .toBlocking()
            .subscribe(System.out::println);
        }
    }
    

    在这里,源延迟了相对于问题开始的秒数。 0 应该立即到达,5 应该到达 @ T = 6 秒,13 应该到达 @ T = 13。concatMapEager 确保保持顺序和时间。由于只使用标准运算符,因此自然而然地组成了背压和退订。

    【讨论】:

    • 是的!看起来这肯定会解决我的问题。出于好奇,在消息之前延迟(就像你所做的那样)比在消息之后延迟(就像我所做的那样)有优势吗?我认为这两种解决方案都会奏效。也许有一个我不明白的细微 Rx 差异。
    • 更少的操作员,更少的开销。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多