【问题标题】:Do goroutines with receiving channel as parameter stop, when the channel is closed?当通道关闭时,以接收通道为参数的 goroutine 是否停止?
【发布时间】:2019-05-09 11:10:27
【问题描述】:

我一直在阅读《Building microservices with go》,书中介绍了apache/go-resiliency/deadline 处理超时的包。

deadline.go

// Package deadline implements the deadline (also known as "timeout") resiliency pattern for Go.
package deadline

import (
    "errors"
    "time"
)

// ErrTimedOut is the error returned from Run when the deadline expires.
var ErrTimedOut = errors.New("timed out waiting for function to finish")

// Deadline implements the deadline/timeout resiliency pattern.
type Deadline struct {
    timeout time.Duration
}

// New constructs a new Deadline with the given timeout.
func New(timeout time.Duration) *Deadline {
    return &Deadline{
        timeout: timeout,
    }
}

// Run runs the given function, passing it a stopper channel. If the deadline passes before
// the function finishes executing, Run returns ErrTimeOut to the caller and closes the stopper
// channel so that the work function can attempt to exit gracefully. It does not (and cannot)
// simply kill the running function, so if it doesn't respect the stopper channel then it may
// keep running after the deadline passes. If the function finishes before the deadline, then
// the return value of the function is returned from Run.
func (d *Deadline) Run(work func(<-chan struct{}) error) error {
    result := make(chan error)
    stopper := make(chan struct{})

    go func() {
        result <- work(stopper)
    }()

    select {
    case ret := <-result:
        return ret
    case <-time.After(d.timeout):
        close(stopper)
        return ErrTimedOut
    }
}

deadline_test.go

package deadline

import (
    "errors"
    "testing"
    "time"
)

func takesFiveMillis(stopper <-chan struct{}) error {
    time.Sleep(5 * time.Millisecond)
    return nil
}

func takesTwentyMillis(stopper <-chan struct{}) error {
    time.Sleep(20 * time.Millisecond)
    return nil
}

func returnsError(stopper <-chan struct{}) error {
    return errors.New("foo")
}

func TestDeadline(t *testing.T) {
    dl := New(10 * time.Millisecond)

    if err := dl.Run(takesFiveMillis); err != nil {
        t.Error(err)
    }

    if err := dl.Run(takesTwentyMillis); err != ErrTimedOut {
        t.Error(err)
    }

    if err := dl.Run(returnsError); err.Error() != "foo" {
        t.Error(err)
    }

    done := make(chan struct{})
    err := dl.Run(func(stopper <-chan struct{}) error {
        <-stopper
        close(done)
        return nil
    })
    if err != ErrTimedOut {
        t.Error(err)
    }
    <-done
}

func ExampleDeadline() {
    dl := New(1 * time.Second)

    err := dl.Run(func(stopper <-chan struct{}) error {
        // do something possibly slow
        // check stopper function and give up if timed out
        return nil
    })

    switch err {
    case ErrTimedOut:
        // execution took too long, oops
    default:
        // some other error
    }
}

第一个问题

// in deadline_test.go
if err := dl.Run(takesTwentyMillis); err != ErrTimedOut {
    t.Error(err)
}  

我无法理解上述代码的执行流程。据我了解,因为takesTwentyMillis函数的休眠时间超过了设置的超时时间10毫秒,

// in deadline.go
case <-time.After(d.timeout):
    close(stopper)
    return ErrTimedOut

time.After 发出当前时间,并选择这种情况。然后关闭 stopper 通道,返回 ErrTimeout。

我不明白的是,关闭 stopper 通道对可能仍在运行的匿名 goroutine 有什么影响 我认为,当 stopper 通道关闭时,下面的 goroutine 可能仍在运行。

go func() {
        result <- work(stopper)
    }()

(如果我在这里错了,请纠正我)我认为在close(stopper)之后,这个goroutine会调用takesTwentyMillis(=work function),并以stopper channel作为参数。并且该函数将继续并休眠 20 毫秒并返回 nil 以传递给结果通道。 main() 到此结束,对吧?

我看不出在这里关闭停止通道有什么意义。 takesTwentyMillis 函数似乎并没有使用函数体内的通道:(。

第二个问题

// in deadline_test.go within TestDeadline()
done := make(chan struct{})
err := dl.Run(func(stopper <-chan struct{}) error {
    <-stopper
    close(done)
    return nil
})
if err != ErrTimedOut {
    t.Error(err)
}
<-done

这是我完全不理解的部分。我认为当dl.Run 运行时,停止通道被初始化。但是因为stopper通道中没有值,所以函数调用会被阻塞在&lt;-stopper...但是因为看不懂这段代码,所以一开始看不明白为什么存在这段代码(即这段代码是什么正在尝试测试,以及它是如何执行的,等等)。


关于第二个问题的第三个(附加)问题

所以我明白当第二个问题中的Run函数触发停止通道关闭时,工作函数得到信号。工人关闭完成的通道并返回零。 我使用 delve(=go debugger) 看到了这一点,gdb 在return nil 行之后将我带到了deadline.go 中的goroutine。

   err := dl.Run(func(stopper <-chan struct{}) error {
        <-stopper
        close(done)
-->     return nil   
    })

输入 n 后跳到下一行,delve 带我到这里

    go func() {
-->             result <- work(stopper)
            }()

进程在这里结束,因为当我再次键入 n 时,命令行提示 PASS 并且进程退出。为什么这个过程在这里结束? work(stopper) 似乎返回nil,然后应该将其传递给结果通道,对吗?但是由于某种原因,这条线似乎没有执行。

我知道主协程,即Run 函数,已经返回了 ErrTimedOut。所以我想这与它有关吗?

【问题讨论】:

  • 这里有很多内容,对于一个问题来说可能有点宽泛,但是对于标题问题“当通道关闭时,以接收通道为参数的 goroutines 是否停止?”,不。当作为 goroutine 调用的函数返回时,Goroutines 停止。没有任何魔法可以让他们在任何其他情况下自动退出;该函数必须编写为在 goroutine 需要停止时返回。
  • @Adrian 谢谢。由于我是一个新手 gopher 并且来自 python 背景,我很难理解通道和多线程编程的概念。您的回答有助于消除一些误解:)

标签: go concurrency goroutine channels


【解决方案1】:

第一个问题

stopper 通道的使用是为了向函数发送信号,例如takesTwentyMillis 到了最后期限,调用者不再关心它的结果。通常这意味着像takesTwentyMillis 这样的工作函数应该检查stopper 通道是否已经关闭,以便它可以取消它的工作。尽管如此,检查stopper 频道是工作函数的选择。它可能会也可能不会检查频道。

func takesTwentyMillis(stopper <-chan struct{}) error {
    for i := 0; i < 20; i++ {
        select {
        case <-stopper:
            // caller doesn't care anymore might as well stop working
            return nil
        case <-time.After(time.Second): // simulating work
        }
    }
    // work is done
    return nil
}

第二个问题

Deadline.Run()这部分会关闭stopper通道。

case <-time.After(d.timeout):
    close(stopper)

在关闭的通道 (&lt;-stopper) 上读取将立即为该通道返回零值。我认为这只是测试最终超时的工作函数。

【讨论】:

  • 哇,谢谢你这么详细的回答。它非常清晰和彻底,对我帮助很大。但是在阅读了您对第二个问题的回答后,我确实还有一个问题,也许您也可以帮我解决这个问题...? (我将为此编辑原始问题...)
  • 我建议您将您的问题非常具体,如下所述:stackoverflow.com/help/how-to-ask。无论如何,为Run() 这里go func() { result &lt;- work(stopper) }() 运行的每个函数创建一个单独的goroutine。即使截止日期发送停止信号,该 goroutine 也会等到工作函数完成。由于截止日期比工作函数的执行时间短,它自然会成为最后完成的 goroutine 之一。
  • 好的。谢谢。我将阅读您提供的链接,并尝试在此处更具体地提出问题。在此期间,你的答案和以前一样清楚。它真的帮助我理解了代码逻辑。谢谢!
猜你喜欢
  • 2019-04-15
  • 2021-10-16
  • 1970-01-01
  • 2017-09-20
  • 2016-03-16
  • 2014-10-10
  • 2018-09-10
  • 2019-02-08
  • 1970-01-01
相关资源
最近更新 更多