【问题标题】:Properly passing data on stdin to a command and receiving data from stdout of that command in golang正确地将标准输入上的数据传递给命令并在 golang 中从该命令的标准输出接收数据
【发布时间】:2013-02-14 02:48:48
【问题描述】:

我有以下程序:

package main

import "bytes"
import "io"
import "log"
import "os"
import "os/exec"
import "time"

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    go func() {
            // Removing the following lines allow some output
            // to be fetched from cat's stdout sometimes
            time.Sleep(5 * time.Second)
            io.Copy(os.Stdout, stdout)
    }()
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}

在循环中运行时,我没有得到任何结果,如下所示:

$ while true; do go run cat_thingy.go; echo ; done



^C

此结果是在 Ubuntu 12.04 上从 apt 在虚拟机(go 版本 go1)上安装 golang-go 后出现的。我无法在 Macbook Air(go 版本 go1.0.3)上的 go 安装上进行复制。这似乎是某种竞赛条件。事实上,如果我设置了 sleep(1*time.Second),我永远不会在代码中以随机睡眠为代价看到问题。

我在代码中做错了什么,还是这是一个错误?如果是bug,修复了吗?

更新:可能的线索

我发现 Command.Wait 会关闭与 cat 子进程通信的管道,即使它们仍有未读数据。我不太确定处理该问题的正确方法。我想我可以创建一个通道来通知何时完成对标准输入的写入,但我仍然需要知道 cat 进程是否已经结束,以确保不会将任何其他内容写入其标准输出管道。我知道我可以使用 cmd.Process.Wait 来确定进程何时结束,但是然后调用 cmd.Wait 是否安全?

更新:越来越近

这是对代码的新删减。我相信这对于写入标准输入和从标准输出读取是有效的。我认为如果我从标准输出处理 goroutine 中替换 io.Copy 而没有流的东西,我可以让它正确地流式传输数据(而不是全部缓冲)。

package main

import "bytes"
import "fmt"
import "io"
import "log"
import "os/exec"
import "runtime"

const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB
const numInputBlocks = 6

func main() {
    runtime.GOMAXPROCS(5)
    runCatFromStdin(populateStdin(numInputBlocks))
}

func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"}
        for i := 0; i < numInputBlocks; i++ {
          repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes()
          fmt.Printf("%s\n", repeatedBytes)
          io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength)))
        }
    }
}

func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    output_done_channel := make(chan bool)
    go func() {
        out_bytes := new(bytes.Buffer)
        io.Copy(out_bytes, stdout)
        fmt.Printf("%s\n", out_bytes)
        fmt.Println(out_bytes.Len())
        fmt.Println(inputBufferBlockLength*numInputBlocks)
        output_done_channel <- true
    }()
    <-output_done_channel
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}

【问题讨论】:

    标签: go


    【解决方案1】:

    这是您的第一个有效代码的版本。请注意添加了 sync.WaitGroup 以确保在关闭命令之前完成发送和接收 go 例程。

    package main
    
    import (
        "bytes"
        "io"
        "log"
        "os"
        "os/exec"
        "sync"
        "time"
    )
    
    func main() {
        runCatFromStdinWorks(populateStdin("aaa\n"))
        runCatFromStdinWorks(populateStdin("bbb\n"))
    }
    
    func populateStdin(str string) func(io.WriteCloser) {
        return func(stdin io.WriteCloser) {
            defer stdin.Close()
            io.Copy(stdin, bytes.NewBufferString(str))
        }
    }
    
    func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
        cmd := exec.Command("cat")
        stdin, err := cmd.StdinPipe()
        if err != nil {
            log.Panic(err)
        }
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            log.Panic(err)
        }
        err = cmd.Start()
        if err != nil {
            log.Panic(err)
        }
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            populate_stdin_func(stdin)
        }()
        go func() {
            defer wg.Done()
            time.Sleep(5 * time.Second)
            io.Copy(os.Stdout, stdout)
        }()
        wg.Wait()
        err = cmd.Wait()
        if err != nil {
            log.Panic(err)
        }
    }
    

    (这只是@peterSO 所说的另一种说法;-)

    【讨论】:

    • 这不仅仅是@peterSO 所说的另一种说法。它实际上正确处理了管道缓冲区,因为 cat 的输入是在与输出分开的 goroutine 中处理的。我还认为 WaitGroups 比我用来进行同步的通道好一点。作为 cmd.Wait() 的副作用,我仍然觉得管道关闭相对令人困惑。这真的很令人困惑,因为它直到过程结束后才会发生。
    【解决方案2】:

    Go statements

    “go”语句开始执行函数或方法调用 一个独立的并发控制线程,或 goroutine,在 相同的地址空间。

    GoStmt = "go" 表达式。

    表达式必须是调用。函数值和参数是 在调用 goroutine 中像往常一样评估,但与常规的不同 调用,程序执行不等待被调用的函数 完全的。相反,该函数开始独立执行 新的协程。当函数终止时,它的 goroutine 也 终止。如果函数有任何返回值,它们将被丢弃 当函数完成时。

    将免费的 goroutine 转换为函数调用。

    package main
    
    import (
        "bytes"
        "io"
        "log"
        "os"
        "os/exec"
    )
    
    func main() {
        runCatFromStdinWorks(populateStdin("aaa\n"))
        runCatFromStdinWorks(populateStdin("bbb\n"))
    }
    
    func populateStdin(str string) func(io.WriteCloser) {
        return func(stdin io.WriteCloser) {
            defer stdin.Close()
            io.Copy(stdin, bytes.NewBufferString(str))
        }
    }
    
    func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
        cmd := exec.Command("cat")
        stdin, err := cmd.StdinPipe()
        if err != nil {
            log.Panic(err)
        }
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            log.Panic(err)
        }
        err = cmd.Start()
        if err != nil {
            log.Panic(err)
        }
        populate_stdin_func(stdin)
        io.Copy(os.Stdout, stdout)
        err = cmd.Wait()
        if err != nil {
            log.Panic(err)
        }
    }
    

    【讨论】:

    • 您的代码有效,因为我的示例中的管道缓冲区永远不会满。将 goroutine 更改为函数调用通常不起作用。在一般情况下,cat 进程用于通信的管道将具有一定大小的缓冲区。例如,标准输入管道有一定的缓冲区。一旦该缓冲区已满,对管道的写入将被阻塞。在 Linux 上,我相信缓冲区大小是 64KiB。在标准输出的 cat 管道上也会有类似的问题。在主代码中做阻塞 I/O 意味着那些阻塞调用会阻塞主代码。
    猜你喜欢
    • 2014-02-02
    • 1970-01-01
    • 2015-06-01
    • 2010-11-17
    • 1970-01-01
    • 1970-01-01
    • 2011-06-14
    • 1970-01-01
    • 2010-09-13
    相关资源
    最近更新 更多