【问题标题】:Limiting memory usage when reading files读取文件时限制内存使用
【发布时间】:2010-09-19 11:22:17
【问题描述】:

我是 Haskell 初学者,我认为这是一个很好的练习。我有一个 我需要在线程A中读取文件的分配,处理文件行 在线程 B_i 中,然后在线程 C 中输出结果。

到目前为止,我已经实现了这一点,但其中一项要求是我们 不能相信整个文件都适合内存。我希望那个懒惰 IO 和垃圾收集器会为我执行此操作,但可惜内存使用情况 一直在上升。

阅读器线程 (A) 读取带有readFile 的文件,然后将其压缩 带有行号并用 Just 包裹。然后写入这些压缩线 到Control.Concurrent.Chan。每个消费者线程 B 都有自己的通道。

每个消费者在有数据时读取自己的频道,如果正则表达式 匹配,它被输出到各自的输出通道包装 在 Maybe 中(由列表组成)。

打印机检查每个 B 线程的输出通道。如果没有 结果(行)为Nothing,打印行。由于此时 不应该引用旧行,我认为垃圾 收集器将能够释放这些行,但唉,我似乎在 这里是错误的。

.lhs 文件在这里: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs

所以问题是,我如何限制内存使用,或者允许垃圾 收集器删除线条。

根据要求提供片段。希望缩进不会被严重破坏:)

data Global = Global {done :: MVar Bool, consumers :: Consumers}
type Done = Bool
type Linenum = Int
type Line = (Linenum, Maybe String)
type Output = MVar [Line]
type Input = Chan Line
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output)))
type State a = ReaderT Global IO a


producer :: [Input] -> FilePath -> State ()
producer c p = do
  liftIO $ Main.log "Starting producer"
  d <- asks done
  f <- liftIO $ readFile p
  mapM_ (\l -> mapM_
    (liftIO . flip writeChan l) c)
    $ zip [1..] $ map Just $ lines f
  liftIO $ modifyMVar_ d (return . not)

printer :: State ()
printer = do
  liftIO $ Main.log "Starting printer"
  c <- (fmap (map (snd . snd) . M.elems)
    (asks consumers >>= liftIO . readMVar))
  uniq' c
  where head' :: Output -> IO Line
    head' ch = fmap head (readMVar ch)

    tail' = mapM_ (liftIO . flip modifyMVar_
        (return . tail))

    cont ch = tail' ch >> uniq' ch

    printMsg ch = readMVar (head ch) >>=
        liftIO . putStrLn . fromJust . snd . head

    cempty :: [Output] -> IO Bool
    cempty ch = fmap (any id)
        (mapM (fmap ((==) 0 . length) . readMVar ) ch)

    {- Return false unless none are Nothing -}
    uniq :: [Output] -> IO Bool
    uniq ch = fmap (any id . map (isNothing . snd))
        (mapM (liftIO . head') ch)

    uniq' :: [Output] -> State ()
    uniq' ch = do
      d <- consumersDone
      e <- liftIO $ cempty ch
      if not e
        then  do
          u <- liftIO $ uniq ch
          if u then cont ch else do
        liftIO $ printMsg ch
        cont ch
          else unless d $ uniq' ch

【问题讨论】:

    标签: haskell concurrency io heap-memory


    【解决方案1】:

    并发编程不提供定义的执行顺序,除非您自己使用 mvar 等强制执行顺序。因此,生产者线程很可能在任何消费者读取它们并将它们传递之前将所有/大部分行粘贴在 chan 中。另一个应该满足要求的架构是让线程 A 调用惰性读取文件并将结果粘贴到 mvar 中。然后每个消费者线程获取 mvar,读取一行,然后在继续处理该行之前替换 mvar。即使这样,如果输出线程跟不上,那么存储在 chan 上的匹配行数可以任意增加。

    您拥有的是推送架构。要真正让它在恒定空间中发挥作用,请从需求驱动的角度进行思考。找到一种机制,使输出线程向处理线程发出信号,告知它们应该做某事,并让处理线程向读取线程发出信号,告知它们应该做某事。

    另一种方法是使用有限大小的通道来代替——这样当处理器线程没有赶上时,读取器线程就会阻塞,而当输出线程没有赶上时,处理器线程就会阻塞。

    总的来说,这个问题实际上让我想起了 Tim Bray 的宽幅取景器基准测试,尽管要求有所不同。无论如何,它引发了关于实现多核 grep 的最佳方式的广泛讨论。最大的亮点是问题是 IO 绑定的,并且您希望多个读取器线程处理 mmapped 文件。

    在这里查看您想知道的更多信息:http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder

    【讨论】:

    • BoundedChan 正在为这种类型的使用进行黑客攻击。
    • 感谢 Tom 和 sci。如果可行,我会尝试实现它并标记为答案
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-06
    • 1970-01-01
    • 2014-05-07
    • 1970-01-01
    • 1970-01-01
    • 2022-12-11
    相关资源
    最近更新 更多