【问题标题】:Simple queue model example简单的队列模型示例
【发布时间】:2018-08-24 01:45:20
【问题描述】:

有没有一个简单的程序来演示 Go 中的队列是如何工作的。 我只需要在队列中添加数字 1 到 10 并使用另一个线程并行地从队列中拉出这些。

【问题讨论】:

  • 所以 go 中的队列通常是频道,这里有一个完整的教程:tour.golang.org/concurrency/2
  • 该语言称为“Go”。 “golang”是流行的互联网搜索引擎识别的用于执行与该语言相关的查询的令牌。

标签: multithreading go queue


【解决方案1】:

可安全并发使用的队列基本上是一种语言结构:channel

按照设计,通道对于并发 sendreceive 是安全的。此处有详细说明:If I am using channels properly should I need to use mutexes? 在其上发送的值按发送顺序接收。

您可以在此处阅读有关频道的更多信息:What are golang channels used for?

一个非常简单的例子:

c := make(chan int, 10) // buffer for 10 elements

// Producer: send elements in a new goroutine
go func() {
    for i := 0; i < 10; i++ {
        c <- i
    }
    close(c)
}()

// Consumer: receive all elements sent on it before it was closed:
for v := range c {
    fmt.Println("Received:", v)
}

输出(在Go Playground 上试试):

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9

请注意,通道缓冲区(本例中为 10 个)与您要“通过”它发送的元素数量无关。缓冲区告诉通道可以“存储”多少元素,或者换句话说,当没有人从它接收时,您可以在不阻塞的情况下发送多少元素。当通道的缓冲区已满时,进一步的发送将被阻塞,直到有人开始从中接收值。

【讨论】:

    【解决方案2】:

    您可以使用通道(并发使用安全)和等待组同时从队列中读取

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        queue := make(chan int)
    
        wg := new(sync.WaitGroup)
        wg.Add(1)
        defer wg.Wait()
    
        go func(wg *sync.WaitGroup) {
            for {
    
                r, ok := <-queue
                if !ok {
                    wg.Done()
                    return
                }
    
                fmt.Println(r)
            }
        }(wg)
    
        for i := 1; i <= 10; i++ {
            queue <- i
        }
    
        close(queue)
    }
    

    游乐场链接:https://play.golang.org/p/A_Amqcf2gwU

    【讨论】:

      【解决方案3】:

      另一种选择是创建和实现队列interface,其支持类型为并发通道。为方便起见,我创建了一个gist

      这里是你如何使用它。

      queue := GetIntConcurrentQueue()
      defer queue.Close()
      
      // queue.Enqueue(1)
      // myInt, errQueueClosed := queue.DequeueBlocking()
      // myInt, errIfNoInt := queue.DequeueNonBlocking()
      

      这里有更长的例子 - https://play.golang.org/p/npb2Uj9hGn1

      下面是完整的实现,这里又是它的gist

      // Can be any backing type, even 'interface{}' if desired.
      // See stackoverflow.com/q/11403050/3960399 for type conversion instructions.
      type IntConcurrentQueue interface {
          // Inserts the int into the queue
          Enqueue(int)
          // Will return error if there is nothing in the queue or if Close() was already called
          DequeueNonBlocking() (int, error)
          // Will block until there is a value in the queue to return.
          // Will error if Close() was already called.
          DequeueBlocking() (int, error)
          // Close should be called with defer after initializing
          Close()
      }
      
      func GetIntConcurrentQueue() IntConcurrentQueue {
          return &intChannelQueue{c: make(chan int)}
      }
      
      type intChannelQueue struct {
          c chan int
      }
      
      func (q *intChannelQueue) Enqueue(i int) {
          q.c <- i
      }
      
      func (q *intChannelQueue) DequeueNonBlocking() (int, error) {
          select {
          case i, ok := <-q.c:
              if ok {
                  return i, nil
              } else {
                  return 0, fmt.Errorf("queue was closed")
              }
          default:
              return 0, fmt.Errorf("queue has no value")
          }
      }
      
      func (q *intChannelQueue) DequeueBlocking() (int, error) {
          i, ok := <-q.c
          if ok {
              return i, nil
          }
          return 0, fmt.Errorf("queue was closed")
      }
      
      func (q *intChannelQueue) Close() {
          close(q.c)
      }
      

      【讨论】:

        猜你喜欢
        • 2011-06-03
        • 2018-09-18
        • 1970-01-01
        • 2013-06-29
        • 2014-01-20
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多