【问题标题】:How to compress the output when writing to a file?写入文件时如何压缩输出?
【发布时间】:2016-10-20 11:06:18
【问题描述】:

我有一个计算与其他东西一起生成一些数据(很多),我想写入一个文件。

现在代码的结构方式是(简化的):

writeRecord :: Handle -> Record -> IO ()
writeRecord h r = hPutStrLn h (toByteString r)

然后在更大的计算期间定期调用此函数。它几乎就像一个日志,实际上是同时写入多个文件。

现在我想用Gzip 压缩输出文件。 在像 Java 这样的语言中,我会做这样的事情:

outStream = new GzipOutputStream(new FileOutputStream(path)) 

然后将写入该包装的输出流。

在 Haskell 中这样做的方式是什么? 我想写一些像

writeRecord h r = hPut h ((compressed . toByteString) r)

不正确,因为单独压缩每个小比特效率不高(我什至尝试过,压缩文件的大小比这种方式未压缩的要大)。

我也不认为我可以只生成一个懒惰的ByteString(甚至是一个块列表)然后用compressed . fromChunks 编写它,因为这将需要我的“生成器”在内存中构建完整的东西。并且同时生成多个文件这一事实使其变得更加复杂。

那么在 Haskell 中解决这个问题的方法是什么?写入文件并将其压缩?

【问题讨论】:

  • 管道包可能是你需要的。您可以创建从生产者到消费者的数据流,并在两者之间转换数据(使用管道)。不过,我对包的了解不足以写一个关于它的答案。
  • 管道/导管可能是一个解决方案。但是,它可能具有与“分叉”相关的额外复杂性:假设一个“生成器”想要写入 5 个不同的文件(不同的数据,不同的速率),因此有 1 个生产者和 5 个消费者......不确定它是否会建模很好。对于这样一个简单(看起来)的要求,看起来也有点矫枉过正?
  • 什么会打开您正在写入的句柄? 是您想要通过压缩器插入“管道”的位置,就像在您的 Java 示例中一样。
  • @chepner 我确实控制打开手柄。如果我可以将它“通过管道”通过压缩机,那将是理想的,但你能指出我如何它可以做到吗?
  • 所有流媒体库都支持熟悉的压缩形式,参见例如hackage.haskell.org/package/io-streams-1.3.5.0/docs/… 因为它有点受 java 启发。但我想你明白这一点。无法提供建议,因为您没有充分说明“许多文件”进入图片的方式。

标签: haskell compression gzip


【解决方案1】:

对于增量压缩,我认为您可以在Codec.Compression.Zlib.Internal 中使用compressIO/foldCompressStream

如果您能够将您的生产者操作表示为IO (Maybe a)(例如MVar take 或InputStream/Chan read),其中Nothing 表示输入结束,类似这样的操作应该可以:

import System.IO (Handle)
import qualified Data.ByteString as BS
import qualified Codec.Compression.Zlib.Internal as ZLib

compressedWriter :: Handle -> (IO (Maybe BS.ByteString)) -> IO ()
compressedWriter handle source =
  ZLib.foldCompressStream
    (\next -> source >>= maybe (next BS.empty) next)
    (\chunk next -> BS.hPut handle chunk >> next)
    (return ())
    (ZLib.compressIO ZLib.rawFormat ZLib.defaultCompressParams)

【讨论】:

    【解决方案2】:

    使用管道执行此操作相当简单,但您需要稍微调整一下代码。我已经整理了一个前后代码的示例来演示它。基本思路是:

    • hPutStr h 替换为yield
    • 添加一些 liftIO 包装器
    • 不要使用withBinaryFile等,而是使用runConduitResgzipsinkFile

    示例如下:

    #!/usr/bin/env stack
    -- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
    {-# LANGUAGE OverloadedStrings #-}
    import Control.Monad.IO.Class (MonadIO, liftIO)
    import Data.ByteString (ByteString, hPutStr)
    import Data.Conduit (ConduitM, (.|), yield, runConduitRes)
    import Data.Conduit.Binary (sinkFile)
    import Data.Conduit.Zlib (gzip)
    import System.IO (Handle)
    
    -- Some helper function you may have
    someAction :: IO ByteString
    someAction = return "This is a string\n"
    
    -- Original version
    producerHandle :: Handle -> IO ()
    producerHandle h = do
        str <- someAction
        hPutStr h str
    
    -- Conduit version
    producerConduit :: MonadIO m => ConduitM i ByteString m ()
    producerConduit = do
        str <- liftIO someAction
        yield str
    
    main :: IO ()
    main = runConduitRes $ producerConduit
                        .| gzip
                        .| sinkFile "some-file.txt.gz"
    

    您可以在the conduit tutorial 中了解有关管道的更多信息。

    你的 Java 想法很有趣,再给我几分钟,我会添加一个看起来更像那样的答案。

    编辑

    这是一个更接近您的 Java 风格方法的版本。它依赖于 SinkFunc.hs 模块,该模块可作为 Gist 在:https://gist.github.com/snoyberg/283154123d30ff9e201ea4436a5dd22d

    #!/usr/bin/env stack
    -- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
    {-# LANGUAGE OverloadedStrings #-}
    {-# OPTIONS_GHC -Wall -Werror #-}
    import Data.ByteString (ByteString)
    import Data.Conduit ((.|))
    import Data.Conduit.Binary (sinkHandle)
    import Data.Conduit.Zlib (gzip)
    import System.IO (withBinaryFile, IOMode (WriteMode))
    import SinkFunc (withSinkFunc)
    
    -- Some helper function you may have
    someAction :: IO ByteString
    someAction = return "This is a string\n"
    
    producerFunc :: (ByteString -> IO ()) -> IO ()
    producerFunc write = do
        str <- someAction
        write str
    
    main :: IO ()
    main = withBinaryFile "some-file.txt.gz" WriteMode $ \h -> do
        let sink = gzip .| sinkHandle h
        withSinkFunc sink $ \write -> producerFunc write
    

    EDIT 2 再多一个,实际上是使用ZipSink 将数据流式传输到多个不同的文件。有很多种不同的切片方法,但这是一种可行的方法:

    #!/usr/bin/env stack
    -- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
    {-# LANGUAGE OverloadedStrings #-}
    import Control.Monad.Trans.Resource (MonadResource)
    import Data.ByteString (ByteString)
    import Data.Conduit (ConduitM, (.|), yield, runConduitRes, ZipSink (..))
    import Data.Conduit.Binary (sinkFile)
    import qualified Data.Conduit.List as CL
    import Data.Conduit.Zlib (gzip)
    
    data Output = Foo ByteString | Bar ByteString
    
    fromFoo :: Output -> Maybe ByteString
    fromFoo (Foo bs) = Just bs
    fromFoo _ = Nothing
    
    fromBar :: Output -> Maybe ByteString
    fromBar (Bar bs) = Just bs
    fromBar _ = Nothing
    
    producer :: Monad m => ConduitM i Output m ()
    producer = do
        yield $ Foo "This is going to Foo"
        yield $ Bar "This is going to Bar"
    
    sinkHelper :: MonadResource m
               => FilePath
               -> (Output -> Maybe ByteString)
               -> ConduitM Output o m ()
    sinkHelper fp f
        = CL.mapMaybe f
       .| gzip
       .| sinkFile fp
    
    main :: IO ()
    main = runConduitRes
         $ producer
        .| getZipSink
                (ZipSink (sinkHelper "foo.txt.gz" fromFoo) *>
                 ZipSink (sinkHelper "bar.txt.gz" fromBar))
    

    【讨论】:

    • 我之前没有看到cmets,我只是在回答最初的问题。如果这确实是提问者正在寻找的东西,它会涉及到管道中的 ZipSink 之类的东西,我现在正在写的其他方法作为概念证明。
    • 另外,一般来说,即使有些东西“简单”,对于图书馆的新手来说,一个工作示例通常更容易理解。
    【解决方案3】:

    所有流媒体库都支持压缩。如果我了解特定问题以及您的思考方式,io-streams 可能是最简单的。在这里,我交替写入trumpclinton 输出流,它们被写为压缩文件。接下来我展示了 pipes 相当于 Michael 的 conduit 程序

    #!/usr/bin/env stack
    -- stack --resolver lts-6.21 --install-ghc runghc --package io-streams
    {-# LANGUAGE OverloadedStrings #-}
    
    import qualified System.IO.Streams as IOS
    import qualified System.IO as IO
    import Data.ByteString (ByteString)
    
    analyzer :: IOS.OutputStream ByteString -> IOS.OutputStream ByteString -> IO ()
    analyzer clinton trump = do 
      IOS.write (Just "This is a string\n") clinton
      IOS.write (Just "This is a string\n") trump
      IOS.write (Just "Clinton string\n") clinton
      IOS.write (Just "Trump string\n") trump   
      IOS.write (Just "Another Clinton string\n") clinton
      IOS.write (Just "Another Trump string\n") trump   
      IOS.write Nothing clinton
      IOS.write Nothing trump
    
    main:: IO ()
    main = 
      IOS.withFileAsOutput "some-file-clinton.txt.gz" $ \clinton_compressed ->
      IOS.withFileAsOutput "some-file-trump.txt.gz" $ \trump_compressed -> do
         clinton <- IOS.gzip IOS.defaultCompressionLevel clinton_compressed
         trump <- IOS.gzip IOS.defaultCompressionLevel trump_compressed
         analyzer clinton trump
    

    显然,您可以在写入两个输出流的操作之间在analyzer 中混合各种IO - 我只是在writes 中显示,可以这么说。特别是,如果analyzer 被理解为依赖于输入流,则writes 可以依赖于来自输入流的reads。 Here's 一个(稍微!)更复杂的程序可以做到这一点。如果我运行上面的程序,我会看到

    $ stack gzip_so.hs  
    $ gunzip some-file-clinton.txt.gz 
    $ gunzip some-file-trump.txt.gz 
    $ cat some-file-clinton.txt 
    This is a string
    Clinton string
    Another Clinton string
    $ cat some-file-trump.txt 
    This is a string
    Trump string
    Another Trump string
    

    使用管道和导管有多种方法可以实现上述效果,并且零件的分解程度更高。然而,写入单独的文件会更加微妙。无论如何,这里的管道相当于 Michael S 的管道程序:

    #!/usr/bin/env stack
    -- stack --resolver lts-6.21 --install-ghc runghc  --package pipes-zlib 
    {-# LANGUAGE OverloadedStrings #-}
    import Control.Monad.IO.Class (MonadIO, liftIO)
    import Data.ByteString (ByteString, hPutStr)
    import System.IO  (IOMode(..), withFile, Handle)
    import Pipes  
    import qualified Pipes.ByteString as PB
    import qualified Pipes.GZip as P
    
    -- Some helper function you may have
    someAction :: IO ByteString
    someAction = return "This is a string\n"
    
    -- Original version
    producerHandle :: Handle -> IO ()
    producerHandle h = do
        str <- someAction
        hPutStr h str
    
    producerPipe :: MonadIO m => Producer ByteString m ()
    producerPipe = do
        str <- liftIO someAction
        yield str
    
    main :: IO ()
    main =  withFile "some-file-pipes.txt.gz"  WriteMode $ \h -> 
         runEffect $ P.compress P.defaultCompression producerPipe  >-> PB.toHandle h 
    

    -- 编辑

    这里值得一提的是另一种将多个生产者叠加在带有管道或导管的单个线程上的方法,以添加 Michael S 和 danidiaz 提到的不同方法:

    #!/usr/bin/env stack
    -- stack --resolver lts-6.21 --install-ghc runghc --package pipes-zlib
    {-# LANGUAGE OverloadedStrings #-}
    import Pipes
    import Pipes.GZip
    import qualified Pipes.Prelude as P
    import qualified Pipes.ByteString as Bytes
    import System.IO
    import Control.Monad (replicateM_)
    
    producer = replicateM_ 50000 $ do
        marie  "This is going to Marie\n"  -- arbitary IO can be interspersed here
        arthur "This is going to Arthur\n" -- with liftIO
        sylvia "This is going to Sylvia\n" 
      where 
        marie = yield; arthur = lift . yield; sylvia = lift . lift . yield
    
    sinkHelper h p = runEffect (compress bestSpeed p >-> Bytes.toHandle h)
    
    main :: IO ()
    main =  
       withFile "marie.txt.gz" WriteMode $ \marie ->
       withFile "arthur.txt.gz"  WriteMode $ \arthur -> 
       withFile "sylvia.txt.gz"  WriteMode $ \sylvia ->
          sinkHelper sylvia
          $ sinkHelper arthur
          $ sinkHelper marie
          $ producer
    

    它非常简单和快速,并且可以在具有明显更改的管道中编写 - 但发现它很自然需要从“monad 转换器堆栈”的角度进行更高级别的支持。从 streaming 库之类的角度来看,这将是编写此类程序的最自然方式。

    【讨论】:

      【解决方案4】:

      此解决方案类似于 Michael Snoyman 的 EDIT 2,但使用了foldl管道pipes-zlibstreaming-eversion 包。

       {-# language OverloadedStrings #-}
      module Main where
      
      -- cabal install bytestring foldl pipes pipes-zlib streaming-eversion
      import Data.Foldable
      import Data.ByteString
      import qualified Control.Foldl as L 
      import Pipes 
      import qualified Pipes.Prelude
      import Pipes.Zlib (compress,defaultCompression,defaultWindowBits)
      import Streaming.Eversion.Pipes (transvertMIO)
      import System.IO
      
      type Tag = String
      
      producer :: Monad m => Producer (Tag,ByteString) m ()
      producer = do
          yield $ ("foo","This is going to Foo")
          yield $ ("bar","This is going to Bar")
      
      foldForTag :: Handle -> Tag -> L.FoldM IO (Tag,ByteString) ()
      foldForTag handle tag = 
            L.premapM (\(tag',bytes) -> if tag' == tag then Just bytes else Nothing)
          . L.handlesM L.folded
          . transvertMIO (compress defaultCompression defaultWindowBits)
          $ L.mapM_ (Data.ByteString.hPut handle)
      
      main :: IO ()
      main = do
          withFile "foo.txt" WriteMode $ \h1 ->
              withFile "bar.txt" WriteMode $ \h2 ->
                  let multifold = traverse_ (uncurry foldForTag) [(h1,"foo"),(h2,"bar")] 
                  in  L.impurely Pipes.Prelude.foldM multifold producer
      

      【讨论】:

      【解决方案5】:

      此解决方案类似于 Michael Snoyman 的 EDIT 2,但使用了streamingstreaming-bytestring、管道和pipes-zlib 包。

      {-# language OverloadedStrings #-}
      module Main where
      
      -- cabal install bytestring streaming streaming-bytestring pipes pipes-zlib 
      import Data.ByteString
      import qualified Data.ByteString.Streaming as B
      import Streaming
      import qualified Streaming.Prelude as S
      import Pipes (next)
      import qualified Pipes.Prelude 
      import Pipes.Zlib (compress,defaultCompression,defaultWindowBits)
      import System.IO
      
      type Tag = String
      
      producer :: Monad m => Stream (Of (Tag,ByteString)) m ()
      producer = do
          S.yield ("foo","This is going to Foo")
          S.yield ("bar","This is going to Bar")
      
      -- I couldn't find a streaming-zlib on Hackage, took a pipes detour
      compress' :: MonadIO m 
                => Stream (Of ByteString) m r -> Stream (Of ByteString) m r 
      compress' = S.unfoldr Pipes.next
                . compress defaultCompression defaultWindowBits
                . Pipes.Prelude.unfoldr S.next     
      
      keepTag :: Monad m 
              => Tag -> Stream (Of (Tag,ByteString)) m r -> Stream (Of ByteString) m r 
      keepTag tag = S.map snd . S.filter ((tag==) . fst)
      
      main :: IO ()
      main = runResourceT 
           . B.writeFile "foo.txt" . B.fromChunks . compress' .  keepTag "foo"  
           . B.writeFile "bar.txt"  . B.fromChunks . compress' . keepTag "bar"  
           $ S.copy producer
      

      我利用Streaming.Prelude 中的copy 函数,它允许您

      复制流的内容,以便在 不同的方式,但不会中断流式传输。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2011-05-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-06-18
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多