【问题标题】:Non-blocking alternative to Object.wait()?Object.wait() 的非阻塞替代方案?
【发布时间】:2018-07-20 21:54:08
【问题描述】:

我有一个线程从本地服务器接收数据包:

// Shared object:
@Volatile lateinit var recentPacket: Packet
val recvMutex = Object()

// Thread code:
thread(isDaemon = true) {
    while (true) {
        val packet = readPacket()
        synchronized(recvMutex) {
            recentPacket = packet
            recvMutex.notifyAll()
        }
    }
}

我还有 多个 其他线程在等待数据包,每个线程都应该得到刚刚收到的相同数据包:

suspend fun receive() {
    return synchronized(recvMutex) {
        recvMutex.wait() // non-blocking alternative to this?
        recentPacket
    }
}

它可以工作,但是Object.wait() 阻塞了线程。有没有办法避免这种情况?

【问题讨论】:

  • wait() 的全部意义在于阻塞。这将有助于描述您尝试使用 wait() 调用做什么。据我了解,您希望一个生产者线程通知 n 个消费者线程收到相同的数据包?还是您希望 n 个线程拉出一个队列,队列中的每个任务最终仅由单个线程执行?
  • @Charlie:我想要 1 个生产者线程通知同一个数据包的 n 个消费者线程。我想要一些方法来暂停协程(不阻塞),直到另一个数据包到达(对不起,我不清楚)。
  • 我只能从 Java 世界来描述它,但是您使用 Object.wait 不正确。 Javadoc(可能是 KDoc)描述了您应该如何使用它 - 您需要检查一个条件(例如,最近的数据包不为空,并且与您已经看到的不同)并且仅当该条件不满足,您应该等待(在while 循环中),然后在wait 之后再次检查现在是否满足条件。
  • @StephenC receive()的调用者是协程。
  • 其他线程在收到物品之前究竟会做什么?他们是否运行事件循环?

标签: multithreading kotlin coroutine


【解决方案1】:

协程似乎处于实验状态;我建议等到那些成熟后再使用它们。见https://kotlinlang.org/docs/reference/coroutines.html#experimental-status-of-coroutines

同时,你可以试试ThreadPool

import java.net.DatagramPacket
import java.net.DatagramSocket
import java.util.concurrent.Executors
import kotlin.concurrent.thread

fun start() {
    val pool = Executors.newFixedThreadPool(10)
    thread(isDaemon = true) {
        val socket = DatagramSocket(12345)
        while (!socket.isClosed) {
            val packet = DatagramPacket(ByteArray(1000), 0)
            socket.receive(packet)
            pool.submit({
                receive(packet)
            })
        }
    }
    pool.shutdown()
}

fun receive(packet: DatagramPacket) {
    println(String(packet.data, 0, packet.length))
}

异步 ​​IO 可能有用;你可以看看Java Selectors

【讨论】:

  • 协程已经相当成熟并且可以投入生产,我可以从我的个人经验中证明这一点。他们仍处于试验阶段的唯一原因是团队尚未确定 API 设计的每个细节。见这里:stackoverflow.com/a/46240340/1103872
【解决方案2】:

它可以工作,但是Object.wait() 阻塞了线程。有没有办法避免这种情况?

是的,但这意味着删除您现在使用的完整 wait-notify 习语并用 Kotlin 的原生 BroadcastChannel 替换它。我还建议不要依赖默认的 CommonPool 协程调度程序。这对于原型代码非常方便,但在生产环境中,您应该使用受您控制的线程池。

这是一个基本示例,有两个接收器和五个正在广播的数据包:

import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.channels.BroadcastChannel
import kotlinx.coroutines.experimental.channels.SubscriptionReceiveChannel
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

private val threadPool = Executors.newCachedThreadPool() as ExecutorService
val MyPool = threadPool.asCoroutineDispatcher()

fun main(args: Array<String>) {
    val packetChannel = BroadcastChannel<Packet>(1)
    (1..2).forEach {
        launch(MyPool) {
            receivePackets(it, packetChannel.openSubscription())
        }
    }
    runBlocking {
        (1..5).forEach {
            packetChannel.send(Packet(it))
            delay(100)
        }
    }
    threadPool.shutdown()
}

suspend fun receivePackets(index: Int, packetChannel: SubscriptionReceiveChannel<Packet>) {
    while (true) {
        println("Receiver $index got packet ${packetChannel.receive().value}")
    }
}

data class Packet(
        val value: Int
)

期望看到这样的输出:

Receiver 1 got packet 1
Receiver 2 got packet 1
Receiver 2 got packet 2
Receiver 1 got packet 2
Receiver 1 got packet 3
Receiver 2 got packet 3
Receiver 1 got packet 4
Receiver 2 got packet 4
Receiver 1 got packet 5
Receiver 2 got packet 5

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-12-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多