【Question title】: Correct way to perform a reconnect with gRPC client
【Posted】: 2021-05-26 22:23:03
【Question】:

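I have a Go gRPC client that connects to a gRPC server running in a different pod in my k8s cluster.

It works well, receiving and processing requests.

I am now wondering how best to make it resilient when the gRPC server pod gets recycled.

As far as I can tell, the clientconn.go code should handle reconnection automatically, but I can't get it to work, and I'm worried that my implementation is wrong in the first place.

Calling code from main: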

go func() {
    if err := gRPCClient.ProcessRequests(); err != nil {
        log.Error("Error while processing Requests")
        //do something here??
    }
}()

My code in the gRPCClient wrapper module:

func (grpcclient *gRPCClient) ProcessRequests() error {
    defer grpcclient.Close()

    for {
        request, err := grpcclient.stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            //when the pod is recycled, this is what's hit, with err:
            //rpc error: code = Unavailable desc = transport is closing

            //what is the correct pattern for recovery here so that we can await the connection
            //and continue processing requests once more?
            //should I return err here and somehow restart the ProcessRequests() goroutine in the
            //main function?
            break
        }

        //the happy path
        log.Info("Request received")
        //code block to process the received request
    }

    return nil
}

func (grpcclient *gRPCClient) Close() {
    //this is called soon after the connection drops
    grpcclient.conn.Close()
}

Edit: Emin Laletovic kindly answered my question below and got me most of the way there. I had to make a few changes to the waitUntilReady function:

func (grpcclient *gRPCClient) waitUntilReady() bool {
    //define how long you want to wait for the connection to be restored before giving up
    timeout := 300 * time.Second
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    currentState := grpcclient.conn.GetState()
    stillConnecting := true

    for currentState != connectivity.Ready && stillConnecting {
        //returns true when the state has changed from currentState, false on timeout
        stillConnecting = grpcclient.conn.WaitForStateChange(ctx, currentState)
        currentState = grpcclient.conn.GetState()
        log.WithFields(log.Fields{"state": currentState, "timeout": timeout}).Info("Attempting reconnection. State has changed to:")
    }

    if !stillConnecting {
        log.Error("Connection attempt has timed out.")
        return false
    }

    return true
}

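【Question discussion】:

    Tags: go kubernetes network-programming grpc


    【Solution 1】:

    clientconn.go handles the RPC connection automatically, but that does not mean the streams are handled automatically as well.

    Once a stream breaks, whether because the RPC connection dropped or for any other reason, it cannot reconnect on its own. After the RPC connection has recovered, you need to obtain a new stream from the server.

    Pseudocode that waits for the RPC connection to reach the READY state and then establishes a new stream might look like this: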

    func (grpcclient *gRPCClient) ProcessRequests() error {
        defer grpcclient.Close()

        //reconnect and done are unbuffered chan bool fields on gRPCClient
        go grpcclient.process()
        for {
            select {
            case <-grpcclient.reconnect:
                if !grpcclient.waitUntilReady() {
                    return errors.New("failed to establish a connection within the defined timeout")
                }
                go grpcclient.process()
            case <-grpcclient.done:
                return nil
            }
        }
    }

    func (grpcclient *gRPCClient) process() {
        reqclient := GetStream() //placeholder: always open a new stream on grpcclient.conn
        for {
            request, err := reqclient.stream.Recv()
            if err == io.EOF {
                grpcclient.done <- true
                return
            }
            if err != nil {
                grpcclient.reconnect <- true
                return
            }
            //the happy path
            log.Info("Request received")
            //code block to process the received request
        }
    }

    func (grpcclient *gRPCClient) waitUntilReady() bool {
        ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) //define how long you want to wait for connection to be restored before giving up
        defer cancel()
        //note: WaitForStateChange returns as soon as the state differs from the one passed in,
        //not when it becomes Ready; see the comment below and the edit in the question for a looping version
        return grpcclient.conn.WaitForStateChange(ctx, connectivity.Ready)
    }
    

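    【Discussion】:

    • Thanks, this got me well on my way. From my testing there is a problem in waitUntilReady(): WaitForStateChange returns as soon as the state changes away from the one provided, so because the state was TransientFailure it returned immediately. To get it working I added a for loop that keeps waiting until the state becomes Ready. See my original post for my version.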