【问题标题】:Solving goroutines deadlock解决 goroutines 死锁
【发布时间】:2018-07-29 15:39:29
【问题描述】:

我一直在尝试解决我在 Golang 并发中遇到的这个简单问题。我一直在寻找所有可能的解决方案,但没有发现任何特定于我的问题(或者我可能会错过一个)。这是我的代码:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration, num int) {

    for i:=0; i<num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch, 100*time.Millisecond, 2)
    go producer(ch, 200*time.Millisecond, 5)

    for {
        fmt.Println(<-ch)    
    }

    close(ch)
}

打印错误:

致命错误:所有 goroutine 都处于休眠状态 - 死锁!

goroutine 1 [chan 接收]: main.main() D:/code/go/src/testconcurrency/main.go:23 +0xca 退出状态2

避免这个错误的有效方法是什么?,谢谢。

【问题讨论】:

  • 你可以通过这个question
  • 您的问题是您在通道中“发布”整数,然后您试图从主线程中读取它们。然后整数流停止,当 goroutine 结束时。 (当producers 都结束时,不再向通道发布整数)并且您仍然在主线程上等待发布更多整数,这永远不会发生。在频道不再使用后,请尝试使用close(ch) 关闭频道。
  • 我做了close(ch)
  • 你在导致死锁的地方关闭了它。您“在无限循环中”,一直在轮询通道,等待发布给它的任何整数。两个生产者都结束后,您仍在等待更多,而没有生产。 close(ch) 必须在生产者的末尾(但在您的情况下,这将很困难,因为您有 2 个生产者)。
  • 先尝试,删除一个生产者,然后运行代码,同样的crash会发生。然后尝试将close(ch)移动到生产者例程的末尾,会成功。但在 2 个生产者的情况下,您可能需要另一个渠道或其他东西,您需要更好的设计。

标签: go concurrency channel goroutine


【解决方案1】:

您需要同步 goroutine 中的所有异步进程。您的主线程和 goroutine 线程不是同步进程。您的主线程永远不会知道何时停止从 goroutine 调用通道。由于您的主线程在通道上循环,它总是从通道调用值,并且当 goroutine 完成并且通道停止发送值时,您的主线程无法从通道中获取更多值,因此条件变为死锁。为避免这种情况,请使用sync.WaitGroup 来同步异步进程。

代码如下:

package main

import (
    "fmt"
    "time"
    "sync"
)

func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    for i:=0; i<num; i++ {
        ch <- i;
        time.Sleep(d);
    }
    defer wg.Done();
}

func main() {
    wg  := &sync.WaitGroup{}
    ch  := make(chan int);

    wg.Add(2);
    go producer(ch, 100*time.Millisecond, 2, wg);
    go producer(ch, 200*time.Millisecond, 5, wg);

    go func() {   
    wg.Wait()
    close(ch)
    }()

    // print the outputs
    for i:= range ch {
        fmt.Println(i);
    }
}

https://play.golang.org/p/euMTGTIs83g

希望对你有帮助。

由于我的解决方案看起来与已经回答的有点相似,我在修改之前将其更改为我的原始答案以适应 OP 问题。

代码如下:

package main

import (
    "fmt"
    "time"
    "sync"
)

// producer produce values tobe sent to consumer
func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done();
    for i:=0; i<num; i++ {
        ch <- i;
        time.Sleep(d);
    }
}

// consumer consume all values from producers
func consumer(ch chan int, out chan int, wg *sync.WaitGroup) {
    defer wg.Done();
    for i:= range ch {
        out <- i
    }
}

// synchronizer synchronize all goroutines to avoid deadlocks
func synchronizer(ch chan int, out chan int, wgp *sync.WaitGroup, wgc *sync.WaitGroup) {
    wgp.Wait()
    close(ch)
    wgc.Wait()
    close(out)
}

func main() {
    wgp  := &sync.WaitGroup{}
    wgc  := &sync.WaitGroup{}
    ch  := make(chan int);
    out := make(chan int);

    wgp.Add(2);
    go producer(ch, 100*time.Millisecond, 2, wgp);
    go producer(ch, 200*time.Millisecond, 5, wgp);

    wgc.Add(1);
    go consumer(ch, out, wgc)

    go synchronizer(ch, out, wgp, wgc)

    // print the outputs
    for i:= range out {
        fmt.Println(i);
    }
}

使用consumer goroutine 到fan-in 来自多个 goroutine 的所有输入,并从 consumer goroutine 读取所有值。

希望对你有帮助。

【讨论】:

  • 这不是和我的解决方案“完全”一样吗(除了你只调用一次wg.Add(),并且“放错”了defer wg.Done())?
  • 我很惊讶我们有相同的解决方案。我曾经在某处读过有关 sync.waitgroup 的信息。实际上,我曾经写过与此稍有不同的解决方案,但我对其进行了更改以匹配操作问题。我会换成我自己的。
【解决方案2】:

您有“短命”的生产者,它们仅在有限时间内在通道上发送值,并且您有一个无休止的 for 循环,它无休止地从通道接收值,没有终止条件,并且通道只有在这个无限循环之后才会关闭。一旦生产者停止发送值,就会陷入僵局。

频道必须由生产者关闭,表示不会再在其上发送任何值。由于你有多个生产者没有同步(生产者彼此不同步),一般情况下你无法判断哪个会先完成,所以你不能指定一个关闭通道(并且一个通道只能关闭一次,见Why Go's channel can close twice?;和Closing channel of unknown length)。

你必须“协调”生产者,当所有人都完成工作后,协调者应该关闭频道。

并且消费者应该在通道上使用for range,因为for range 构造在通道关闭之前接收来自通道的所有值,然后它会自动终止。

为了协调建议使用sync.WaitGroup。在这种情况下您是使用全局的还是本地的并将其传递给生产者取决于您。使用本地将使解决方案更通用且更易于扩展。需要注意的一点是,您必须将指针传递给sync.WaitGroup。每当你启动一个新的生产者时,使用WaitGroup.Add() 增加等待组。当生产者完成后,它可以使用WaitGroup.Done() 发出信号,最好使用defer (所以无论如何它都会运行,在异常情况下减轻死锁)。控制器可以等待所有生产者完成使用WaitGroup.Wait()

这是一个完整的解决方案:

func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()

    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int)

    wg.Add(1)
    go producer(ch, 100*time.Millisecond, 2, wg)
    wg.Add(1)
    go producer(ch, 200*time.Millisecond, 5, wg)

    go func() {
        wg.Wait()
        close(ch)
    }()

    for v := range ch {
        fmt.Println(v)
    }
}

输出(在Go Playground 上试试):

0
0
1
1
2
3
4

查看相关问题:Prevent the main() function from terminating before goroutines finish in Golang

【讨论】:

    【解决方案3】:

    更简单的答案 - 生产者之一需要关闭通道,而消费者可以覆盖通道。

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func producer(ch chan int, d time.Duration, num int, closer bool) {
    
        for i:=0; i<num; i++ {
            ch <- i
            time.Sleep(d)   
        }
        if closer {
            close(ch)
        }
    }
    
    func main() {
        ch := make(chan int)
    
        go producer(ch, 100*time.Millisecond, 2, false)
        go producer(ch, 200*time.Millisecond, 5, true)
    
        for i := range ch {
            fmt.Println(i)
        }
    
    }
    

    当然,除非您知道哪个生产者总是最后完成,否则您不会希望在实际代码中执行此操作。更好的设计在其他答案中的基于 WaitGroup 的模式中。但这是这段代码避免死锁的最简单方法。

    【讨论】:

    • 关闭通道时,如果不是“关闭者”的生产者仍在运行,会发生什么?这不是一个非常稳定的设计。
    • 当然!我给出了这个答案,因为提问者卡在代码上,这是最简单的解开方法。设计在后。
    • 不过,他们几乎会立即重新卡住。我完全赞成“一次一个问题”,但通过创建不同的问题来解决问题似乎适得其反,尤其是当 sync.WaitGroup 存在来处理这种情况时。
    • 是的,也许。有时引入不同的设计是难以掌握的。无论如何,我编辑了我的答案以阐明目标。
    【解决方案4】:

    问题在于&lt;-ch 正在阻塞,因此如果您不向通道添加任何新值,它将永远阻塞。一种方法是用一个开关选择替换它,它也是阻塞的,但允许在多个频道上收听。您还必须添加退出通道。在您的示例中,一旦退出通道收到两个值,我们就可以中断。 break 语句需要一个标签,因为我们想退出 switch 和 for 循环。

    https://play.golang.org/p/wGdCulZDnrx

    另一种方法是拥有多个输入通道,并在完成发送后立即关闭它们。为此,每个 goroutine 都需要自己的通道,否则我们将在第一个 goroutine 完成时退出。

    第三种选择是创建一个合并函数,将多个通道合并为一个。这允许将通道的创建移动到生产者中,因此它们在一个位置被创建、填充和关闭。合并功能比较复杂,但是从业务逻辑代码中去掉,可以单独理解和测试。然后将主要代码简化为:

    ch1 := producer(100*time.Millisecond, 2)
    ch2 := producer(200*time.Millisecond, 5)
    
    for i := range merge(ch1, ch2) {
        fmt.Println(i)
    }
    

    https://play.golang.org/p/2mv8ILhJPIB

    合并函数来自https://blog.golang.org/pipelines

    【讨论】:

    • 我更喜欢merge 函数。您提供的博客准确地解释了fan-infan-out 以及如何处理多个goroutine。谢谢顺便说一句。
    【解决方案5】:

    使用两个等待组可以优雅地解决这个问题。通过关闭频道ch,我们向消费者发出没有更多数据的信号。

    解决方案可以很好地适应更多消费者。

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func producer(ch chan<- int, d time.Duration, num int, wg *sync.WaitGroup) {
        defer wg.Done()
        for i := 0; i < num; i++ {
            ch <- i
            time.Sleep(d)
        }
    }
    
    func consumer(ch <-chan int, wg *sync.WaitGroup) {
        defer wg.Done()
        for x := range ch {
            fmt.Println(x)
        }
    }
    
    func main() {
        ch := make(chan int)
        producers := &sync.WaitGroup{}
        consumers := &sync.WaitGroup{}
    
        producers.Add(2)
        go producer(ch, 100*time.Millisecond, 2, producers)
        go producer(ch, 200*time.Millisecond, 5, producers)
    
        consumers.Add(1)
        go consumer(ch, consumers)
    
        producers.Wait()
        close(ch)
        consumers.Wait()
    }
    

    【讨论】:

      猜你喜欢
      • 2015-12-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-11
      • 2023-03-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多