【问题标题】:Rx Buffer with timeout since first new group element自第一个新组元素以来超时的 Rx 缓冲区
【发布时间】:2015-11-18 09:15:36
【问题描述】:

对 Rx 世界很陌生,我需要实现以下行为:

我需要 observable 来收集值并将它们作为列表发出,一旦我有至少 N 个项目,或者自从发出 组的第一个项目以来经过了 T 时间。

我一遍又一遍地阅读文档,很确定它会使用

buffer(timespan, unit, count[, scheduler])

但问题是这里的时间跨度取决于最后组项目。

如果可能的话,我还需要能够flush(强制发射)当前缓冲区,一些项目需要立即处理。我是否正确地假设对于这种情况我需要第二个 observable,在每个项目之前执行处理并合并两者?

有什么想法吗?

Ps:我是用Java工作的,但我不需要Java代码,一个解释就够了。

谢谢!

【问题讨论】:

    标签: system.reactive rx-java


    【解决方案1】:

    这个问题的缓冲方面可以通过多播技巧来实现,但我发现为它编写一个运算符要容易得多,这样数据和上下文就在同一个可访问的地方:

    public final class OperatorBufferFirst<T> implements Operator<List<T>, T> {
        final Scheduler scheduler;
        final long timeout;
        final TimeUnit unit;
        final int maxSize;
        public OperatorBufferFirst(
                long timeout, TimeUnit unit, 
                Scheduler scheduler, int maxSize) {
            this.timeout = timeout;
            this.unit = unit;
            this.scheduler = scheduler;
            this.maxSize = maxSize;
        }
    
        @Override
        public Subscriber<? super T> call(
                Subscriber<? super List<T>> t) {
            BufferSubscriber<T> parent = new BufferSubscriber<>(
                    new SerializedSubscriber<>(t), 
                    timeout, unit, 
                    scheduler.createWorker(), maxSize);
            t.add(parent);
            return parent;
        }
    
        static final class BufferSubscriber<T> 
        extends Subscriber<T> {
            final Subscriber<? super List<T>> actual;
            final Scheduler.Worker w;
            final long timeout;
            final TimeUnit unit;
            final int maxSize;
            final SerialSubscription timer;
    
            List<T> buffer;
            long index;
    
    
            public BufferSubscriber(
                    Subscriber<? super List<T>> actual, 
                    long timeout, 
                    TimeUnit unit, 
                    Scheduler.Worker w, 
                    int maxSize) {
                this.actual = actual;
                this.timeout = timeout;
                this.unit = unit;
                this.w = w;
                this.maxSize = maxSize;
                this.timer = new SerialSubscription();
                this.buffer = new ArrayList<>();
                this.add(timer);
                this.add(w);
            }
    
            @Override
            public void onNext(T t) {
                List<T> b;
                boolean startTimer = false;
                boolean emit = false;
                long idx;
                synchronized (this) {
                    b = buffer;
                    b.add(t);
                    idx = index;
                    int n = b.size();
                    if (n == 1) {
                        startTimer = true;
                    } else
                    if (n < maxSize) {
                        return;
                    } else {
                        buffer = new ArrayList<>();
                        index = ++idx;
                        emit = true;
                    }
                }
    
                if (startTimer) {
                    final long fidx = idx;
                    timer.set(w.schedule(() -> timeout(fidx), timeout, unit));
                }
                if (emit) {
                    timer.set(Subscriptions.unsubscribed());
                    actual.onNext(b);
                }
            }
    
            @Override
            public void onError(Throwable e) {
                actual.onError(e);
            }
    
            @Override
            public void onCompleted() {
                timer.unsubscribe();
                List<T> b;
                synchronized (this) {
                    b = buffer;
                    buffer = null;
                    index++;
                }
                if (!b.isEmpty()) {
                    actual.onNext(b);
                }
                actual.onCompleted();
            }
    
            public void timeout(long idx) {
                List<T> b;
                synchronized (this) {
                    b = buffer;
                    if (idx != index) {
                        return;
                    }
                    buffer = new ArrayList<>();
                    index = idx + 1;
                }
    
                actual.onNext(b);
            }
        }
    
        public static void main(String[] args) {
            TestScheduler s = Schedulers.test();
    
            PublishSubject<Integer> source = PublishSubject.create();
    
            source.lift(new OperatorBufferFirst<>(1, TimeUnit.SECONDS, s, 3))
            .subscribe(System.out::println, Throwable::printStackTrace, 
                    () -> System.out.println("Done"));
    
            source.onNext(1);
            source.onNext(2);
            source.onNext(3);
    
            source.onNext(4);
            s.advanceTimeBy(1, TimeUnit.SECONDS);
    
            source.onNext(5);
            source.onNext(6);
    
            s.advanceTimeBy(1, TimeUnit.SECONDS);
            s.advanceTimeBy(1, TimeUnit.SECONDS);
    
            source.onNext(7);
            source.onCompleted();
        }
    }
    

    它将值累积到一个列表中,并为第一个元素启动定时任务,如果缓冲区已满,则发出缓冲区。

    至于flush,一般不能这么简单,你必须和operator建立协议,如果传入的T值是某种特殊类型,就说flush。例如,您在某处有一个 T 类型的 FLUSH 常量,每当操作员遇到它时,它应该发出当前缓冲区:

    synchronized (this) {
        b = buffer;
        idx = index;
        if (t != FLUSH) {
            b.add(t);
            int n = b.size();
            if (n == 1) {
                startTimer = true;
            } else
            if (n < maxSize) {
                return;
            } else {
                buffer = new ArrayList<>();
                index = ++idx;
                emit = true;
            }
        } else {
            buffer = new ArrayList<>();
            index = ++idx;
            emit = true;
        }
    }
    

    【讨论】:

    • 哇!感谢您的出色回复,本来以为我们会有更简单的方法来做到这一点,也许在将来,与此同时,我会对此进行测试,如果可行,标记为已批准!
    • 有更多时间查看代码,正是我想要的,再次感谢。只是一个问题,有什么理由使用 SerialSubscription vs Observable.timer() ?
    • 取消订阅可能会与计时器的调度竞争,而且我们需要一种方法来取消之前的计时器。 SerialSubscription 实现了两者:如果下游取消订阅,则任何包含的 Subscription 都将被取消订阅。如果在其上设置了新计时器的订阅,则旧计时器将被取消订阅。
    • 我明白了,我认为使用 timer() 可以做到这一点,感谢您的洞察力!
    • 只是关于强制刷新的注释,从上面的代码中,强制刷新的项目不会包含在发出的缓冲区中,行 b.add(t) 应该保持在 b = buffer 以下没有?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-04-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多