【问题标题】:RxJava Sliding WindowRxJava 滑动窗口
【发布时间】:2017-08-21 08:56:07
【问题描述】:

我有一个可观察的数据,它发出数据,我想最初将它缓冲三秒钟,然后在初始缓冲之后必须滑动一秒钟。这更像是buffer(timespan,unit,skip),其中跳过位于时间跨度上。

示例:

ObservableData,TimeStamp : (5,1),(10,1.5),(30,2.8),(40,3.2),(60,3.8),(90,4.2)

ExpectedList : {5,10,30},{10,30,40,60},{30,40,60,90}

我可以通过创建自定义运算符来实现这一点。我只是想知道有什么方法可以在不依赖自定义运算符的情况下做到这一点。

【问题讨论】:

    标签: java kotlin rx-java observable reactivex


    【解决方案1】:

    我认为可以使用内置运算符来解决。下面的代码演示了其中一种方法,尽管当用于热源或非轻量级冷源时会变得很棘手 - 我鼓励您将其用于教育/获得想法,而不是生产用途

     @Test
    fun slidingWindow() {
        val events = Flowable.just(
                Data(5, 1.0),
                Data(10, 1.5),
                Data(30, 2.8),
                Data(40, 3.2),
                Data(60, 3.8),
                Data(90, 4.2))
                .observeOn(Schedulers.io())
        val windows = window(windowSize = 3, slideSize = 1, data = events).toList().blockingGet()
        Assert.assertNotNull(windows)
        Assert.assertFalse(windows.isEmpty())
    }
    
    fun window(windowSize: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> = window(
            from = 0,
            to = windowSize,
            slideSize = slideSize,
            data = data)
    
    fun window(from: Int, to: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> {
        val window = data.takeWhile { it.time <= to }.skipWhile { it.time < from }.map { it.data }
        val tail = data.skipWhile { it.time <= from + slideSize }
        val nonEmptyWindow = window.toList().filter { !it.isEmpty() }
        val nextWindow = nonEmptyWindow.flatMapPublisher {
            window(from + slideSize, to + slideSize, slideSize, tail).observeOn(Schedulers.io())
        }
        return nonEmptyWindow.toFlowable().concatWith(nextWindow)
    }
    
    data class Data(val data: Int, val time: Double)
    

    上面的测试产生
    [[5, 10, 30], [10, 30, 40, 60], [30, 40, 60, 90], [40, 60, 90], [90]]

    【讨论】:

    • 感谢它有助于我找到所需的解决方案
    【解决方案2】:

    这可以通过操作符timestamp()scan()来完成,缓存里面的元素扫描和检查它们。 这个想法的示例(它不缓冲初始 3 秒),使用 compose() 运算符。

    /**
     * Sliding overlapping window. If exactCount of elements are within timespan
     * then groups elements into List otherwise filtered out
     */
    fun <T> slidingWindow(
        timespan: Long,
        timespanUnit: TimeUnit,
        exactCount: Int
    ): (Observable<T>) -> Observable<List<T>> {
        return { from ->
            from.timestamp(timespanUnit)
                .scan(
                    ArrayDeque<Timed<T>>(exactCount + 1),
                    { queue: ArrayDeque<Timed<T>>, t: Timed<T>
                        ->
                        queue.addLast(t)
                        if (queue.size > exactCount) queue.removeFirst()
                        queue.removeAll { timed -> queue.last().time() - timed.time() > timespan }
                        queue
                    }
                )
                .filter { it.size == exactCount }
                .map { it.toTypedArray() }
                .map { it.map { timed -> timed.value() } }
        }
    }
    
        @Test
        fun slidingWindow() {
            val testSequence = "asXZYe".asIterable()
            val timespan = 5000L
    
    
            Observable.fromIterable(testSequence)
                .compose(slidingWindow(timespan, TimeUnit.MILLISECONDS, 3))
                .test()
                .assertValues(
                    arrayListOf('a', 's', 'X'),
                    arrayListOf('s', 'X', 'Z'),
                    arrayListOf('X', 'Z', 'Y'),
                    arrayListOf('Z', 'Y', 'e')
                )
        }
    

    我的github:github.com/OndrejMalek/RxSlidingWindow

    【讨论】:

      猜你喜欢
      • 2012-03-05
      • 2012-08-08
      • 2012-10-20
      • 2021-02-20
      • 2014-07-05
      • 2013-12-17
      • 2021-07-01
      • 2017-09-23
      • 2014-07-09
      相关资源
      最近更新 更多