【问题标题】:Interrupt python grpc client when receiving stream接收流时中断python grpc客户端
【发布时间】:2018-01-25 17:19:06
【问题描述】:

我正在玩一点 gRPC,但我不知道在接收流时如何关闭客户端和服务器之间的连接。客户端和服务器都是用 Python 编写的。

例如,我的服务器从队列中读取消息并生成每条消息。我的想法是客户端订阅服务器并开始接收这些消息。

我的问题是:

  • 我想在按下 CTRL+C 时杀死客户端,但它会卡在当前代码中。怎样才能正确完成?
  • 服务器如何意识到客户端已经停止监听?

我的 nbi.proto 文件:

syntax = "proto3";
service Messenger {
    rpc subscribe(Null) return (stream Message) {}
}

message Null {}

message Message {
    string value = 1;
}

Python 客户端:

import test_pb2_grpc as test_grpc
import test_pb2 as test
import grpc

def run():
    channel = grpc.insecure_channel('localhost:50051')
    stub = test_grpc.MessengerStub(channel)

    stream = stub.subscribe(test.Null())
    try:
        for e in stream:
            print e
    except grpc._channel._Rendezvous as err:
        print err
    except KeyboardInterrupt:
        stub.unsuscribe(test.Null)

Python 服务器:

import test_pb2_grpc as test_grpc
import test_pb2 as test
from Queue import Empty
import grpc

class Messenger(test_grpc.MessengerServicer):
    def __init__(self, queue):
        self.queue = queue

    def subscribe(self, request, context):
        while True:
            try:
                yield self.queue.get(block=True, timeout=0.1)
            except Empty:
                continue
            except Exception as e:
                logger.error(e)
                break
        return

【问题讨论】:

    标签: python queue python-multithreading grpc


    【解决方案1】:

    我想在按下 CTRL+C 时杀死客户端,但是 它被当前代码卡住了。怎样才能正确完成?

    KeyboardInterrupt 应该足以终止客户端应用程序。可能该进程挂在stub.unsubscribe 上。如果使用客户端断线回调,或许不需要显式取消订阅。

    服务器如何意识到客户端已经停止监听?

    您可以将 add a callback to the context object 传递给您的 Messenger.subscribe 方法。在客户端断开连接时调用回调。

    顺便说一句,您可以使用 empty.proto 代替您的 Null 类型。

    【讨论】:

    • 非常感谢!我已经按照您的建议添加了回调,服务器现在可以正常工作。但是,我仍然无法终止客户端。
    • 嗯,我刚刚创建了最基本的一元流客户端,就像您在这里为我的服务一样。 gist.github.com/markns/… 彻底关闭。你的代码中一定有另一个线程阻塞
    • 好吧,我认为只有在使用 Python 2.7 时才会出现问题,我会尝试使用 Python 3... 无论如何,谢谢。
    • 什么是stub.unsubscribe(test.Null)?这看起来会引发AttributeError?如果您想取消飞行中的 RPC,我认为 stream.cancel() 将是那个地方的正确代码?或者,为什么不直接删除整个 except KeyboardInterrupt: 语法元素?当run函数返回时,当stream超出范围时,应该是RPC被取消的情况。
    猜你喜欢
    • 2020-07-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-15
    • 1970-01-01
    • 2019-09-20
    相关资源
    最近更新 更多