更优雅的解决方案是“代理”,客户端可以订阅和取消订阅消息。
为了优雅地处理订阅和取消订阅,我们可以利用通道,因此接收和分发消息的代理的主循环可以使用单个 select 语句合并所有这些,并且从解决方案的自然。
另一个技巧是将订阅者存储在映射中,映射来自我们用来向他们分发消息的通道。所以在地图中使用频道作为键,然后添加和删除客户端是“死”的简单。这之所以成为可能,是因为通道值为comparable,并且它们的比较非常有效,因为通道值是指向通道描述符的简单指针。
事不宜迟,下面是一个简单的代理实现:
type Broker struct {
stopCh chan struct{}
publishCh chan interface{}
subCh chan chan interface{}
unsubCh chan chan interface{}
}
func NewBroker() *Broker {
return &Broker{
stopCh: make(chan struct{}),
publishCh: make(chan interface{}, 1),
subCh: make(chan chan interface{}, 1),
unsubCh: make(chan chan interface{}, 1),
}
}
func (b *Broker) Start() {
subs := map[chan interface{}]struct{}{}
for {
select {
case <-b.stopCh:
return
case msgCh := <-b.subCh:
subs[msgCh] = struct{}{}
case msgCh := <-b.unsubCh:
delete(subs, msgCh)
case msg := <-b.publishCh:
for msgCh := range subs {
// msgCh is buffered, use non-blocking send to protect the broker:
select {
case msgCh <- msg:
default:
}
}
}
}
}
func (b *Broker) Stop() {
close(b.stopCh)
}
func (b *Broker) Subscribe() chan interface{} {
msgCh := make(chan interface{}, 5)
b.subCh <- msgCh
return msgCh
}
func (b *Broker) Unsubscribe(msgCh chan interface{}) {
b.unsubCh <- msgCh
}
func (b *Broker) Publish(msg interface{}) {
b.publishCh <- msg
}
使用示例:
func main() {
// Create and start a broker:
b := NewBroker()
go b.Start()
// Create and subscribe 3 clients:
clientFunc := func(id int) {
msgCh := b.Subscribe()
for {
fmt.Printf("Client %d got message: %v\n", id, <-msgCh)
}
}
for i := 0; i < 3; i++ {
go clientFunc(i)
}
// Start publishing messages:
go func() {
for msgId := 0; ; msgId++ {
b.Publish(fmt.Sprintf("msg#%d", msgId))
time.Sleep(300 * time.Millisecond)
}
}()
time.Sleep(time.Second)
}
上面的输出将是(在Go Playground上试试):
Client 2 got message: msg#0
Client 0 got message: msg#0
Client 1 got message: msg#0
Client 2 got message: msg#1
Client 0 got message: msg#1
Client 1 got message: msg#1
Client 1 got message: msg#2
Client 2 got message: msg#2
Client 0 got message: msg#2
Client 2 got message: msg#3
Client 0 got message: msg#3
Client 1 got message: msg#3
改进
您可以考虑以下改进。这些可能有用也可能没用,具体取决于您使用代理的方式/用途。
Broker.Unsubscribe() 可能会关闭消息通道,表示不会再在其上发送消息:
func (b *Broker) Unsubscribe(msgCh chan interface{}) {
b.unsubCh <- msgCh
close(msgCh)
}
这将允许客户端通过消息通道range,如下所示:
msgCh := b.Subscribe()
for msg := range msgCh {
fmt.Printf("Client %d got message: %v\n", id, msg)
}
如果有人像这样取消订阅msgCh:
b.Unsubscribe(msgCh)
上述范围循环将在处理完调用Unsubscribe()之前发送的所有消息后终止。
如果您希望您的客户端依赖于正在关闭的消息通道,并且代理的生命周期比您的应用程序的生命周期更窄,那么您也可以在代理停止时关闭所有订阅的客户端,在 Start() 这样的方法中:
case <-b.stopCh:
for msgCh := range subs {
close(msgCh)
}
return