【问题标题】:How to create server for persistent stream (aka pubsub) in Golang GRPC如何在 Golang GRPC 中为持久流(又名 pubsub)创建服务器
【发布时间】:2019-10-29 20:09:19
【问题描述】:

我正在构建需要以 Pub/Sub 方式向所有订阅的消费者发送事件的服务,例如。向所有当前连接的客户端发送一个事件。

我为此使用 Protobuf,并带有以下 proto 定义:

service EventsService {
  rpc ListenForEvents (AgentProcess) returns (stream Event) {}
}

服务器和客户端都是用 Go 编写的。

我的问题是,当客户端启动连接时,流它不是长期存在的,例如。当服务器从ListenForEvents 方法返回时:

func (e EventsService) ListenForEvents(process *pb.AgentProcess, listener pb.EventsService_ListenForEventsServer) error {
    //persist listener here so it can be used later when backend needs to send some messages to client

    return nil
}

然后客户端几乎立即收到EOF 错误,这意味着服务器可能关闭了连接。

我该怎么做才能让客户端长时间订阅服务器? 主要问题是当客户端调用服务器上的ListenForEvents 方法时我可能没有任何东西可以发送给客户端,这就是为什么我希望这个流长期存在 以后可以发送消息。

【问题讨论】:

  • 您可能想要使用真正的消息队列,例如NATSsubjects
  • 感谢@MarkusWMahlberg。我同意使用更复杂的东西会更好,但我仍然想知道 GRPC 的做和不做。

标签: go streaming grpc


【解决方案1】:

当您从服务器函数返回时,流将终止。相反,您应该以某种方式接收事件,并将它们发送到客户端而不从服务器返回。可能有很多方法可以做到这一点。下面是一种方法的草图。

这依赖于在单独的 goroutine 上运行的服务器连接。有一个 Broadcast() 函数将向所有连接的客户端发送消息。它看起来像这样:

var allRegisteredClients map[*pb.AgentProcess]chan Message
var clientsLock sync.RWMutex{}

func Broadcast(msg Message) {
  clientsLock.RLock()
  for _,x:=range allRegisteredClients {
      x<-msg
  }
  clientsLock.RUnlock()
}

然后,您的客户必须自己注册并处理消息:

func (e EventsService) ListenForEvents(process *pb.AgentProcess, listener pb.EventsService_ListenForEventsServer) error {
   clientsLock.Lock()
   ch:=make(chan Message)
   allRegisteredClients[process]=ch
   clientsLock.Unlock()

   for msg:=range ch {
       // send message
       // Deal with errors
       // Deal with client terminations
   }
   clientsLock.Lock()
   delete(allRegisteredClients,process)
   clientsLock.Unlock()
}

正如我所说,这只是这个想法的草图。

【讨论】:

  • 谢谢,我确实修改了一下 :) 查看我的回复。
  • 你的很活泼。此外,为此,您需要为 EventsService 传递一个指针接收器。
【解决方案2】:

您需要为grpc客户端和服务器提供keepalive设置

【讨论】:

    【解决方案3】:

    我已经确定了。

    基本上我永远不会从方法ListenForEvents 返回。 它创建频道,保留在订阅客户端的类似全局地图中,并无限期地从该频道读取。

    服务器逻辑的整体实现:

    func (e EventsService) ListenForEvents(process *pb.AgentProcess, listener pb.EventsService_ListenForEventsServer) error {
        chans, exists := e.listeners[process.Hostname]
    
        chanForThisClient := make(chan *pb.Event)
    
        if !exists {
            e.listeners[process.Hostname] = []chan *pb.Event{chanForThisClient}
        } else {
            e.listeners[process.Hostname] = append(chans, chanForThisClient)
        }
    
        for {
            select {
            case <-listener.Context().Done():
                return nil
            case res := <-chanForThisClient:
                _ = listener.Send(res)
            }
        }
    
        return nil
    }
    

    【讨论】:

    • 这成功了吗?这表现如何?连接没有中断-因为连接可以等待很长时间
    • 我需要反思这段代码,看看是否不需要锁。无论如何,它帮助我构建了我的代码。在我的解决方案中,我在客户端使用更长的期限和重新连接,以避免其他无限期调用的问题。
    猜你喜欢
    • 2017-05-28
    • 1970-01-01
    • 1970-01-01
    • 2010-10-24
    • 2022-08-19
    • 1970-01-01
    • 1970-01-01
    • 2022-11-01
    • 2021-06-30
    相关资源
    最近更新 更多