【问题标题】:How to chunk a file into 4 equal files如何将文件分块为 4 个相等的文件
【发布时间】:2021-10-22 00:05:54
【问题描述】:

我有一个很大的文件,例如 100MB,我需要使用 golang 将它分成 4 个 25MB 的文件。

这里的事情是,如果我使用 go 例程并读取文件,则不会保留文件中数据的顺序。我使用的代码是

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "sync"

    "github.com/google/uuid"
)

func main() {
    file, err := os.Open("sampletest.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    lines := make(chan string)
    // start four workers to do the heavy lifting
    wc1 := startWorker(lines)
    wc2 := startWorker(lines)
    wc3 := startWorker(lines)
    wc4 := startWorker(lines)
    scanner := bufio.NewScanner(file)

    go func() {
        defer close(lines)
        for scanner.Scan() {
            lines <- scanner.Text()
        }

        if err := scanner.Err(); err != nil {
            log.Fatal(err)
        }
    }()

    writefiles(wc1, wc2, wc3, wc4)
}

func writefile(data string) {
    file, err := os.Create("chunks/" + uuid.New().String() + ".txt")
    if err != nil {
        fmt.Println(err)
    }
    defer file.Close()
    file.WriteString(data)
}

func startWorker(lines <-chan string) <-chan string {
    finished := make(chan string)
    go func() {
        defer close(finished)
        for line := range lines {
            finished <- line
        }
    }()
    return finished
}

func writefiles(cs ...<-chan string) {
    var wg sync.WaitGroup

    output := func(c <-chan string) {
        var d string
        for n := range c {
            d += n
            d += "\n"
        }
        writefile(d)
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
    }()
}

在这里使用此代码,我的文件被分成 4 个相等的文件,但其中的顺序没有保留。 我是golang的新手,非常感谢任何建议。

我从某个站点获取了这段代码,并在这里和那里进行了调整以满足我的要求。

【问题讨论】:

  • 不要使用 goroutines。这里的瓶颈是磁盘 I/O,而不是计算。在这里使用 goroutine 不会获得更好的性能,相反,您会不必要地使应用程序复杂化并得到错误的结果。
  • @icza 感谢您的建议,您能以某种方式帮助我吗.. 在play.golang.org 中分享一些代码示例会有所帮助,提前致谢
  • 所以你是说你写了更复杂的多goroutine版本的文件拆分器,你不知道如何编写最简单的版本来尝试不使用goroutines?跨度>
  • 整个代码不是我写的,我是在网上得到的,在这里和那里进行了调整
  • SO 不是代码编写服务。尝试提出您的解决方案。如果您遇到困难或遇到特定问题,是时候在此处发布以寻求帮助了。

标签: file go chunking


【解决方案1】:

这篇文章展示了几种方法 按长度为N 的块处理文件。 头部和尾部的分裂都被演示了, 以顺序和异步方式。

它还为 OP 提供了排序扇出的解决方案。

下面的 Playground 是使用一个帮助器生成的,位于 https://play.golang.org/p/cGfSxnM1Lnv 它可能会帮助您解压缩使用的多文件游乐场 txtar 格式。原则上,这个想法是能够使用xclip -o | go run main.go unpack &lt;path to write&gt; 将文件写入您的磁盘。但是,我还没有彻底测试过最后一个功能。反过来也可以go run main.go pack &lt;path to read&gt; | xclip

我可能留下了错误,但所有示例都应该从go1.15 开始编译,并且有一个main 函数来演示用法。一路提供测试代码用于演示/启动目的。

我还希望在我的评论中忘记这么多重要细节和在使用这些 api 时需要了解的有用信息。

这些示例可能都没有提供直接的答案,但它应该通过演示和讨论各种选项来帮助推理这些情况。

我发现为时已晚,我完全忘记了io.ReadFull api。为了详尽无遗,每当您遇到limitReader 时,请考虑使用此替代写作https://play.golang.org/p/xKKO3d_J4Yd 从一个更改为另一个应该是直截了当的。

顺序 - 按头部分割

在下面的示例中,文件从头部开始按块分割。 它循环直到源到达io.EOF

section := limitReader(src, limit)
for !section.AtEOF() {
  //...

它可以使用section作为bufio.NewScanner的有限阅读器

for !section.AtEOF() {
  //...
  scanner := bufio.NewScanner(section)
  for scanner.Scan() {

在每次迭代中,都会创建一个新部分以将结果写入其中。

var dst io.WriteCloser
dst = noopCloseWriter{os.Stdout}
// name:=fmt.Sprintf("chunks-%v.txt", partCount)
// dst,err=os.Create(name)
// if err != nil {
//  log.Fatal(err)
//  break
// }

partCount++

scanner := bufio.NewScanner(section)
for scanner.Scan() {
  line := scanner.Bytes()
  line = append(line, '\n')
  n, err := dst.Write(line)
  //...

请注意,由于长度是在摄取时控制的,因此此版本在输出数据的长度方面非常宽松。如果工人产生的输出多于输入,你将获得更大的块。另请注意,如果与limitedReader 定义的边界不匹配,则一行很可能会在中间被截断。

https://play.golang.org/p/WikgL_w9BJ_K

需要引入一个新的 limitedReader,其行为与标准 API 中可用的 io.LimitReader 略有不同,否则它 有趣的是会陷入无限循环https://play.golang.org/p/hwDFaW2Hylb

顺序 - 由尾部分割

在下面的解决方案中,每次调用Write 时,都会创建一个旋转器写入器以将目标及时旋转到所需的输出目标。

var writers writers
dst := &rotateWriters{New: writers.new, N: limit}

writers 只是io.Writer 的列表,它提供new 方法与rotateWriters 实例共享。每次需要新的底层目标时,编写器旋转器都会调用该函数。

使用此 API,当您调用 n, err := dst.Write(line) 时,dst 会自动确定如何/何时旋转目标。

因此,现在写入大大简化了,输入使用常规扫描器处理,结果写入dst,然后交换当前目标。

var total int
scanner := bufio.NewScanner(src)
for scanner.Scan() {
  line := scanner.Bytes()
  line = append(line, '\n')

  n, err := dst.Write(line)
  //...

不过,请注意拨打dst.Close

if n, err := dst.CloseN(); err != nil {
  log.Fatal(err)
}

https://play.golang.org/p/KsGbaIAzQr-

sorted fanout - 在继续之前的快速说明

要实现排序的扇出过程,源必须输出数据 它承载了它的位置。经常会定义一个结构来保存这两个值。

type lineIndex struct {
    data  string
    index int
}

源现在可以通过非常简单的循环发出值

scanner := bufio.NewScanner(src)
var i int
for scanner.Scan() {
    lines <- lineIndex{
        data:  scanner.Text(),
        index: i,
    }
    i++
}
close(lines)

worker不关心位置值,但它必须转发 它在下游以供以后使用。

defer wg.Done()
for line := range lines {
  line.data = worker(line.data)
  out <- line
}

最后,fanout 接收器,一个 fanin,可以接收无序的项目。为了重新排序数据,它维护最后一个发出的值,以及一个非发出值的缓冲区。每次从通道中读取一个值时,它都会将其保存到缓冲区中。按每个块的索引值对缓冲区进行排序。尝试发出下一个匹配位置。它 尝试向下游转发,直到缓冲区为空或 位置不匹配。因为使用了缓冲区,所以最终的排放必须 发生在通道循环之后。

var last int
var buf []lineIndex
for l := range out {
    buf = append(buf, l)
    sort.Slice(buf, func(i int, j int) bool {
        return buf[i].index < buf[j].index
    })

    for len(buf) > 0 && buf[0].index == last {
        fmt.Fprintf(dst, "%v\n", buf[0].data)
        buf = append(buf[:0], buf[1:]...)
        last++
    }
}
for len(buf) > 0 /*&& buf[0].index == last*/ {
    fmt.Fprintf(dst, "%v\n", buf[0].data)
    buf = append(buf[:0], buf[1:]...)
    last++
}

以这种方式进行,让消费者按照推送的顺序检索项目,即使工作被扇出。

https://play.golang.org/p/Cemxz9ejW8t

异步,按头部分割,使用字节切片

在下面的例子中,源被读取,一个新的块被创建,一些数据被加载到其中。生成的字节片按原样发送到下游。这很方便,因为责任明确了资源的所有权。但它也消耗n*limit+limit*stage 内存。其中n 是worker 的数量,limit 是块大小。

go func() {
    for i := int64(0); i*limit < sz; i++ {
        f, err := os.Open(path)
        if err != nil {
            panic(err)
        }
        f.Seek(i*limit, 0)
        src := io.LimitReader(bufio.NewReader(f), limit)
        var dst bytes.Buffer
        io.Copy(&dst, src)
        f.Close()
        out <- block{data: dst.Bytes(), index: int(i)}
    }
    close(out)
}()

扇出的工作人员可以毫不费力地抓取数据并将其传递给工作人员。当前演示使用bytes.Buffer 向worker 展示一个简单的io.Reader / io.Writer API。

for rc := range src {
  var wsrc bytes.Buffer
  var wdst bytes.Buffer
  wsrc.Write(rc.data)
  worker(&wdst, &wsrc)
  out <- block{
    index: rc.index,
    data:  wdst.Bytes(),
  }
}

为了向下游转发数据,需要使用缓冲区的后备数组。同样,责任不分担。

writePosReaders 只是接收数据并及时将它们写入文件目的地,它比写入器旋转器更简单。关于输出块的大小非常松散。 writePosReaders 甚至不会尝试对传入的块进行排序,因为它在这里没有多大意义,但它对于测试目的很有用。

https://play.golang.org/p/qwWjhvyBfKZ

异步,头部分割,使用位置来文件资源

在下面的解决方案中,开发了与上一个示例相同的逻辑,但它会尝试通过将指针传递给读取器来降低内存使用率。

维护和编写更加乏味和痛苦,因为我们失去了所有权的明显视野。

这也是解决concurrently changing a file offset using `file.Seek` escapes the race detector?中描述的probem的方法

来源非常相似,但请注意它不是Close文件资源,它保持打开状态。

go func() {
    for i := int64(0); i*limit < sz; i++ {
        f, err := os.Open(path)
        if err != nil {
            panic(err)
        }
        f.Seek(i*limit, 0)
        x := readClose{
            Reader: io.LimitReader(bufio.NewReader(f), limit),
            Closer: f,
        }
        out <- posReadCloser{rc: x, index: int(i)}
    }
    close(out)
}()

资源仅在扇出期间关闭;在工人将其排干后倒入目的地。关闭文件的语句是 rc.Close() 在下面的摘录中。

go func(i int) {
  defer wg.Done()
  for rc := range src {
    pr, pw := io.Pipe()
    wg2.Add(1)
    go func(rc io.ReadCloser, pw *io.PipeWriter, pr *io.PipeReader) {
      defer wg2.Done()
      err := worker(pw, rc)
      rc.Close()
      if err != nil {
        pw.CloseWithError(err)
      } else {
        pw.Close()
      }
      pr.Close()
    }(rc.rc, pw, pr)
    out <- posReadCloser{
      index: rc.index,
      rc:    pr,
    }
  }
}(i)

io.Pipe 足以将作家变成读者。该算法可以立即将reader-end发送到下游,io信令是异步完成的。

https://play.golang.org/p/fQEYMNOGaL4

异步,被尾部分割

最后一个示例使用io.Pipe 耦合到rotateWriters 来解决问题。

请记住,sortedFanout 会写入从io.Reader 读取的转换数据到io.Writer。写入io.Writer 的输出数据的存储顺序与写入io.Reader 的顺序相同。

因此,我们可以通过简单的io.Copy(dst, pr)将管道读取端的内容倒入旋转器写入器中以分块写入内容。

  //...
    pr, pw := io.Pipe()
  //...
    go func() {
    //...
    errs <- sortedFanout(src, pw, echo)
    errs <- pw.Close()
    //...
  }()
  //...
  go func() {
    defer wg.Done()
    dst := &rotateWriters{New: writers.new, N: limit}
    n, err := io.Copy(dst, pr)
    //...
  }()

在我看来非常整洁。

https://play.golang.org/p/FGpm03i4CQD

同步 - 将所有内容放在一起

同步版本只是重新使用以前的工具。

func proceed(dst io.Writer, src io.Reader, limit int64, handler func(io.Writer, io.Reader) error) error {
    section := limitReader(src, limit)
    blocksWriter := &rotateWriters{New: noopCloseWriterOf(dst), N: limit}
    defer blocksWriter.Close()
    for !section.AtEOF() {
        handler(blocksWriter, section)
    }
    return nil
}

https://play.golang.org/p/nCfrwXXTpo0

异步 - 将所有内容放在一起

这是一个端到端的解决方案,它拆分了源和端。 虽然它操纵bytes.Buffer 并且错误管理几乎没有。


func main() {
    path := os.ExpandEnv("${GOROOT}/src/testdata/Isaac.Newton-Opticks.txt")

    limit := int64(1<<20) * 10 // ~10 MB
    limit = 100

    file, err := os.Open(path)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    src := bufio.NewReader(file)
    dst := os.Stdout
    err = proceedAndCheck(dst, src, limit, echo)
    // err = proceedAndCheck(dst, src, limit, echoVerbose)
    // err = proceedAndCheck(dst, src, limit, worker)
    if err != nil {
        log.Fatal(err)
    }
}

func proceed(dst io.Writer, src io.Reader, limit int64, handler func(io.Writer, io.Reader) error) error {
    blocks := toBytesBuffer(src, limit)
    blocks = sortedFanout(blocks, handler)
    blocksWriter := &rotateWriters{New: noopCloseWriterOf(dst), N: limit}
    defer blocksWriter.Close()
    return toWriter(blocksWriter, blocks)
}

toBytesBufferbytes.Buffer 的生产者

func toBytesBuffer(src io.Reader, limit int64) (out chan blockPos) {
    out = make(chan blockPos)
    go func() {
        section := limitReader(src, limit)
        var i int
        for !section.AtEOF() {
            var buf bytes.Buffer
            io.Copy(&buf, section) // we still cant use io.LimitReader because io.EOF is rightfuly suppressed.
            out <- blockPos{
                data:  &buf,
                index: i,
            }
            i++
        }
        close(out)
    }()
    return
}

toWriter 读取块的 chan 并将它们写入 rotator writer

func toWriter(dst io.Writer, src chan blockPos) error {
    for block := range src {
        _, err := io.Copy(dst, block.data)
        if err != nil {
            return err
        }
    }
    return nil
}

sortedFanout 稍作修改以处理带有位置的bytes.Buffer

type blockPos struct {
    data  io.Reader
    index int
}

func sortedFanout(src chan blockPos, worker func(io.Writer, io.Reader) error) chan blockPos {

    fanin := make(chan blockPos)
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            for block := range src {
                var buf bytes.Buffer
                worker(&buf, block.data)
                block.data = &buf
                fanin <- block
            }
            wg.Done()
        }()
    }
  //...

    return out
}

https://play.golang.org/p/Mloohek0Ipk

【讨论】:

    【解决方案2】:

    我从某个站点获取了这段代码,并在这里和那里进行了调整以满足我的要求。

    根据您的说法,您应该能够将代码从并发运行修改为顺序运行,这比将并发方面应用于现有代码更容易。

    工作基本上就是:去掉并发部分。

    无论如何,下面是一个简单的示例,说明如何实现您想要的。我以您的代码为基础,然后删除与并发进程相关的所有内容。

    package main
    
    import (
        "bufio"
        "fmt"
        "log"
        "os"
        "strings"
    
        "github.com/google/uuid"
    )
    
    func main() {
        split := 4
    
        file, err := os.Open("file.txt")
        if err != nil {
            log.Fatal(err)
        }
        defer file.Close()
    
        scanner := bufio.NewScanner(file)
        texts := make([]string, 0)
        for scanner.Scan() {
            text := scanner.Text()
            texts = append(texts, text)
        }
        if err := scanner.Err(); err != nil {
            log.Fatal(err)
        }
    
        lengthPerSplit := len(texts) / split
        for i := 0; i < split; i++ {
            if i+1 == split {
                chunkTexts := texts[i*lengthPerSplit:]
                writefile(strings.Join(chunkTexts, "\n"))
            } else {
                chunkTexts := texts[i*lengthPerSplit : (i+1)*lengthPerSplit]
                writefile(strings.Join(chunkTexts, "\n"))
            }
        }
    }
    
    func writefile(data string) {
        file, err := os.Create("chunks-" + uuid.New().String() + ".txt")
        if err != nil {
            fmt.Println(err)
        }
        defer file.Close()
        file.WriteString(data)
    }
    

    【讨论】:

    • 虽然,这个解决方案假设文件适合内存并且块基于文本内容,而不是字节,这可能导致块的文件大小不相等。
    • 是的,没错。我猜 OP 应该改进代码以满足他/她的需要。
    • @novalagung 感谢 sn-p,这段代码运行良好,但问题是我无法一次性读取文件中的完整数据,这是因为如果我读取一个 pod 可能会崩溃1GB 文件,所以我想使用可以定期清除的 go 例程,但是使用 go 例程不会保留数据的顺序。
    • @KrishnaChaitanya 大多数时候,使用 goroutine 肯定会给我们带来性能提升。但是对于这种特殊情况(i/o 文件操作),您不应该使用 goroutine。
    【解决方案3】:

    这是一个简单的文件拆分器。您可以自己处理剩余部分,我将剩余字节添加到第 5 个文件中。

    package main
    
    import (
        "bufio"
        "fmt"
        "os"
    )
    
    func main() {
        file, err := os.Open("sample-text-file.txt")
        if err != nil {
            panic(err)
        }
        defer file.Close()
    
        // to divide file in four chunks
        info, _ := file.Stat()
        chunkSize := int(info.Size() / 4)
    
        // reader of chunk size
        bufR := bufio.NewReaderSize(file, chunkSize)
    
        // Notice the range over slice of len 5, after 4 leftover will be written to 5th file
        for i := range [5]int{} {
            reader := make([]byte, chunkSize)
            rlen, err := bufR.Read(reader)
            fmt.Println("Read: ", rlen)
            if err != nil {
                panic(err)
            }
            writeFile(i, rlen, &reader)
        }
    }
    
    // Notice bufW as a pointer to avoid exchange of big byte slices
    func writeFile(i int, rlen int, bufW *[]byte) {
        fname := fmt.Sprintf("file_%v", i)
        f, err := os.Create(fname)
        defer f.Close()
    
        w := bufio.NewWriterSize(f, rlen)
        wbytes := *(bufW)
        wLen, err := w.Write(wbytes[:rlen])
        if err != nil {
            panic(err)
        }
        fmt.Println("Wrote ", wLen, "to", fname)
        w.Flush()
    }
    

    【讨论】:

    • 虽然你假设当你读取时你会得到完整的缓冲区,但是该方法返回读取的字节数的原因正是因为你可以获得小于填充的缓冲区大小.写作也是一样。因此,您最终可能会得到超过所需数量的文件。
    • @Nick,你怎么知道它会读取 1096 个字节? Reader 接口的文档清楚地指出,读取操作读取的内容可能小于您提供的切片大小。它说“字节最多取自底层阅读器上的一次读取,因此 n 可能小于 len(p)”。写入也有类似的情况。这就是为什么读取和写入都会告诉您它们实际处理了多少,而不是依赖于您提供的切片的大小。所以,你的意图是正确的,你的代码不是。
    • @EdwinDalorzo 现在我明白了。谢谢你的解释。因此,io.ReadFull(bufR, reader); 将是一个更好的选择。你说 Write 也有同样的问题,有什么替代方案?
    • 尼克看到这篇文章stackoverflow.com/questions/68791873/…关于传递一个指向切片的指针。我有点理解,你指的是调整大小,但是,阅读后调整大小应该是自动的,这是一种模式。一些边缘情况可能会告诉我错误,但它们非常罕见,而且大多数情况下没有调整切片大小是错误的。 Edwin 的评论也是对的,我之前没有点击过。
    • @mh-cbon 谢谢,该视频有助于理解地图和切片。现在我明白了你的意思。 :) 这篇文章很符合您的意思stackoverflow.com/questions/39993688/are-slices-passed-by-value
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-01-02
    • 1970-01-01
    • 1970-01-01
    • 2020-01-19
    • 1970-01-01
    • 2017-01-02
    • 2011-12-09
    相关资源
    最近更新 更多