【问题标题】:Capture stdout from exec.Command line by line and also pipe to os.Stdout从 exec.Command 逐行捕获标准输出,并通过管道传输到 os.Stdout
【发布时间】:2021-12-25 12:17:02
【问题描述】:

谁能帮忙?

我有一个正在通过 exec.CommandContext 运行的应用程序(所以我可以通过 ctx 取消它)。除非出错,否则它通常不会停止。

我目前让它将其输出中继到 os.stdOut,它运行良好。但我也想通过一个通道获取每一行 - 这背后的想法是我将在该行上查找一个正则表达式,如果它为真,那么我将设置一个“错误”的内部状态。

虽然我无法让它工作,但我尝试了 NewSscanner。这是我的代码。

正如我所说,它确实会输出到 os.StdOut,这很棒,但我希望接收在我设置的频道中发生的每一行。

有什么想法吗?

提前致谢。

func (d *Daemon) Start() {
    ctx, cancel := context.WithCancel(context.Background())
    d.cancel = cancel

    go func() {
        args := "-x f -a 1"
        cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)

        var stdoutBuf, stderrBuf bytes.Buffer

        cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
        cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)

        lines := make(chan string)

        go func() {
            scanner := bufio.NewScanner(os.Stdin)
            for scanner.Scan() {
                fmt.Println("I am reading a line!")
                lines <- scanner.Text()
            }
        }()

        err := cmd.Start()
        if err != nil {
            log.Fatal(err)
        }

        select {
        case outputx := <-lines:
            // I will do somethign with this!
            fmt.Println("Hello!!", outputx)

        case <-ctx.Done():
            log.Println("I am done!, probably cancelled!")
        }
    }()
}

也试过用这个

        go func() {
            scanner := bufio.NewScanner(&stdoutBuf)
            for scanner.Scan() {
                fmt.Println("I am reading a line!")
                lines <- scanner.Text()
            }
        }()

即便如此,“我正在阅读一行”永远不会出现,我也对其进行了调试,它从未进入“用于扫描仪..”

还尝试扫描&amp;stderrBuf,同样,没有任何输入。

【问题讨论】:

  • 您正在扫描 os.Stdin 而不是 stdout buf
  • 谢谢,是的,我也这么认为,但我之前尝试过,但没有成功。刚刚又试了一次。
  • 更新了主要问题以包含该问题。
  • 两个不同的人建议@IanGregson,几乎只是写了同样的东西。是时候将问题的最小、可执行的再现放入问题中,这样我们就可以用更好的信息来支持我们的答案。
  • (1) 命令输出到stdoutBuf,而不是os.Stdin。 (2) 从stdoutBuf 读取的程序版本在缓冲区存在数据竞争。通过使用管道而不是缓冲区来修复。 (3) 你可能想要一个围绕 select 语句的循环。

标签: go channels


【解决方案1】:

对于一个使用并发和 goroutines 的正确程序,我们应该尽量证明没有数据竞争,程序不会死锁,goroutines 不会泄漏。让我们努力实现这一目标。

完整代码

游乐场: https://play.golang.org/p/Xv1hJXYQoZq。我建议在本地复制和运行,因为 Playground 不会流式输出 afaik 并且它有超时。

请注意,我已将测试命令更改为 % find /usr/local,这是一个典型的长时间运行命令(>3 秒),具有大量输出行,因为它更适合我们应该测试的场景。

演练

让我们看看Daemon.Start 方法。一开始,它几乎是一样的。不过,最值得注意的是,新代码的大部分方法都没有 goroutine。即使没有这个,Daemon.Start 方法仍然是非阻塞的,并且会“立即”返回。


第一个值得注意的修复是这些更新的行。

    outR, outW := io.Pipe()
    cmd.Stdout = io.MultiWriter(outW, os.Stdout)

我们调用io.Pipe,而不是构造 bytes.Buffer 变量。如果我们没有进行此更改并坚持使用 bytes.Buffer,那么一旦没有更多数据可读取,scanner.Scan() 将返回 false。如果命令仅偶尔写入标准输出(就此而言,甚至相隔一毫秒),就会发生这种情况。 scanner.Scan() 返回 false 后,goroutine 退出,我们错过了处理未来的输出。

相反,通过使用io.Pipe 的读取端,scanner.Scan() 将等待来自管道读取端的输入,直到管道的写入端关闭。

这解决了扫描仪和命令输出之间的竞争问题。


接下来,我们构建两个密切相关的 goroutine:第一个从 &lt;-lines 消费,第二个从 lines&lt;- 产生。

    go func() {
        for line := range lines {
            fmt.Println("output line from channel:", line)
            ...
        }
    }()
    go func() {
        defer close(lines)
        scanner := bufio.NewScanner(outR)
        for scanner.Scan() {
            lines <- scanner.Text()
        }
        ...
    }()

lines 通道关闭时,消费者 goroutine 将退出,因为通道的关闭自然会导致范围循环终止;生产者 goroutine 在退出时关闭 lines

producer goroutine 将在scanner.Scan() 返回 false 时退出,这发生在 io.Pipe 的写端关闭时。这种关闭发生在即将到来的代码中。

从上面的两段中可以看出,这两个 goroutine 保证会退出(即不会泄漏)。


接下来,我们启动命令。标准的东西,它是一个非阻塞调用,它会立即返回。

// Start the command.
if err := cmd.Start(); err != nil {
    log.Fatal(err)
}

转到Daemon.Start 中的最后一段代码。这个 goroutine 等待命令通过cmd.Wait() 退出。处理这一点很重要,因为该命令可能出于上下文取消以外的原因。

特别是,我们希望关闭io.Pipe 的写入端(这反过来又会关闭输出行生产者goroutine,如前所述)。

    go func() {
        err := cmd.Wait()
        fmt.Println("command exited; error is:", err)
        outW.Close()
        ...
    }()

附带说明,通过等待cmd.Wait(),我们不必单独等待ctx.Done()。等待cmd.Wait() 处理自然原因导致的退出(命令成功完成、命令遇到内部错误等)和上下文取消导致的退出。

这个 goroutine 也保证会退出。当cmd.Wait() 返回时,它将退出。这可能是因为命令正常退出成功;由于命令错误而退出失败;或因上下文取消而退出失败。


就是这样!我们应该没有数据竞争,没有死锁,也没有泄露的 goroutine。

上面 sn-ps 中省略的行 ("...") 是针对 Daemon 类型的 Done()CmdErr()Cancel() 方法。这些方法在代码中得到了很好的记录,因此这些省略的行希望是不言自明的。

除此之外,查找TODO cmets 以根据您的需要进行错误处理!

测试一下!

使用此驱动程序测试代码。

func main() {
    var d Daemon
    d.Start()

    // Enable this code to test Context cancellation:
    // time.AfterFunc(100*time.Millisecond, d.Cancel)

    <-d.Done()
    fmt.Println("d.CmdErr():", d.CmdErr())
}

【讨论】:

    【解决方案2】:

    当上下文取消时命令终止。如果在命令终止之前可以读取命令的所有输出,则使用以下代码:

    func (d *Daemon) Start() {
        ctx, cancel := context.WithCancel(context.Background())
        d.cancel = cancel
    
        args := "-x f -a 1"
        cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            log.Fatal(err)
        }
        err = cmd.Start()
        if err != nil {
            log.Fatal(err)
        }
    
        go func() {
            defer cmd.Wait()
            scanner := bufio.NewScanner(stdout)
            for scanner.Scan() {
                s := scanner.Text()
                fmt.Println(s) // echo to stdout
                // Do something with s
            }
        }()
    }
    

    当上下文被取消时,命令终止。
    继续阅读stdout 在命令终止时返回 io.EOF。当stdout 返回错误时,goroutine 会跳出扫描循环。

    【讨论】:

      【解决方案3】:

      cmd.Start() 不等待命令完成。此外,需要调用cmd.Wait() 以获知该过程的结束。

      reader, writer := io.Pipe()
      
      cmdCtx, cmdDone := context.WithCancel(context.Background())
      
      scannerStopped := make(chan struct{})
      go func() {
          defer close(scannerStopped)
      
          scanner := bufio.NewScanner(reader)
          for scanner.Scan() {
              fmt.Println(scanner.Text())
          }
      }()
      
      cmd := exec.Command("ls")
      cmd.Stdout = writer
      _ = cmd.Start()
      go func() {
          _ = cmd.Wait()
          cmdDone()
          writer.Close()
      }()
      <-cmdCtx.Done()
      
      <-scannerStopped
      

      scannerStopped 被添加以证明扫描器 goroutine 现在停止了。

      reader, writer := io.Pipe()
      
      scannerStopped := make(chan struct{})
      go func() {
          defer close(scannerStopped)
      
          scanner := bufio.NewScanner(reader)
          for scanner.Scan() {
              fmt.Println(scanner.Text())
          }
      }()
      
      cmd := exec.Command("ls")
      cmd.Stdout = writer
      _ = cmd.Run()
      
      go func() {
          _ = cmd.Wait()
          writer.Close()
      }()
      
      <-scannerStopped
      

      并根据需要处理线条。

      注意:写的有点匆忙。如果有任何不清楚或不正确的地方,请告诉我。

      【讨论】:

      • 感谢您的帮助。我还没有进行这些更改,我之前的代码只是为了测试而快速组合在一起。奇怪的东西!,所以我只将 cmd 更改为“cmd := exec.CommandContext(ctx, "ping", "127.0.0.1")" - 一切正常......所以它似乎与我的命令有关,然后我我正在跑步。也许它检测到我没有运行 TTY 或其他东西。
      • 感谢您的帮助。不知道发生了什么,我使用了 io.Pipe 并开始工作。感谢大家的帮助
      【解决方案4】:

      您必须扫描stdoutBuf 而不是os.Stdin

      scanner := bufio.NewScanner(&stdoutBuf)
      

      【讨论】:

      • 谢谢,是的,我也试过了,我的主要问题已经更新了结果。
      猜你喜欢
      • 1970-01-01
      • 2018-08-10
      • 1970-01-01
      • 2017-05-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-05
      • 1970-01-01
      相关资源
      最近更新 更多