【问题标题】:How to sync with pending results in channel?如何与频道中的待处理结果同步?
【发布时间】:2014-07-13 13:45:46
【问题描述】:

我有一个工作池,它提供了一个同步接口来提取结果:

func (p *Pool) Get() *Result {
    for {
        select {
        // if there are results in channel return them
        case r := <-p.results:
            return r
        // else check if there is any work pending we must wait for
        // if not return nil to indicate that all work was done
        default:
            if p.active < 1 {
                return nil
            }
        }
    }
}

这个想法是,如果所有工作都完成,Get 将返回下一个工作人员结果或 nil

现在这个实现的问题是我需要使用p.active 计数器手动跟踪所有活动的工作。这感觉有点不对劲,因为理论上信息已经在p.results 频道的长度内。

如果缓冲区为空,什么不返回任何内容的惯用方法是什么?

【问题讨论】:

  • 您事先知道您将拥有多少工人吗?
  • @OneOfOne 是的,工人是固定的。
  • 问题是当你请求一个结果并且你得到 nil 的那一刻,结果已经过时了。这样的设计可能容易出现竞争条件。
  • 这个@FUZxxl 是什么意思?请进一步解释。
  • 原子计数器确实是最有效的解决方案,其他的都太复杂了。

标签: go locking sync


【解决方案1】:

遗憾的是没有len(chan),如果你不知道工人的数量,你的方法就已经很好了。

但是,您需要对计数器进行某种同步,这里有一个非常简单的方法:

type Result struct {
    I int
}
type Pool struct {
    res chan *Result
    c   int32
}

func New() *Pool {
    return &Pool{
        res: make(chan *Result),
    }
}

func (p *Pool) Put(r *Result) {
    atomic.AddInt32(&p.c, 1)
    time.Sleep(time.Duration(100+r.I%1000) * time.Microsecond)
    p.res <- r
}

func (p *Pool) Get() (r *Result) {
    for {
        select {
        case r = <-p.res:
            atomic.AddInt32(&p.c, -1)
            return
        default:
            if atomic.LoadInt32(&p.c) == 0 {
                return
            }
        }
    }
}
func main() {
    runtime.GOMAXPROCS(8)
    p := New()
    for i := 0; i < 50; i++ {
        go p.Put(&Result{i})
    }
    time.Sleep(10 * time.Microsecond)
    for {
        r := p.Get()
        if r == nil {
            return
        }
        fmt.Println("r.I", r.I)
    }
}

//编辑

为了完整起见,这里是另一个使用 WaitGroup 的示例,但这又是一种矫枉过正,因为 WG 内部无论如何都使用原子计数器。

type Pool struct {
    res chan *Result
    wg  sync.WaitGroup
}

func New(n int) (p *Pool) {
    p = &Pool{
        res: make(chan *Result, n),
    }
    p.wg.Add(n)
    go func() {
        p.wg.Wait()
        close(p.res)
    }()
    return
}

func (p *Pool) Get() *Result {
    for {
        r, ok := <-p.res
        if !ok {
            return nil
        }
        p.wg.Done()
        return r

    }
}

//func Put is the same as above and the test code is the same.

【讨论】:

  • 在 99.9% 的情况下,这种方法是不可能的,但很高兴知道您也可以使用智慧组。
【解决方案2】:

您可以使用WaitGroup 和一个阻止它的 gorouotine,并在完成后更改一些标志。

假设您有一个waitGroup,并且对于您排队的每个项目,您都调用wg.Add(1)。每次你在频道上接收,或者每次工作人员完成时,它都可以调用wg.Done() 来减少它的计数器。

然后你有一个等待evertyhing完成的goroutine,然后设置一个标志:

 go func() {
     wg.Wait()
     p.done = true //do this thread safe of course
 } 

在您的默认情况下,您只需检查完成标志

   default:
        if p.done {
            return nil
        }

有关详细信息,请参阅WaitGroup 上的文档。那里的示例与您的情况有点相似。 http://golang.org/pkg/sync/#WaitGroup.Wait

【讨论】:

  • 我已经尝试使用 WaitGroup 来解决这个问题,不幸的是,有些工作人员比其他工作人员更快,因此他们会一次阻塞。
  • 它不应该影响任何东西。你尝试了什么?
  • 使用 WaitGroup 可以等到一组例程完成。这意味着我需要等到所有工人都完成了不打算做的事情。
  • @bodokaiser 不,你等到计数器达到 0,你可以随心所欲地增加和减少。
  • 是的,计数器何时达到 0?什么时候所有工人都完成了?
猜你喜欢
  • 2020-04-25
  • 2019-04-22
  • 1970-01-01
  • 1970-01-01
  • 2020-10-20
  • 2012-03-14
  • 2017-02-04
  • 2020-05-29
  • 2021-09-15
相关资源
最近更新 更多