我认为可以使用内置运算符来解决。下面的代码演示了其中一种方法,尽管当用于热源或非轻量级冷源时会变得很棘手 - 我鼓励您将其用于教育/获得想法,而不是生产用途
@Test
fun slidingWindow() {
val events = Flowable.just(
Data(5, 1.0),
Data(10, 1.5),
Data(30, 2.8),
Data(40, 3.2),
Data(60, 3.8),
Data(90, 4.2))
.observeOn(Schedulers.io())
val windows = window(windowSize = 3, slideSize = 1, data = events).toList().blockingGet()
Assert.assertNotNull(windows)
Assert.assertFalse(windows.isEmpty())
}
fun window(windowSize: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> = window(
from = 0,
to = windowSize,
slideSize = slideSize,
data = data)
fun window(from: Int, to: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> {
val window = data.takeWhile { it.time <= to }.skipWhile { it.time < from }.map { it.data }
val tail = data.skipWhile { it.time <= from + slideSize }
val nonEmptyWindow = window.toList().filter { !it.isEmpty() }
val nextWindow = nonEmptyWindow.flatMapPublisher {
window(from + slideSize, to + slideSize, slideSize, tail).observeOn(Schedulers.io())
}
return nonEmptyWindow.toFlowable().concatWith(nextWindow)
}
data class Data(val data: Int, val time: Double)
上面的测试产生
[[5, 10, 30],
[10, 30, 40, 60],
[30, 40, 60, 90],
[40, 60, 90],
[90]]