【问题标题】:How can I create single-thread coroutine context under Common pool in Kotlin?如何在 Kotlin 的 Common pool 下创建单线程协程上下文?
【发布时间】:2018-10-25 13:47:50
【问题描述】:

短要求:能够创建协程上下文,该上下文将仅在单线程中执行(例如,没有并行性)。

附加要求:这些任务最好使用现有的 CommonPool(例如线程池)

实际上 kotlin 协程有 newSingleThreadContext 方法,它将创建单独的线程并将所有任务安排到其中。但是,这是专用线程,因此大约 1000 个这样的上下文将需要大量资源。

因此,我希望上下文具有以下特征:

  • 最多可以同时执行一项任务
  • 此上下文应重用任何其他上下文(例如父上下文)。例如,上下文不应包含额外的线程

【问题讨论】:

    标签: java multithreading kotlin threadpool


    【解决方案1】:

    kotlinx.coroutines 库的1.6.0 版本开始,我们可以在CoroutineDispatcher 对象上使用limitedParallelism 函数,它允许您在不创建额外线程池的情况下限制并行度,并提供统一的方法来为未绑定的调度程序创建并行性。

    使用示例:

    class UserRepository {
        private val dbDispatcher = Dispatchers.IO.limitedParallelism(1)
    
        suspend fun getUserById(userId: Int): User? = withContext(dbDispatcher) {
            executeQuery("SELECT * FROM users WHERE id = $1", userId).singleOrNull()
       }
    }
    

    limitedParallelism(1) 保证并行性限制 - 在这个调度器中最多可以同时执行 1 个协程。

    应该能解决问题:

    最多可以同时执行一项任务。

    【讨论】:

      【解决方案2】:

      这里有一个解决方案:

      当您说withSerialContext(Dispatchers.Default) {doWork()} 时,它会在默认调度程序线程上执行doWork(),但它的所有部分都会像在 runBlocking{} 中那样一次执行一个。请注意,即使一次只有一个线程,也不能保证整个操作都是同一个线程。

      suspend fun <T> withSerialContext(
              context: CoroutineDispatcher,
              block: suspend CoroutineScope.() -> T
      ): T = withContext(SerialContextDispatcher(context), block)
      
      private class SerialContextDispatcher(private val target: CoroutineDispatcher) : CoroutineDispatcher() {
      
          private val q = ConcurrentLinkedQueue<Runnable>()
          //Whoever CASes this false->true schedules execution of runproc
          private val pending = AtomicBoolean(false)
          //Only one of these runs at a time
          private val runproc = object: Runnable {
              override fun run() {
                  while(true) {
                      val proc = q.poll();
                      if (proc != null) {
                          try {
                              proc.run()
                          }
                          catch (e: Throwable) {
                              target.dispatch(EmptyCoroutineContext, this)
                              throw e
                          }
                      } else {
                          pending.set(false);
                          if (q.isEmpty() || !pending.compareAndSet(false, true)) {
                              return
                          }
                      }
                  }
              }
          }
      
          override fun dispatch(context: CoroutineContext, block: Runnable) {
              q.add(block)
              if (pending.compareAndSet(false, true)) {
                  target.dispatch(EmptyCoroutineContext, runproc)
              }
          }
      }
      

      【讨论】:

      • 这个实现会阻塞一个默认的调度线程,而有工作可以阻止其他协程运行。还有一个竞争条件,如果调度在val proc = q.poll();pending.set(false); 之间运行,那么该块将留在队列中。
      • 默认调度线程不忙时不阻塞。 runproc 在发现 q 为空时返回。也没有比赛条件。如果poll()之后有dispatch,那么在pending.set(false)之后,q.isEmpty()就会为false,循环会继续执行dispatched的工作
      • 哦,你说的是公平w.r.t。调度程序线程...我实际上比每次挂起时都将任务发送到行尾更好,因为它更适合我想到的负载类型,但很容易修改以在每个任务之间重新安排任务,如果你喜欢。
      • 是的,看来我的比赛条件陈述是错误的,抱歉(希望我可以编辑并删除原始评论)。
      【解决方案3】:

      我发现,创建这样的上下文没有简单的解决方案。

      在 githuib 上有一个未解决的问题 - https://github.com/Kotlin/kotlinx.coroutines/issues/261

      我想我会在找到正确的解决方案时更新这个问题。

      【讨论】:

        猜你喜欢
        • 2019-06-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-05-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多