【问题标题】:How to broadcast message using channel如何使用频道广播消息
【发布时间】:2016-07-24 20:28:43
【问题描述】:

我是新手,我正在尝试创建一个简单的聊天服务器,客户端可以在其中向所有连接的客户端广播消息。

在我的服务器中,我有一个接受连接的 goroutine(无限循环),并且所有连接都由通道接收。

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
        }
}()

然后,我为每个连接的客户端启动一个处理程序(goroutine)。在处理程序内部,我尝试通过迭代通道来广播到所有连接。

for c := range ch {
    conn.Write(msg)
}

但是,我无法广播,因为(我认为通过阅读文档)频道需要在迭代之前关闭。我不确定何时应该关闭频道,因为我想不断接受新的连接,而关闭频道不会让我这样做。如果有人可以帮助我,或者提供一种更好的方式向所有连接的客户端广播消息,将不胜感激。

【问题讨论】:

  • 为什么你认为“广播”需要一个封闭的渠道? for c:= range ch 循环在 ch 关闭之前不会终止,仅此而已。

标签: go concurrency channel goroutine


【解决方案1】:

你正在做的是一个扇出模式,也就是说,多个端点正在监听一个输入源。这种模式的结果是,只要输入源中有消息,这些侦听器中只有一个能够获取消息。唯一的例外是频道的close。此close 将被所有听众识别,因此是“广播”。

但是你想做的是广播一条从连接中读取的消息,所以我们可以这样做:

当听众人数已知时

让每个worker监听专用的广播频道,并将消息从主频道分发到每个专用的广播频道。

type worker struct {
    source chan interface{}
    quit chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
    go func() {
        for {
            select {
            case msg := <-w.source
                // do something with msg
            case <-quit: // will explain this in the last section
                return
            }
        }
    }()
}

然后我们可以有一堆工人:

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }

然后启动我们的监听器:

go func() {
for {
    conn, _ := listener.Accept()
    ch <- conn
    }
}()

还有一个调度员:

go func() {
    for {
        msg := <- ch
        for _, worker := workers {
            worker.source <- msg
        }
    }
}()

当听众人数未知时

在这种情况下,上面给出的解决方案仍然有效。唯一不同的是,当你需要一个新的worker时,你需要创建一个新的worker,启动它,然后将它推入workers slice。但是这种方法需要一个线程安全的切片,它需要一个锁。其中一种实现可能如下所示:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range workers {
        routine(worker)
    }
}

无论何时你想启动一个工人:

w := &worker{}
w.Start()
threadSafeSlice.Push(w)

您的调度员将更改为:

go func() {
    for {
        msg := <- ch
        threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
    }
}()

遗言:永远不要离开一个悬空的 goroutine

其中一个好的做法是:永远不要离开悬空的 goroutine。所以当你听完之后,你需要关闭所有你触发的 goroutine。这将通过worker 中的quit 频道完成:

首先我们需要创建一个全局的quit 信令通道:

globalQuit := make(chan struct{})

每当我们创建一个worker时,我们都会将globalQuit通道分配给它作为它的退出信号:

worker.quit = globalQuit

然后,当我们想要关闭所有工作人员时,我们只需这样做:

close(globalQuit)

由于close会被所有监听的goroutine识别(这是你理解的一点),所有的goroutine都会被返回。记得关闭你的调度程序例程,但我会把它留给你:)

【讨论】:

  • 谢谢,但是如果您事先不知道端点(工作人员)的数量怎么办?您将如何遍历每个 goroutine 中的工作人员?
  • 最简单的方法是启动一个worker,然后将其推入workers slice,然后在调度器中的下一次for循环运行时广播。切记在进行切片推送时使用锁 :)
  • 我从“当不知道听众数量时”示例中提出了一个要点,因为代码不完整并且有错误。这是一个简单的服务器,它广播从一个 goroutine 生成的整数,然后使用广播向每个连接发送数据:gist.github.com/speps/ce645a5ca2d2cb9a81e52c7311f38677
  • 这段代码很活泼。想想当 select 语句中的每个 case 都准备好时会发生什么。
  • 为什么我们需要频道ch?为什么调度器必须在一个新的 goroutine 上运行?我们可以重写调度程序以接受msg 作为参数,通过每个w.source 通道发送。
【解决方案2】:

由于 Go 通道遵循通信顺序过程 (CSP) 模式,因此通道是点对点通信实体。每次交流总是有一位作者和一位读者参与。

然而,每个频道 end 可以在多个 goroutine 之间共享。这样做是安全的 - 没有危险的竞争条件。

因此可以有多个作家共享写作端。和/或可以有多个阅读器共享阅读端。我在different answer 中写了更多关于此的内容,其中包括示例。

如果你真的需要广播,你不能直接这样做,但不难实现一个中间 goroutine,将一个值复制到一组输出通道中的每一个。

【讨论】:

    【解决方案3】:

    广播到频道片段并使用 sync.Mutex 管理频道添加和删除可能是您情况下最简单的方法。

    您可以在 golang 中对broadcast 执行以下操作:

    • 您可以使用 sync.Cond 广播共享状态更改。这种方式没有任何 alloc once 设置,但您不能添加超时功能或使用其他通道。
    • 您可以使用关闭的旧频道广播共享状态更改,并创建新频道和 sync.Mutex。这种方式每次状态更改都有一个分配器,但您可以添加超时功能并使用另一个通道。
    • 您可以广播到一个函数回调切片并使用sync.Mutex 来管理它们。调用者可以做频道的事情。这种方式每个调用者有多个分配器,并与另一个通道一起使用。
    • 您可以广播到一个频道片段并使用sync.Mutex 来管理它们。这种方式每个调用者有多个分配器,并与另一个通道一起使用。
    • 您可以广播到一个 sync.WaitGroup 并使用 sync.Mutex 来管理它们。

    【讨论】:

      【解决方案4】:

      更优雅的解决方案是“代理”,客户端可以订阅和取消订阅消息。

      为了优雅地处理订阅和取消订阅,我们可以利用通道,因此接收和分发消息的代理的主循环可以使用单个 select 语句合并所有这些,并且从解决方案的自然。

      另一个技巧是将订阅者存储在映射中,映射来自我们用来向他们分发消息的通道。所以在地图中使用频道作为键,然后添加和删除客户端是“死”的简单。这之所以成为可能,是因为通道值为comparable,并且它们的比较非常有效,因为通道值是指向通道描述符的简单指针。

      事不宜迟,下面是一个简单的代理实现:

      type Broker struct {
          stopCh    chan struct{}
          publishCh chan interface{}
          subCh     chan chan interface{}
          unsubCh   chan chan interface{}
      }
      
      func NewBroker() *Broker {
          return &Broker{
              stopCh:    make(chan struct{}),
              publishCh: make(chan interface{}, 1),
              subCh:     make(chan chan interface{}, 1),
              unsubCh:   make(chan chan interface{}, 1),
          }
      }
      
      func (b *Broker) Start() {
          subs := map[chan interface{}]struct{}{}
          for {
              select {
              case <-b.stopCh:
                  return
              case msgCh := <-b.subCh:
                  subs[msgCh] = struct{}{}
              case msgCh := <-b.unsubCh:
                  delete(subs, msgCh)
              case msg := <-b.publishCh:
                  for msgCh := range subs {
                      // msgCh is buffered, use non-blocking send to protect the broker:
                      select {
                      case msgCh <- msg:
                      default:
                      }
                  }
              }
          }
      }
      
      func (b *Broker) Stop() {
          close(b.stopCh)
      }
      
      func (b *Broker) Subscribe() chan interface{} {
          msgCh := make(chan interface{}, 5)
          b.subCh <- msgCh
          return msgCh
      }
      
      func (b *Broker) Unsubscribe(msgCh chan interface{}) {
          b.unsubCh <- msgCh
      }
      
      func (b *Broker) Publish(msg interface{}) {
          b.publishCh <- msg
      }
      

      使用示例:

      func main() {
          // Create and start a broker:
          b := NewBroker()
          go b.Start()
      
          // Create and subscribe 3 clients:
          clientFunc := func(id int) {
              msgCh := b.Subscribe()
              for {
                  fmt.Printf("Client %d got message: %v\n", id, <-msgCh)
              }
          }
          for i := 0; i < 3; i++ {
              go clientFunc(i)
          }
      
          // Start publishing messages:
          go func() {
              for msgId := 0; ; msgId++ {
                  b.Publish(fmt.Sprintf("msg#%d", msgId))
                  time.Sleep(300 * time.Millisecond)
              }
          }()
      
          time.Sleep(time.Second)
      }
      

      上面的输出将是(在Go Playground上试试):

      Client 2 got message: msg#0
      Client 0 got message: msg#0
      Client 1 got message: msg#0
      Client 2 got message: msg#1
      Client 0 got message: msg#1
      Client 1 got message: msg#1
      Client 1 got message: msg#2
      Client 2 got message: msg#2
      Client 0 got message: msg#2
      Client 2 got message: msg#3
      Client 0 got message: msg#3
      Client 1 got message: msg#3
      

      改进

      您可以考虑以下改进。这些可能有用也可能没用,具体取决于您使用代理的方式/用途。

      Broker.Unsubscribe() 可能会关闭消息通道,表示不会再在其上发送消息:

      func (b *Broker) Unsubscribe(msgCh chan interface{}) {
          b.unsubCh <- msgCh
          close(msgCh)
      }
      

      这将允许客户端通过消息通道range,如下所示:

      msgCh := b.Subscribe()
      for msg := range msgCh {
          fmt.Printf("Client %d got message: %v\n", id, msg)
      }
      

      如果有人像这样取消订阅msgCh

      b.Unsubscribe(msgCh)
      

      上述范围循环将在处理完调用Unsubscribe()之前发送的所有消息后终止。

      如果您希望您的客户端依赖于正在关闭的消息通道,并且代理的生命周期比您的应用程序的生命周期更窄,那么您也可以在代理停止时关闭所有订阅的客户端,在 Start() 这样的方法中:

      case <-b.stopCh:
          for msgCh := range subs {
              close(msgCh)
          }
          return
      

      【讨论】:

      • 我认为 map[chan]strinct{} 的使用在这里非常有趣。我以前从未见过这样使用的地图。它使获取和删除变得更加容易。
      【解决方案5】:

      这是一个较晚的答案,但我认为它可能会安抚一些好奇的读者。

      Go 通道在并发方面受到广泛欢迎。

      Go 社区严格遵循这句话:

      不要通过共享内存进行通信;相反,通过通信共享内存。

      我对此完全中立,我认为在广播方面应该考虑其他选项,而不是明确定义的channels

      这是我的看法:来自同步包的 Cond 是 widely overlookedBronze man 建议在相同的上下文中实施 braodcaster 值得注意。

      我很高兴女巫 icza 建议使用频道并通过它们广播消息。我遵循相同的方法并使用同步的条件变量:

      // Broadcaster is the struct which encompasses broadcasting
      type Broadcaster struct {
          cond        *sync.Cond
          subscribers map[interface{}]func(interface{})
          message     interface{}
          running     bool
      }
      

      这是我们整个广播概念所依赖的主要结构。

      下面,我为这个结构定义了一些行为。简而言之,订阅者应该可以添加、删除,并且整个过程应该是可撤销的。

          // SetupBroadcaster gives the broadcaster object to be used further in messaging
          func SetupBroadcaster() *Broadcaster {
          
              return &Broadcaster{
                  cond:        sync.NewCond(&sync.RWMutex{}),
                  subscribers: map[interface{}]func(interface{}){},
              }
          }
          
          // Subscribe let others enroll in broadcast event!
          func (b *Broadcaster) Subscribe(id interface{}, f func(input interface{})) {
          
              b.subscribers[id] = f
          }
          
          // Unsubscribe stop receiving broadcasting
          func (b *Broadcaster) Unsubscribe(id interface{}) {
              b.cond.L.Lock()
              delete(b.subscribers, id)
              b.cond.L.Unlock()
          }
          
          // Publish publishes the message
          func (b *Broadcaster) Publish(message interface{}) {
              go func() {
                  b.cond.L.Lock()
          
                  b.message = message
                  b.cond.Broadcast()
                  b.cond.L.Unlock()
              }()
          }
          
          // Start the main broadcasting event
          func (b *Broadcaster) Start() {
              b.running = true
              for b.running {
                  b.cond.L.Lock()
                  b.cond.Wait()
                  go func() {
                      for _, f := range b.subscribers {
                          f(b.message) // publishes the message
                      }
                  }()
                  b.cond.L.Unlock()
              }
          
          }
          
          // Stop broadcasting event
          func (b *Broadcaster) Stop() {
              b.running = false
          }
      

      接下来,我可以很轻松地使用它了:

          messageToaster := func(message interface{}) {
              fmt.Printf("[New Message]: %v\n", message)
          }
          unwillingReceiver := func(message interface{}) {
              fmt.Println("Do not disturb!")
          }
          broadcaster := SetupBroadcaster()
          broadcaster.Subscribe(1, messageToaster)
          broadcaster.Subscribe(2, messageToaster)
          broadcaster.Subscribe(3, unwillingReceiver)
      
          go broadcaster.Start()
      
          broadcaster.Publish("Hello!")
      
          time.Sleep(time.Second)
          broadcaster.Unsubscribe(3)
          broadcaster.Publish("Goodbye!")
      
      

      它应该以任何顺序打印这样的东西:

      [New Message]: Hello!
      Do not disturb!
      [New Message]: Hello!
      [New Message]: Goodbye!
      [New Message]: Goodbye!
      

      go playground看到这个

      【讨论】:

        【解决方案6】:

        另一个简单的例子: https://play.golang.org

            
        type Broadcaster struct {
            mu      sync.Mutex
            clients map[int64]chan struct{}
        }
        
        func NewBroadcaster() *Broadcaster {
            return &Broadcaster{
                clients: make(map[int64]chan struct{}),
            }
        }
        
        func (b *Broadcaster) Subscribe(id int64) (<-chan struct{}, error) {
            defer b.mu.Unlock()
            b.mu.Lock()
            s := make(chan struct{}, 1)
        
            if _, ok := b.clients[id]; ok {
                return nil, fmt.Errorf("signal %d already exist", id)
            }
        
            b.clients[id] = s
        
            return b.clients[id], nil
        }
        
        func (b *Broadcaster) Unsubscribe(id int64) {
            defer b.mu.Unlock()
            b.mu.Lock()
            if _, ok := b.clients[id]; ok {
                close(b.clients[id])
            }
        
            delete(b.clients, id)
        }
        
        func (b *Broadcaster) broadcast() {
            defer b.mu.Unlock()
            b.mu.Lock()
            for k := range b.clients {
                if len(b.clients[k]) == 0 {
                    b.clients[k] <- struct{}{}
                }
            }
        }
        
        type testClient struct {
            name     string
            signal   <-chan struct{}
            signalID int64
            brd      *Broadcaster
        }
        
        func (c *testClient) doWork() {
            i := 0
            for range c.signal {
                fmt.Println(c.name, "do work", i)
                if i > 2 {
                    c.brd.Unsubscribe(c.signalID)
                    fmt.Println(c.name, "unsubscribed")
                }
                i++
            }
            fmt.Println(c.name, "done")
        }
        
        func main() {
            var err error
            brd := NewBroadcaster()
        
            clients := make([]*testClient, 0)
        
            for i := 0; i < 3; i++ {
                c := &testClient{
                    name:     fmt.Sprint("client:", i),
                    signalID: time.Now().UnixNano()+int64(i), // +int64(i) for play.golang.org
                    brd:      brd,
                }
                c.signal, err = brd.Subscribe(c.signalID)
                if err != nil {
                    log.Fatal(err)
                }
        
                clients = append(clients, c)
            }
        
            for i := 0; i < len(clients); i++ {
                go clients[i].doWork()
            }
        
            for i := 0; i < 6; i++ {
                brd.broadcast()
                time.Sleep(time.Second)
            }
        }
        

        输出:

        client:0 do work 0
        client:2 do work 0
        client:1 do work 0
        client:2 do work 1
        client:0 do work 1
        client:1 do work 1
        client:2 do work 2
        client:0 do work 2
        client:1 do work 2
        client:2 do work 3
        client:2 unsubscribed
        client:2 done
        client:0 do work 3
        client:0 unsubscribed
        client:0 done
        client:1 do work 3
        client:1 unsubscribed
        client:1 done
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2012-04-16
          • 1970-01-01
          • 1970-01-01
          • 2015-08-02
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多