【问题标题】:Haskell fast concurrent queueHaskell 快速并发队列
【发布时间】:2015-03-12 02:31:47
【问题描述】:

问题

你好!我正在编写一个日志库,我很想创建一个记录器,它将在单独的线程中运行,而所有应用程序线程只会向它发送消息。我想为这个问题找到最有效的解决方案。我这里需要简单的未绑定队列。

方法

我创建了一些测试来查看可用解决方案的性能,我在这里得到了非常奇怪的结果。我测试了 4 个实现(源代码如下):

  1. pipes-concurrency
  2. Control.Concurrent.Chan
  3. Control.Concurrent.Chan.Unagi
  4. MVar based as described in the book "Parallel and Concurrent Programming in Haskell" 请注意,此技术为我们提供了容量为 1 的有界队列 - 它仅用于测试

测试

这里是用于测试的源代码:

{-# LANGUAGE NoMonomorphismRestriction #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)

import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main

data Event = Msg String | Status | Quit deriving (Show)

----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------

pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg

pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
                                           Pipes.performGC

pipesHandler max = loop 0
  where
    loop mnum = do
        if mnum == max
            then lift $ pure ()
            else do event <- await
                    case event of
                        Msg _  -> loop (mnum + 1)
                        Status -> (lift $ putStrLn (show mnum)) *> loop mnum
                        Quit   -> return ()

----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------

chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max

----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------

uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max

----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------

mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max

----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------

handlerIO f max = loop 0 where
    loop mnum = do
        if mnum == max 
            then pure ()
            else do event <- f
                    case event of
                         Msg _  -> loop (mnum + 1)
                         Status -> putStrLn (show mnum) *> loop mnum
                         Quit   -> return ()

----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------

main = defaultMain [
      bench "pipes" $ nfIO $ do
        (output, input) <- Pipes.spawn Pipes.Unbounded
        replicateM_ prodNum (pipesAddProducer msgNum output)
        runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
        ch <- newChan
        replicateM_ prodNum (chanAddProducer msgNum ch)
        chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
        (inCh, outCh) <- U.newChan
        replicateM_ prodNum (uchanAddProducer msgNum inCh)
        uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
        m <- newEmptyMVar
        replicateM_ prodNum (mvarAddProducer msgNum m)
        mvarHandler m totalMsg
    ]
  where
    prodNum  = 20
    msgNum   = 1000
    totalMsg = msgNum * prodNum

您可以使用ghc -O2 Main.hs 编译它并运行它。 测试创建了 20 个消息生产者,每个生产者产生 1000000 条消息。

结果

benchmarking pipes
time                 46.68 ms   (46.19 ms .. 47.31 ms)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 47.59 ms   (47.20 ms .. 47.95 ms)
std dev              708.3 μs   (558.4 μs .. 906.1 μs)

benchmarking Chan
time                 4.252 ms   (4.171 ms .. 4.351 ms)
                     0.995 R²   (0.991 R² .. 0.998 R²)
mean                 4.233 ms   (4.154 ms .. 4.314 ms)
std dev              244.8 μs   (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)

benchmarking Unagi-Chan
time                 1.209 ms   (1.198 ms .. 1.224 ms)
                     0.996 R²   (0.993 R² .. 0.999 R²)
mean                 1.267 ms   (1.244 ms .. 1.308 ms)
std dev              102.4 μs   (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)

benchmarking MVar
time                 1.746 ms   (1.714 ms .. 1.774 ms)
                     0.997 R²   (0.995 R² .. 0.998 R²)
mean                 1.716 ms   (1.694 ms .. 1.739 ms)
std dev              73.99 μs   (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)

问题

我很想问你为什么管道并发版本的执行速度如此之慢,以及为什么它甚至比基于 chan 的版本还要慢得多。我很惊讶,MVar 是所有版本中最快的——谁能告诉更多,为什么我们会得到这个结果,以及在任何情况下我们是否可以做得更好?

【问题讨论】:

  • 你给它多少操作系统线程?
  • 是的,请问您是否使用-threaded 编译并使用-N 运行?你的机器有多少个内核?
  • 我都试过了 - 有和没有-threaded 和运行+RTS -N8。结果很有趣——管道版本的表现要慢得多(整个示例从 1 秒变为 4 秒!),Chan 也慢得多,unagi 快了大约 20%。我这里有 8 个核心(现代 i7)
  • 似乎TChan 版本在线程化时比MVar 表现更好(应该!):gist.github.com/phadej/ca603306992cee39ce9d
  • MVar 版本的行为不同于 chans。写入 chan 是非阻塞的(假设缓冲区中有空间),写入非空 MVar 是阻塞的。在您的基准测试中,写入和读取很好地对齐,因此MVar 不会成为任何瓶颈。

标签: haskell concurrency profiling stm haskell-pipes


【解决方案1】:

所以我可以简要概述一下ChanTQueuepipes-concurrency 在此处内部使用)的一些分析,这些分析激发了unagi-chan 中的一些设计决策。我不确定它是否会回答你的问题。我建议在进行基准测试时分叉不同的队列并尝试变化,以便真正了解正在发生的事情。

Chan 看起来像:

data Chan a
 = Chan (MVar (Stream a)) -- pointer to "head", where we read from
        (MVar (Stream a)) -- pointer to "tail", where values written to

type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)

这是一个MVars 的链表。 Chan 类型中的两个 MVars 分别充当指向列表当前头部和尾部的指针。这是写的样子:

writeChan :: Chan a -> a -> IO () 
writeChan (Chan _ writeVar) val = do 
    new_hole <- newEmptyMVar   mask_ $ do
    old_hole <- takeMVar writeVar           -- [1]
    putMVar old_hole (ChItem val new_hole)  -- [2]
    putMVar writeVar new_hole               -- [3]

在 1 时,作者在写端锁定,在 2 时,我们的项目 a 可供读者使用,在 3 时,写端为其他作者解锁。

这实际上在单一消费者/单一生产者场景中表现得非常好(参见the graph here),因为读取和写入不会竞争。但是一旦你有多个并发作者,你就会开始遇到麻烦:

  • 一个写入 1 而另一个写入 2 的写入将阻塞并被取消调度(我能够测量上下文切换的最快速度是 ~150ns(非常快);可能在某些情况下它慢得多)。所以当你让许多作家竞争时 您基本上是通过调度程序进行一次大往返,进入MVar 的等待队列,然后最终写入可以完成。

  • 当写入器在 2 处被取消调度(因为它超时)时,它会持有锁,并且在可以再次重新调度之前不允许完成写入;当我们订阅过多,即当我们的线程/核心比率很高时,这会成为一个更大的问题。

最后,使用MVar-per-item 在分配方面需要一些开销,更重要的是,当我们积累许多可变对象时,我们会造成很大的 GC 压力。

T队列

TQueue 很棒,因为STM 使得推理其正确性变得超级简单。它是一个功能性出列式队列,write 包括简单地读取写入器堆栈、consing 我们的元素并将其写回:

data TQueue a = TQueue (TVar [a])
                       (TVar [a])

writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do  
  listend <- readTVar write   -- a transaction with a consistent 
  writeTVar write (a:listend) -- view of memory

如果在writeTQueue 写回它的新堆栈后,另一个交错写入执行相同操作,则其中一个写入将被重试。随着更多writeTQueues 被交错,争用的影响变得更糟。然而,性能下降比Chan 慢得多,因为只有一个writeTVar 操作可以使竞争writeTQueues 无效,并且事务非常小(只有一个读取和一个(:))。

读取的工作原理是从写入端“出列”堆栈,将其反转,并将反转的堆栈存储在自己的变量中以便于“弹出”(总而言之,这为我们提供了摊销的 O(1) 推送和弹出)

readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z

读者对作者有一个对称的适度争用问题。在一般情况下,读者和作者不会竞争,但是当阅读器堆栈耗尽时,读者会与其他读者和作者竞争。我怀疑如果您预先加载了具有足够值的TQueue,然后启动了 4 个读取器和 4 个写入器,您可能会引发活锁,因为在下一次写入之前反向难以完成。还值得注意的是,与MVar 不同,对TVar 的写入会同时唤醒许多读者(这可能或多或少,具体取决于场景)。

我怀疑您在测试中没有看到TQueue 的很多弱点;主要是您看到了写入争用的中等影响以及大量分配和 GC 大量可变对象的开销。

鳗鱼酱

unagi-chan 最初是为了很好地处理争用而设计的。它在概念上非常简单,但实现有一些复杂性

data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))

data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))

data Cell a = Empty | Written a | Blocking (MVar a)

队列的读取和写入端共享Stream,它们在其上协调传递值(从写入器到读取器)和阻塞指示(从读取器到写入器),并且读取和写入端各有一个独立的原子计数器。一个 write 的工作方式是:

  1. 写入器调用写入计数器上的原子incrCounter 以接收其唯一索引,在该索引上与其(单个)读取器进行协调

  2. 作者找到它的单元格并执行Written a的CAS

  3. 如果成功则退出,否则它会看到读者已经击败它并正在阻止(或继续阻止),因此它执行(\Blocking v)-&gt; putMVar v a) 并退出。

读取以类似且明显的方式工作。

第一个创新是使争用点成为在争用下不会降级的原子操作(就像 CAS/重试循环或类似 Chan 的锁那样)。基于简单的基准测试和实验,fetch-and-add primop, exposed by the atomic-primops library 效果最佳。

那么在 2 中,reader 和 writer 只需要执行一次 compare-and-swap(reader 的快速路径是简单的非原子读取)即可完成协调。

所以为了让unagi-chan变得更好,我们

  • 使用 fetch-and-add 处理争用点

  • 使用无锁技术,这样当我们被超额订阅时,在不合时宜的时间取消调度的线程不会阻塞其他线程的进度(被阻塞的写入器最多可能阻塞计数器“分配”给它的读取器;阅读 unagi-chan 文档中关于异步异常的警告,并注意 Chan 在此处具有更好的语义)

  • 使用数组来存储我们的元素,它具有更好的局部性(但见下文),每个元素的开销更低,对 GC 的压力也更小

最后的说明。使用数组:对数组的并发写入通常不是扩展的一个坏主意,因为您会导致大量缓存一致性流量,因为缓存线在您的写入器线程中来回失效。通用术语是“虚假共享”。但是,我能想到的替代设计也有缓存方面的优势和劣势,它们会条纹写入或其他东西;我已经对此进行了一些试验,但目前还没有任何结论。

我们合理关注虚假共享的一个地方是我们的计数器,我们将其对齐并填充到 64 字节;这确实出现在基准测试中,唯一的缺点是内存使用量增加。

【讨论】:

  • 感谢您的详细回答。作为一个更普遍的问题,是否有一个用于生产中的 Haskell 的快速记录器库的良好实现?通过记录,我的意思与@Danilo 相同(不是您可能希望在开发中启用的一些跟踪消息)
  • @pierreR - 我们在公司内部使用了一个快速日志库,我们现在正在开发它的第二个版本。它只是在完成阶段,非常可扩展并且允许多线程日志记录。您应该期望它会在一两天内在 hackage 上发布开源。我会写在这里:)
  • @PierreR:就在这里!一个快速、可扩展的日志库! hackage.haskell.org/package/logger(或在 github 上:github.com/wdanilo/haskell-logger
【解决方案2】:

如果我不得不猜测为什么 pipes-concurrency 性能更差,那是因为每次读取和写入都包含在 STM 事务中,而其他库使用更高效的低级并发原语。

【讨论】:

  • 那是有原因的,还是我们可以期待修复?
  • 呃,性能差异很大。如果我更改一些值(100 个生产者,10000 个消费者,unagi 在 0.64 秒内执行,而管道在大约 70 秒内并发!,这会慢 100 倍以上)
  • pipes-concurrency 的 API 不太可能发生变化。该库强调易用性和正确性而不是性能。
  • 在这种特殊情况下,问题似乎都在于performGC,正如我提出的结果所示。
  • @WojciechDanilo 现在我很清楚正确的说法是“Unagi-Chan 的速度大约是管道并发的 4 倍”
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-01-19
  • 2011-06-27
  • 2019-02-22
  • 2013-04-09
  • 1970-01-01
  • 2017-06-27
  • 1970-01-01
相关资源
最近更新 更多