【问题标题】:How can we buffer items in each milliseconds and stream each item at constant time interval我们如何在每毫秒内缓冲项目并以恒定的时间间隔流式传输每个项目
【发布时间】:2019-01-11 14:08:06
【问题描述】:

publishSubject 的onNext 方法连续调用(不均匀的时间,大约1 毫秒) 并且要求每 1 秒发射一次这些项目,并且数据不应该丢失意味着应该发射每个项目。

    publishSubject.onNext("Data1");
    publishSubject.onNext("Data2");
    publishSubject.onNext("Data3");
    publishSubject.onNext("Data4");
    publishSubject.onNext("Data5");
    publishSubject.onNext("Data6");
    publishSubject.onNext("Data7");

等等…… 参考代码结构:

var publishSubject = PublishSubject.create<String>()
publishSubject.onNext(stateObject) // Executing at every milliseconds...


publishSubject
        /* Business Logic Required Here ?? */
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            // Should execute at every 1 second
        }

请帮忙,在此先感谢,

【问题讨论】:

  • 如何将项目存储在 Deque 中并使用启动挂起函数的协程每秒获取 deque 的第一个元素?
  • 您的要求令人困惑。如果数据每毫秒到达一次,并且每秒只能发出一次,那么一秒钟后,您将落后 999 个排放量,并且永远追不上。你想用它们做什么?
  • @BobDalgleish,是的,现在每秒将有 999 个排放量一个接一个地排放
  • 1天后,您将落后约86,313,600件; 12天后,你将落后超过100万件。你没发现问题吗?

标签: android kotlin rx-java rx-java2 rx-kotlin2


【解决方案1】:

Observable 类的这个函数扩展正是你所需要的:

fun <T> Observable<T>.delayBetweenItems(timeout: Long, unit: TimeUnit): Observable<T> =
    Observable.zip(this, Observable.interval(timeout, unit), BiFunction<T, Long, T> { item, _ -> item })

您可以在项目的某个实用程序类中声明它,然后像其他 RxJava 运算符一样应用它:

publishSubject
    .delayBetweenItems(1000, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        // Should execute at every 1 second
    }

【讨论】:

  • 它正在工作..但有时(经过很长时间)它会自动处理。并且没有项目正在发射。
  • @RajatSharma 我认为原因在于您如何在 Android 中使用它,而不是在函数本身中。尝试检查您的代码以了解订阅和处置逻辑。
  • 这可能是因为我们没有在其中处理后按。
【解决方案2】:

如何将项目存储在 Deque 中。然后,使用协程启动一个挂起函数来每秒获取一次双端队列的第一个元素?

这里有一些又快又脏的代码,只是为了确保它能正常工作。 您可以在kotlin website 上在线运行此代码。 请记住,我是 Kotlin 的新手。

val deque: Deque<String> = ArrayDeque()
var refMillisAdd: Long = 0
var refMillisTake: Long = 0

fun main() {

    println(" Delay(ms) -> Action")
    println("---------------------")

    kotlinx.coroutines.runBlocking {

        launch {

            refMillisAdd = currentTimeMillis()
            refMillisTake = currentTimeMillis()

            for(i in 0..20){
               oncePer10ms(i.toString())
               refMillisAdd = currentTimeMillis()
            }

            for(i in 0..6){
                oncePerSecond()
                refMillisTake = currentTimeMillis()
            }
        }
    }
}

suspend fun oncePerSecond(){
    kotlinx.coroutines.delay(1_000L)
    println("  ${currentTimeMillis() - refMillisTake} -> TAKE ${deque.pop()}")
}

suspend fun oncePer10ms(item: String){
    kotlinx.coroutines.delay(10L)
    deque.add(item)
    println("  ${currentTimeMillis() - refMillisAdd} -> ADD $item")
}

上面的代码打印出来:

 Delay(ms) -> Action
---------------------
  17 -> ADD 0
  11 -> ADD 1
  10 -> ADD 2
  10 -> ADD 3
  10 -> ADD 4
  10 -> ADD 5
  10 -> ADD 6
  10 -> ADD 7
  10 -> ADD 8
  11 -> ADD 9
  10 -> ADD 10
  10 -> ADD 11
  10 -> ADD 12
  11 -> ADD 13
  10 -> ADD 14
  10 -> ADD 15
  11 -> ADD 16
  10 -> ADD 17
  10 -> ADD 18
  10 -> ADD 19
  11 -> ADD 20
  1223 -> TAKE 0
  1000 -> TAKE 1
  1000 -> TAKE 2
  1001 -> TAKE 3
  1000 -> TAKE 4
  1000 -> TAKE 5
  1000 -> TAKE 6

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多