【问题标题】:How to write functions that are executed in goroutines to send and receive messages?如何编写在 goroutines 中执行的函数来发送和接收消息?
【发布时间】:2025-12-20 10:05:06
【问题描述】:

我必须编写一个客户端代码,它以字符串的形式从服务器接收消息,并从控制台获取输入以结束向服务器发送消息。这两个操作应该彼此同时运行。我编写了一个代码来执行这些操作,但不是同时执行。

这是我当前的代码:

func SocketClient() {
    conn, err := net.Dial("tcp", ":9000")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    server_reader := bufio.NewReader(conn)
    input_reader := bufio.NewReader(os.Stdin)
    for {
        // for sending messages
        buff, err := input_reader.ReadString('\n')
        if err != nil {
            log.Fatalln(err)
        }
        conn.Write([]byte(buff))

        // for receiving messages but this won't run until the above has taken input from console
        buff2, err := server_reader.ReadString('\n')
        if err != nil {
            log.Fatalln(err)
        }
        fmt.Println("Received: %s", buff2)
    }
}

buff 接收来自服务器的传入消息,然后 buff2 从控制台获取传出消息输入,但为了再次接收传入消息,buff2 需要一些输入。我知道这可以使用通道、互斥锁等来完成,但是由于我对基础知识缺乏了解,我在使用它们时遇到了问题。

我猜实际的代码应该是这样的:

func SocketClient() {
    conn, err := net.Dial("tcp", ":9000")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    go func() {
        // for sending messages
    } ()
    go func() {
        // for receiving messages
    } ()
}

如何让输入和输出成为两个独立的 goroutine?

【问题讨论】:

  • 互斥锁用于同步访问一些共享数据,所以这里不适用。将它们分开看起来就像你写的一样:一个例程从conn 读取并打印到标准输出,一个从标准输入读取并写入conn;你已经有了做这些事情的代码。您是否尝试过以这种方式实际实施它?您遇到了什么问题?
  • 关于术语的注释:您不编写 goroutines。您编写在 goroutine 中执行的函数。把 goroutine 想象成线程。

标签: go networking tcp websocket server


【解决方案1】:

我写了一篇关于 Go 并发的博文。 https://marcofranssen.nl/concurrency-in-go/

还使用了这篇博文中的一些概念。

https://marcofranssen.nl/go-webserver-with-gracefull-shutdown/

这还允许我拦截命令行输入,例如安全关闭服务器的信号。

您可以使用类似的方法来获取命令输入并使用它。

主要思想是使用通道在你的 goroutine 之间进行通信。

【讨论】:

    【解决方案2】:

    直接在SocketClient 中运行循环之一。在 SocketClient 启动的新 goroutine 中运行另一个循环。

    func SocketClient() error {
        conn, err := net.Dial("tcp", ":9000")
        if err != nil {
            return err
        }
        defer conn.Close()
        server_reader := bufio.NewReader(conn)
        input_reader := bufio.NewReader(os.Stdin)
    
        go func() {
            defer conn.Close()
    
            // This loop will break and the goroutine will exit when the
            // SocketClient function executes conn.Close().
    
            for {
                buff, err := server_reader.ReadBytes('\n')
                if err != nil {
                    // optional: log.Fatal(err) to cause program to exit without waiting for new input from stdin.
                    return
                }
                fmt.Printf("Received: %s", buff)
            }
        }()
    
        for {
            // for sending messages
            buff, err := input_reader.ReadBytes('\n')
            if err != nil {
                return err
            }
            if _, err := conn.Write(buff); err != nil {
                return err
            }
        }
    }
    

    使用 bufio.Reader ReadBytes 方法而不是 ReadString 以避免在 []bytestring 之间进行不必要的转换。

    【讨论】:

    • 您的代码运行良好,但您能告诉我您在 goroutine 中编写的函数,它究竟什么时候有机会执行?一旦执行到 goroutine 之后的 for 循环,它就会继续运行,因为函数中不涉及 Sleep、chan、Waitgroup。我看到的所有其他教程都涉及使用它们来实现您在此处所做的事情。
    • goroutine 在 SocketClient 函数中启动。它循环读取行,直到 read 返回错误。读取块,直到它读取一行或连接出现错误。
    【解决方案3】:

    您可以创建一个像这样的函数,它返回一个通道。它还会启动一个新的 go 例程,该例程将写入返回的通道。

    这允许消费者调用此方法,并期望返回通道上的值。

    我还更新了方法签名以使用 *bufio.Scanner,因为它是一种更有效的换行符分割方法。

    func ReadFully(scanner *bufio.Scanner) <-chan string {
        out := make(chan  string)
    
        go func() {
            // once all the values have been read, close the channel
            // this tells the consumers that there will not be anymore
            // values and they don't need to keep listening to this channel.
            defer close(out)
    
            for scanner.Scan() {
                out <- scanner.Text()
            }
    
            if err := scanner.Err(); err!= nil {
                log.Fatal(err)
            }
        }()
    
        return out
    }
    

    现在我们可以在SocketClient 中使用这个ReadFully 方法

    func SocketClient() {
        conn, err := net.Dial("tcp", ":9000")
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
    
        serverScan := bufio.NewScanner(conn)
        inputScan := bufio.NewScanner(os.Stdin)
    
    
        serverCh := ReadFully(serverScan)
        inputCh := ReadFully(inputScan)
    
    
        // create a wait group, this stops the SocketClient 
        // method from exiting until the workers are done.
        var wg sync.WaitGroup
        wg.Add(2)
    
        go func() {
            defer wg.Done()
            for v := range inputCh {
                _, _ = conn.Write([]byte(v))
            }
        }()
    
        go func() {
            defer wg.Done()
            for v := range serverCh {
                fmt.Printf("Received: %s\n", v)
            }
        }()
    
        wg.Wait()
    }
    
    

    注意,这个例子完全忽略了取消等。一般来说,你不应该在没有办法停止的情况下启动一个 goroutine。

    查看这篇关于管道的精彩文章。 https://blog.golang.org/pipelines

    【讨论】: