这篇文章展示了几种方法
按长度为N 的块处理文件。
头部和尾部的分裂都被演示了,
以顺序和异步方式。
它还为 OP 提供了排序扇出的解决方案。
下面的 Playground 是使用一个帮助器生成的,位于
https://play.golang.org/p/cGfSxnM1Lnv 它可能会帮助您解压缩使用的多文件游乐场
txtar 格式。原则上,这个想法是能够使用xclip -o | go run main.go unpack <path to write> 将文件写入您的磁盘。但是,我还没有彻底测试过最后一个功能。反过来也可以go run main.go pack <path to read> | xclip
我可能留下了错误,但所有示例都应该从go1.15 开始编译,并且有一个main 函数来演示用法。一路提供测试代码用于演示/启动目的。
我还希望在我的评论中忘记这么多重要细节和在使用这些 api 时需要了解的有用信息。
这些示例可能都没有提供直接的答案,但它应该通过演示和讨论各种选项来帮助推理这些情况。
我发现为时已晚,我完全忘记了io.ReadFull api。为了详尽无遗,每当您遇到limitReader 时,请考虑使用此替代写作https://play.golang.org/p/xKKO3d_J4Yd 从一个更改为另一个应该是直截了当的。
顺序 - 按头部分割
在下面的示例中,文件从头部开始按块分割。
它循环直到源到达io.EOF。
section := limitReader(src, limit)
for !section.AtEOF() {
//...
它可以使用section作为bufio.NewScanner的有限阅读器
for !section.AtEOF() {
//...
scanner := bufio.NewScanner(section)
for scanner.Scan() {
在每次迭代中,都会创建一个新部分以将结果写入其中。
var dst io.WriteCloser
dst = noopCloseWriter{os.Stdout}
// name:=fmt.Sprintf("chunks-%v.txt", partCount)
// dst,err=os.Create(name)
// if err != nil {
// log.Fatal(err)
// break
// }
partCount++
scanner := bufio.NewScanner(section)
for scanner.Scan() {
line := scanner.Bytes()
line = append(line, '\n')
n, err := dst.Write(line)
//...
请注意,由于长度是在摄取时控制的,因此此版本在输出数据的长度方面非常宽松。如果工人产生的输出多于输入,你将获得更大的块。另请注意,如果与limitedReader 定义的边界不匹配,则一行很可能会在中间被截断。
https://play.golang.org/p/WikgL_w9BJ_K
需要引入一个新的 limitedReader,其行为与标准 API 中可用的 io.LimitReader 略有不同,否则它
有趣的是会陷入无限循环https://play.golang.org/p/hwDFaW2Hylb
顺序 - 由尾部分割
在下面的解决方案中,每次调用Write 时,都会创建一个旋转器写入器以将目标及时旋转到所需的输出目标。
var writers writers
dst := &rotateWriters{New: writers.new, N: limit}
writers 只是io.Writer 的列表,它提供new 方法与rotateWriters 实例共享。每次需要新的底层目标时,编写器旋转器都会调用该函数。
使用此 API,当您调用 n, err := dst.Write(line) 时,dst 会自动确定如何/何时旋转目标。
因此,现在写入大大简化了,输入使用常规扫描器处理,结果写入dst,然后交换当前目标。
var total int
scanner := bufio.NewScanner(src)
for scanner.Scan() {
line := scanner.Bytes()
line = append(line, '\n')
n, err := dst.Write(line)
//...
不过,请注意拨打dst.Close
if n, err := dst.CloseN(); err != nil {
log.Fatal(err)
}
https://play.golang.org/p/KsGbaIAzQr-
sorted fanout - 在继续之前的快速说明
要实现排序的扇出过程,源必须输出数据
它承载了它的位置。经常会定义一个结构来保存这两个值。
type lineIndex struct {
data string
index int
}
源现在可以通过非常简单的循环发出值
scanner := bufio.NewScanner(src)
var i int
for scanner.Scan() {
lines <- lineIndex{
data: scanner.Text(),
index: i,
}
i++
}
close(lines)
worker不关心位置值,但它必须转发
它在下游以供以后使用。
defer wg.Done()
for line := range lines {
line.data = worker(line.data)
out <- line
}
最后,fanout 接收器,一个 fanin,可以接收无序的项目。为了重新排序数据,它维护最后一个发出的值,以及一个非发出值的缓冲区。每次从通道中读取一个值时,它都会将其保存到缓冲区中。按每个块的索引值对缓冲区进行排序。尝试发出下一个匹配位置。它
尝试向下游转发,直到缓冲区为空或
位置不匹配。因为使用了缓冲区,所以最终的排放必须
发生在通道循环之后。
var last int
var buf []lineIndex
for l := range out {
buf = append(buf, l)
sort.Slice(buf, func(i int, j int) bool {
return buf[i].index < buf[j].index
})
for len(buf) > 0 && buf[0].index == last {
fmt.Fprintf(dst, "%v\n", buf[0].data)
buf = append(buf[:0], buf[1:]...)
last++
}
}
for len(buf) > 0 /*&& buf[0].index == last*/ {
fmt.Fprintf(dst, "%v\n", buf[0].data)
buf = append(buf[:0], buf[1:]...)
last++
}
以这种方式进行,让消费者按照推送的顺序检索项目,即使工作被扇出。
https://play.golang.org/p/Cemxz9ejW8t
异步,按头部分割,使用字节切片
在下面的例子中,源被读取,一个新的块被创建,一些数据被加载到其中。生成的字节片按原样发送到下游。这很方便,因为责任明确了资源的所有权。但它也消耗n*limit+limit*stage 内存。其中n 是worker 的数量,limit 是块大小。
go func() {
for i := int64(0); i*limit < sz; i++ {
f, err := os.Open(path)
if err != nil {
panic(err)
}
f.Seek(i*limit, 0)
src := io.LimitReader(bufio.NewReader(f), limit)
var dst bytes.Buffer
io.Copy(&dst, src)
f.Close()
out <- block{data: dst.Bytes(), index: int(i)}
}
close(out)
}()
扇出的工作人员可以毫不费力地抓取数据并将其传递给工作人员。当前演示使用bytes.Buffer 向worker 展示一个简单的io.Reader / io.Writer API。
for rc := range src {
var wsrc bytes.Buffer
var wdst bytes.Buffer
wsrc.Write(rc.data)
worker(&wdst, &wsrc)
out <- block{
index: rc.index,
data: wdst.Bytes(),
}
}
为了向下游转发数据,需要使用缓冲区的后备数组。同样,责任不分担。
writePosReaders 只是接收数据并及时将它们写入文件目的地,它比写入器旋转器更简单。关于输出块的大小非常松散。 writePosReaders 甚至不会尝试对传入的块进行排序,因为它在这里没有多大意义,但它对于测试目的很有用。
https://play.golang.org/p/qwWjhvyBfKZ
异步,头部分割,使用位置来文件资源
在下面的解决方案中,开发了与上一个示例相同的逻辑,但它会尝试通过将指针传递给读取器来降低内存使用率。
维护和编写更加乏味和痛苦,因为我们失去了所有权的明显视野。
这也是解决concurrently changing a file offset using `file.Seek` escapes the race detector?中描述的probem的方法
来源非常相似,但请注意它不是Close文件资源,它保持打开状态。
go func() {
for i := int64(0); i*limit < sz; i++ {
f, err := os.Open(path)
if err != nil {
panic(err)
}
f.Seek(i*limit, 0)
x := readClose{
Reader: io.LimitReader(bufio.NewReader(f), limit),
Closer: f,
}
out <- posReadCloser{rc: x, index: int(i)}
}
close(out)
}()
资源仅在扇出期间关闭;在工人将其排干后倒入目的地。关闭文件的语句是 rc.Close() 在下面的摘录中。
go func(i int) {
defer wg.Done()
for rc := range src {
pr, pw := io.Pipe()
wg2.Add(1)
go func(rc io.ReadCloser, pw *io.PipeWriter, pr *io.PipeReader) {
defer wg2.Done()
err := worker(pw, rc)
rc.Close()
if err != nil {
pw.CloseWithError(err)
} else {
pw.Close()
}
pr.Close()
}(rc.rc, pw, pr)
out <- posReadCloser{
index: rc.index,
rc: pr,
}
}
}(i)
io.Pipe 足以将作家变成读者。该算法可以立即将reader-end发送到下游,io信令是异步完成的。
https://play.golang.org/p/fQEYMNOGaL4
异步,被尾部分割
最后一个示例使用io.Pipe 耦合到rotateWriters 来解决问题。
请记住,sortedFanout 会写入从io.Reader 读取的转换数据到io.Writer。写入io.Writer 的输出数据的存储顺序与写入io.Reader 的顺序相同。
因此,我们可以通过简单的io.Copy(dst, pr)将管道读取端的内容倒入旋转器写入器中以分块写入内容。
//...
pr, pw := io.Pipe()
//...
go func() {
//...
errs <- sortedFanout(src, pw, echo)
errs <- pw.Close()
//...
}()
//...
go func() {
defer wg.Done()
dst := &rotateWriters{New: writers.new, N: limit}
n, err := io.Copy(dst, pr)
//...
}()
在我看来非常整洁。
https://play.golang.org/p/FGpm03i4CQD
同步 - 将所有内容放在一起
同步版本只是重新使用以前的工具。
func proceed(dst io.Writer, src io.Reader, limit int64, handler func(io.Writer, io.Reader) error) error {
section := limitReader(src, limit)
blocksWriter := &rotateWriters{New: noopCloseWriterOf(dst), N: limit}
defer blocksWriter.Close()
for !section.AtEOF() {
handler(blocksWriter, section)
}
return nil
}
https://play.golang.org/p/nCfrwXXTpo0
异步 - 将所有内容放在一起
这是一个端到端的解决方案,它拆分了源和端。
虽然它操纵bytes.Buffer 并且错误管理几乎没有。
func main() {
path := os.ExpandEnv("${GOROOT}/src/testdata/Isaac.Newton-Opticks.txt")
limit := int64(1<<20) * 10 // ~10 MB
limit = 100
file, err := os.Open(path)
if err != nil {
log.Fatal(err)
}
defer file.Close()
src := bufio.NewReader(file)
dst := os.Stdout
err = proceedAndCheck(dst, src, limit, echo)
// err = proceedAndCheck(dst, src, limit, echoVerbose)
// err = proceedAndCheck(dst, src, limit, worker)
if err != nil {
log.Fatal(err)
}
}
func proceed(dst io.Writer, src io.Reader, limit int64, handler func(io.Writer, io.Reader) error) error {
blocks := toBytesBuffer(src, limit)
blocks = sortedFanout(blocks, handler)
blocksWriter := &rotateWriters{New: noopCloseWriterOf(dst), N: limit}
defer blocksWriter.Close()
return toWriter(blocksWriter, blocks)
}
toBytesBuffer 是bytes.Buffer 的生产者
func toBytesBuffer(src io.Reader, limit int64) (out chan blockPos) {
out = make(chan blockPos)
go func() {
section := limitReader(src, limit)
var i int
for !section.AtEOF() {
var buf bytes.Buffer
io.Copy(&buf, section) // we still cant use io.LimitReader because io.EOF is rightfuly suppressed.
out <- blockPos{
data: &buf,
index: i,
}
i++
}
close(out)
}()
return
}
toWriter 读取块的 chan 并将它们写入 rotator writer
func toWriter(dst io.Writer, src chan blockPos) error {
for block := range src {
_, err := io.Copy(dst, block.data)
if err != nil {
return err
}
}
return nil
}
sortedFanout 稍作修改以处理带有位置的bytes.Buffer
type blockPos struct {
data io.Reader
index int
}
func sortedFanout(src chan blockPos, worker func(io.Writer, io.Reader) error) chan blockPos {
fanin := make(chan blockPos)
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
wg.Add(1)
go func() {
for block := range src {
var buf bytes.Buffer
worker(&buf, block.data)
block.data = &buf
fanin <- block
}
wg.Done()
}()
}
//...
return out
}
https://play.golang.org/p/Mloohek0Ipk