【问题标题】:Thread-safely transforming a value in a mutable map线程安全地转换可变映射中的值
【发布时间】:2013-08-09 14:29:58
【问题描述】:

假设我想在 Scala 中使用可变映射来跟踪我看到某些字符串的次数。在单线程上下文中,这很容易:

import scala.collection.mutable.{ Map => MMap }

class Counter {
  val counts = MMap.empty[String, Int].withDefaultValue(0)

  def add(s: String): Unit = counts(s) += 1
}

不幸的是,这不是线程安全的,因为 getupdate 不会自动发生。

Concurrent mapsa few atomic operations 添加到可变地图 API,但不是我需要的,它看起来像这样:

def replace(k: A, f: B => B): Option[B]

我知道我可以使用ScalaSTMTMap

import scala.concurrent.stm._

class Counter {
  val counts =  TMap.empty[String, Int]

  def add(s: String): Unit = atomic { implicit txn =>
    counts(s) = counts.get(s).getOrElse(0) + 1
  }
}

但是(目前)这仍然是一个额外的依赖项。其他选项包括参与者(另一个依赖项)、同步(可能效率较低)或 Java 的 atomic references (less idiomatic)。

一般来说,我会避免在 Scala 中使用可变映射,但我偶尔会需要这种东西,最近我使用了 STM 方法(而不是只是交叉手指并希望我不会被咬通过天真​​的解决方案)。

我知道这里有很多权衡(额外的依赖与性能与清晰度等),但在 Scala 2.10 中是否有类似“正确”的答案?

【问题讨论】:

  • 写入可变映射的单个 Akka actor 怎么样? Counter.add 只是向它发送一个即发即弃的消息。至于读取,根据您的需要,它们可以同时发生,也可以通过演员进行。

标签: scala map concurrency thread-safety


【解决方案1】:

这个怎么样?假设你现在真的不需要一个通用的replace 方法,只需要一个计数器。

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object CountedMap {
  private val counts = new ConcurrentHashMap[String, AtomicInteger]

  def add(key: String): Int = {
    val zero = new AtomicInteger(0)
    val value = Option(counts.putIfAbsent(key, zero)).getOrElse(zero)
    value.incrementAndGet
  }
}

您可以获得比在整个地图上同步更好的性能,并且您还可以获得原子增量。

【讨论】:

  • 谢谢——我对一般情况很感兴趣,但很高兴看到这很容易。
  • 这是正确的解决方案,并利用了非常高性能的 Java 并发库。
  • 我很好奇是否有理由使用 ConcurrentHashMap 而不是 concurrent.TrieMap。我没有意见,只是论坛是API的广告。
  • @som-snytt 我不知道那个课程,我现在对选择其中一个没有意见。
【解决方案2】:

最简单的解决方案肯定是同步。如果没有太多争用,性能可能不会那么差。

否则,您可以尝试汇总您自己的类似 STM 的 replace 实现。这样的事情可能会做:

object ConcurrentMapOps {
  private val rng = new util.Random
  private val MaxReplaceRetryCount = 10
  private val MinReplaceBackoffTime: Long = 1
  private val MaxReplaceBackoffTime: Long = 20
}
implicit class ConcurrentMapOps[A, B]( val m: collection.concurrent.Map[A,B] ) {
  import ConcurrentMapOps._
  private def replaceBackoff() {
    Thread.sleep( (MinReplaceBackoffTime + rng.nextFloat * (MaxReplaceBackoffTime - MinReplaceBackoffTime) ).toLong ) // A bit crude, I know
  }

  def replace(k: A, f: B => B): Option[B] = {
    m.get( k ) match {
      case None => return None
      case Some( old ) =>
        var retryCount = 0
        while ( retryCount <= MaxReplaceRetryCount ) {
          val done = m.replace( k, old, f( old ) )
          if ( done ) {
            return Some( old )
          }
          else {         
            retryCount += 1
            replaceBackoff()
          }
        }
        sys.error("Could not concurrently modify map")
    }
  }
}

请注意,冲突问题仅限于给定键。如果两个线程访问同一个映射但使用不同的键,则不会发生冲突,并且替换操作总是第一次成功。如果检测到冲突,我们会稍等片刻(随机时间,以尽量减少线程永远争夺同一个密钥的可能性)然后重试。

我不能保证这是生产就绪的(我现在刚刚扔掉它),但这可能会奏效。

更新:当然(正如 Ionuț G. Stan 指出的那样),如果您只想增加/减少一个值,java 的 ConcurrentHashMap 已经以无锁方式提供了这些操作。 如果您需要更通用的replace 方法,将转换函数作为参数,则我的上述解决方案适用。

【讨论】:

【解决方案3】:

如果您的地图只是作为 val 坐在那里,那您就是在自找麻烦。如果它符合您的用例,我会推荐类似

class Counter {
  private[this] myCounts = MMap.empty[String, Int].withDefaultValue(0)
  def counts(s: String) = myCounts.synchronized { myCounts(s) }
  def add(s: String) = myCounts.synchronized { myCounts(s) += 1 }
  def getCounts = myCounts.synchronized { Map[String,Int]() ++ myCounts }
}

用于低争用。对于高争用,您应该使用旨在支持此类使用的并发映射(例如java.util.concurrent.ConcurrentHashMap)并将值包装在AtomicWhatever中。

【讨论】:

    【解决方案4】:

    如果您可以使用基于未来的界面:

    trait SingleThreadedExecutionContext {
      val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
    }
    
    class Counter extends SingleThreadedExecutionContext {
      private val counts = MMap.empty[String, Int].withDefaultValue(0)
    
      def get(s: String): Future[Int] = future(counts(s))(ec)
    
      def add(s: String): Future[Unit] = future(counts(s) += 1)(ec)
    }
    

    测试将如下所示:

    class MutableMapSpec extends Specification {
    
      "thread safe" in {
    
        import ExecutionContext.Implicits.global
    
        val c = new Counter
        val testData = Seq.fill(16)("1")
        await(Future.traverse(testData)(c.add))
        await(c.get("1")) mustEqual 16
      }
    }
    

    【讨论】:

    • 这根本不是线程安全的。虽然您保证一次只有一个作者,但在修改地图时您仍然可以让线程读取
    • 据我了解,所有使用 ec 作为上下文的操作——读、写、混合——都是线程安全的。该上下文之外的操作将不是线程安全的。如果这种理解是正确的,将很高兴听取其他人的意见。
    • 但问题是,读取是直接完成的:当您访问c.counts 时,您根本没有使用ExecutionContext
    • c.counts 是一个可变映射,因此将其暴露给外部世界并不是一个好主意。我通过添加 get 方法并使地图私有化来更新代码。要点是:对可变映射的所有读/写都必须由 ec 保护。如果这不可接受,请使用并发数据结构。
    • 是的,这就是我的重点。现在好多了。实际上,如果 OP 确实可以调整他的代码以使用期货,这在易用性和正确性方面是最好的解决方案(尽管不是速度方面)。
    猜你喜欢
    • 2020-08-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-08
    • 2011-03-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多