【问题标题】:Kotlin Synchronize Thread with CoroutineKotlin 用协程同步线程
【发布时间】:2022-01-13 10:52:51
【问题描述】:

目前我有一个不断循环的主线程:

var suspension = Suspension()

fun loop() {
    // Doing stuff here...
        
    suspension.callTick()
        
    // Doing more stuff here...
}

它调用 callTick() 方法将数据发送到通道:

class Suspension {
    private val ticksChannel = Channel<Unit>(0)

    fun callTick() {
        ticksChannel.trySend(Unit)
    }

    suspend fun waitTick() {
        ticksChannel.receive()
    }
}

现在我的最后一堂课使用了这个:

class Task(private val suspension: Suspension) {
    suspend fun runTask() {
        while (true) {
            suspension.waitTick()

            someMethodOnTheMainThread()
        }
    }
}

现在我想知道如何从主线程调用 someMethodOnTheMainThread() 方法。该函数必须在 loop() 中的 'suspension.callTick()' 方法之后立即调用。目前我正在从协程线程运行该函数。这会导致很多错误和空指针异常,因为它没有与主线程同步。

基本上我想知道如何阻塞/锁定主线程,直到调用suspend.waitTick()方法并运行它之后的代码。这太复杂了吗?还有其他方法可以使挂起函数与同步代码一起使用吗?

【问题讨论】:

  • 在挂起函数中,任何时候你使用一个只能从主线程访问的属性或函数,将它包装在withContext(Dispatchers.Main) { }中。
  • 这似乎只适用于Android:Module with the Main dispatcher is missing. Add dependency providing the Main dispatcher, e.g. 'kotlinx-coroutines-android' and ensure it has the same version as 'kotlinx-coroutines-core'
  • 你在哪个平台上?有针对不同平台的协程库,例如 Swing 和 JavaFX,它们为它们提供 Dispatchers.Main。如果您不使用其中之一并且正在设计自己的主线程,则可以创建一个 Dispatchers.Main 来利用您自己的主线程队列。
  • 我在 Windows 上。通过一些快速测试,似乎 Dispatchers.Unconfined 也可以正常工作。从我在原始帖子中给出的示例中,您是否会偶然知道这是否属实?对我来说它似乎很奇怪,因为它说即使在延迟之后线程也是主线程,从文档中它不应该这样做。如果它不起作用,我会尝试创建自己的 Dispatchers.Main。
  • 我的意思是你的目标平台,例如摇摆,JavaFX,iOS,CLI。如果它是 CLI 并且主线程是您自己创建的,我认为您需要为它创建自己的调度程序。如果您希望它在特定线程上可靠地运行代码,Unconfined 将非常脆弱。

标签: kotlin concurrency coroutine synchronize


【解决方案1】:

好的,所以我尝试为客户端线程实现我自己的调度程序,如下所示:

/**
 * Dispatches execution onto Client event dispatching thread and provides native [delay] support.
 */
@Suppress("unused")
val Dispatchers.Client : ClientDispatcher
    get() = Client

/**
 * Dispatcher for Client event dispatching thread.
 *
 * This class provides type-safety and a point for future extensions.
 */
@OptIn(InternalCoroutinesApi::class)
sealed class ClientDispatcher : MainCoroutineDispatcher(), Delay {
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = ClientUtility.invokeLater(block)

    @OptIn(ExperimentalCoroutinesApi::class)
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timer = schedule(timeMillis, TimeUnit.MILLISECONDS, ActionListener {
            with(continuation) { resumeUndispatched(Unit) }
        })
        continuation.invokeOnCancellation { timer.stop() }
    }

    /** @suppress */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val timer = schedule(timeMillis, TimeUnit.MILLISECONDS, ActionListener {
            block.run()
        })
        return object : DisposableHandle {
            override fun dispose() {
                timer.stop()
            }
        }
    }

    private fun schedule(time: Long, unit: TimeUnit, action: ActionListener): Timer =
        Timer(unit.toMillis(time).coerceAtMost(Int.MAX_VALUE.toLong()).toInt(), action).apply {
            isRepeats = false
            start()
        }
}

@OptIn(InternalCoroutinesApi::class)
internal class ClientDispatcherFactory : MainDispatcherFactory {
    override val loadPriority: Int
        get() = 0

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher = Client
}

private object ImmediateClientDispatcher : ClientDispatcher() {
    override val immediate: MainCoroutineDispatcher
        get() = this

    override fun isDispatchNeeded(context: CoroutineContext): Boolean = !ClientUtility.isEventDispatchThread()

    @OptIn(InternalCoroutinesApi::class)
    override fun toString() = toStringInternalImpl() ?: "Client.immediate"
}

/**
 * Dispatches execution onto Client event dispatching thread and provides native [delay] support.
 */
internal object Client : ClientDispatcher() {
    override val immediate: MainCoroutineDispatcher
        get() = ImmediateClientDispatcher

    @OptIn(InternalCoroutinesApi::class)
    override fun toString() = toStringInternalImpl() ?: "Client"
}

实际上是Swing dispatcher。主要是我将 invokeLater 方法更改为我自己的实现。另一个值得注意的修改是我使用自己的 isEventDispatchThread 实现。

我仍在使用 Swing Timer 类。我不确定这是否可以...但它似乎可以通过一些测试来工作。

对于invokeLater和isEventDispatchThread的实现:

    val eventQueue = ConcurrentLinkedQueue<Runnable>()

    fun invokeLater(block: Runnable) {
        eventQueue.add(block)
    }

    fun isEventDispatchThread() = if (dispatchThread == null) {
            false
        } else {
            Thread.currentThread() == dispatchThread
        }

    var dispatchThread: Thread? = null

然后在循环方法中:

    fun loop() {
        // Doing stuff here

        suspension.callTick()

        ClientUtility.dispatchThread = Thread.currentThread()
        val iterator = ClientUtility.eventQueue.iterator()
        while (iterator.hasNext()) {
            val block = iterator.next()
            block.run()
            iterator.remove()
        }

        // Doing some more stuff here
    }

我现在启动协程:Dispatchers.Main。哪个调度程序是我在resources/META-INF/services.kotlinx.coroutines.internal.MainDispatcherFactory 文件中定义的主要调度程序。

【讨论】:

    【解决方案2】:

    在主线程调用someMethodOnTheMainThread()方法 并暂停当前协程,你可以像下面这样定义它:

    suspend fun someMethodOnTheMainThread() = withContext(Dispatchers.Main) {
        // ... your code here
    }
    

    withContext - 更改协程的上下文并在主线程中运行代码。

    如果我们使用 kotlinx-coroutines-android 工件,

    Dispatchers.Main 可以在 Android 上使用。同样,在 JavaFX 上如果我们使用 kotlinx-coroutines-javafx,在 Swing 上如果我们使用 kotlinx-coroutines-swing。可能还有其他一些设置它的库。如果你不使用任何一个,这个dispatcher没有配置,不能使用,所以你需要自己创建dispatcher。

    【讨论】:

      【解决方案3】:

      如果您没有使用控制主线程的框架(并且会提供主线程调度程序),您可以执行以下操作:

      var mainScope: CoroutineScope? = null;
      
      fun main() {
          runBlocking {
              mainScope = this
              while(true) {
                  //...
                  suspension.callTick()
                  // allow other coroutines to run
                  yield()
                  //...
              }
          }
      }
      

      如果你想从其他地方在主线程上运行某些东西,你可以使用mainScope?.launch

      【讨论】:

      • 我目前的实现似乎还可以正常工作(下面的帖子)。我应该在我原来的主题中指出,主循环使用 Java 并且确实来自一个额外的框架。
      最近更新 更多