【问题标题】:Is this implementation of takeWhileInclusive safe?这个 takeWhileInclusive 的实现安全吗?
【发布时间】:2018-10-26 16:15:53
【问题描述】:

我发现了以下包容性takeWhile 的实现(找到here

fun <T> Sequence<T>.takeWhileInclusive(pred: (T) -> Boolean): Sequence<T> {
    var shouldContinue = true
    return takeWhile {
        val result = shouldContinue
        shouldContinue = pred(it)
        result
    }
}

问题是我不是 100% 相信如果在并行序列上使用它是安全的。

我担心的是我们会依赖shouldContinue 变量来知道何时停止,但我们并没有同步它的访问。

有什么见解吗?

【问题讨论】:

  • 我同意这不安全,takeWhile 应该得到一个无状态函数。如果将其用于并行计算,顺便说一句,同步将是您最少的问题。 takeWhile 在这种情况下甚至都没有定义。
  • 我不认为序列完全适用于并行用例?
  • @LouisWasserman 它们不适合并行处理,但它们的合同并未将它们限制为严格的顺序处理。具体来说,takeWhile 的合约声明“操作是中间的并且无状态。”
  • @MarkoTopolnik 我的理解是“操作是中间和无状态的”这句话。来自takeWhile 文档的指的是整个操作,而不是专门指谓词。特别是kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/… 将无状态定义为:不需要状态并独立处理每个元素的操作,或者 需要少量恒定的状态来处理元素,例如 kotlin.sequences.Sequence .take 或 kotlin.sequences.Sequence.drop;
  • 是的,你是对的。不幸的是,这是文档中最接近的。该语句所指的状态是实现的内部状态。合约没有明确声明用户的函数必须是无状态的。在 java.util.stream 包 Javadoc 的“无状态行为”部分对此进行了很好的讨论。但是,在您的情况下,问题不是状态本身,而是您的函数假定顺序遇到顺序

标签: kotlin sequence coroutine kotlinx.coroutines


【解决方案1】:

这是我目前为止的想法。

问题说明

问题不清楚。没有平行序列这种东西,我可能把它们和Java's parallel streams 搞混了。我的意思是同时使用的序列。

序列是同步的

正如@LouisWasserman 在 cmets 中指出的那样,序列不是为并行执行而设计的。特别是SequenceBuilder@RestrictSuspension 注释。引用自Kotlin Coroutine repo:

这意味着在其范围内没有任何 lambda 的 SequenceBuilder 扩展可以调用suspendContinuation 或其他通用挂起函数

话虽如此,正如@MarkoTopolnik 评论的那样,它们仍然可以像任何其他对象一样在并行程序中使用。

并行使用的序列

作为示例,这里是第一次尝试并行使用序列

fun launchProcessor(id: Int, iterator: Iterator<Int>) = launch {
    println("[${Thread.currentThread().name}] Processor #$id received ${iterator.next()}")
}

fun main(args: Array<String>) {
    val s = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    runBlocking {
        val iterator = s.iterator()
        repeat(10) { launchProcessor(it, iterator) }
    }
}

此代码打印:

[ForkJoinPool.commonPool-worker-2] 处理器 #1 收到 1

[ForkJoinPool.commonPool-worker-1] 处理器 #0 收到 0

[ForkJoinPool.commonPool-worker-3] 处理器 #2 收到 2

[ForkJoinPool.commonPool-worker-2] 处理器 #3 收到 3

[ForkJoinPool.commonPool-worker-1] 处理器 #4 收到 3

[ForkJoinPool.commonPool-worker-3] 处理器 #5 收到 3

[ForkJoinPool.commonPool-worker-1] 处理器 #7 收到 5

[ForkJoinPool.commonPool-worker-2] 处理器 #6 收到 4

[ForkJoinPool.commonPool-worker-1] 处理器 #9 收到 7

[ForkJoinPool.commonPool-worker-3] 处理器 #8 收到 6

这当然不是我们想要的。因为有些数字被消耗了两次。

输入频道

另一方面,如果我们要使用通道,我们可以这样写:

fun produceNumbers() = produce {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("[${Thread.currentThread().name}] Processor #$id received $it")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

那么输出是:

[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 1

[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 2

[ForkJoinPool.commonPool-worker-1] 处理器 #1 收到 3

[ForkJoinPool.commonPool-worker-2] 处理器 #2 收到 4

[ForkJoinPool.commonPool-worker-1] 处理器 #3 收到 5

[ForkJoinPool.commonPool-worker-2] 处理器 #4 收到 6

[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 7

[ForkJoinPool.commonPool-worker-1] 处理器 #1 收到 8

[ForkJoinPool.commonPool-worker-1] 处理器 #2 收到 9

[ForkJoinPool.commonPool-worker-2] 处理器 #3 收到 10

此外,我们可以为这样的频道实现takeWhileInclusive 方法:

fun <E> ReceiveChannel<E>.takeWhileInclusive(
        context: CoroutineContext = Unconfined,
        predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(context) {
    var shouldContinue = true
    consumeEach {
        val currentShouldContinue = shouldContinue
        shouldContinue = predicate(it)
        if (!currentShouldContinue) return@produce
        send(it)
    }
}

它按预期工作。

【讨论】:

    猜你喜欢
    • 2013-04-12
    • 1970-01-01
    • 2021-05-12
    • 1970-01-01
    • 2010-09-19
    • 2011-09-28
    • 1970-01-01
    • 2020-07-29
    • 2016-06-12
    相关资源
    最近更新 更多