【Question title】: Failure to implement a Client Streaming to Server Python code
【Posted】: 2021-12-12 14:06:33
【Question】:

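I'm new to gRPC and it is confusing me a bit. I'm trying to build a client that streams to a server and a server that streams to a client. I got server-to-client streaming working, but I have not managed to stream from the client to the server.

This is the .proto file I'm using: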

syntax = "proto3";

package streaming;

service Streaming{
  rpc ServerStreaming (Message) returns (stream Message) {}
  rpc ClientStreaming (stream Message) returns (Message) {}
}

message Message{
  string message = 1;
}

message MessageResponse{
  string message = 1;
  bool received = 2;
}

Here is my server code:

from concurrent import futures

import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2


def make_message(message):
    return streaming_pb2.Message(
        message=message
    )


class StreamingService(streaming_pb2_grpc.StreamingServicer):

    def ServerStreaming(self, request_iterator, context):
        message_full = ''
        for message in request_iterator:
            message_full += message
        result = f'Hello I am up and running received "{message_full}" message from you'
        result = {'message': result, 'received': True}
        return streaming_pb2.MessageResponse(**result)


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    streaming_pb2_grpc.add_StreamingServicer_to_server(StreamingService(), server)
    server.add_insecure_port('[::]:8091')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    serve()

Here is the client:


import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2


def make_message(message):
    return streaming_pb2.Message(
        message=message
    )


def generate_messages():
    messages = [
        make_message("First message"),
        make_message("Second message"),
        make_message("Third message"),
        make_message("Fourth message"),
        make_message("Fifth message"),
    ]
    for msg in messages:
        print("Hello Server Sending you the %s" % msg.message)
        yield msg


def send_message(stub):
    responses = stub.ServerStreaming(generate_messages())
    for response in responses:
        print("Hello from the server received your %s" % response.message)


def run():
    with grpc.insecure_channel('localhost:8091') as channel:
        stub = streaming_pb2_grpc.StreamingStub(channel)
        send_message(stub)


if __name__ == '__main__':
    run()

This is the error I get when I run the client:

/Users/<>/bin/python3 /Users/<>/client_streaming_client.py
Traceback (most recent call last):
  File "/Users/<>/client_streaming_client.py", line 40, in <module>
    run()
  File "/Users/<>/client_streaming_client.py", line 36, in run
    send_message(stub)
  File "/Users/<>/client_streaming_client.py", line 28, in send_message
    responses = stub.ServerStreaming(generate_messages())
  File "/Users/<>/lib/python3.7/site-packages/grpc/_channel.py", line 1057, in __call__
    raise rendezvous  # pylint: disable-msg=raising-bad-type
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INTERNAL
    details = "Exception serializing request!"
    debug_error_string = "None"
>

Process finished with exit code 1

Any idea what I'm doing wrong here? I'm completely lost.

【Comments】:

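  • You are passing the generator generate_messages, which would be correct if your proto specified a bidirectional streaming RPC. I think your proto should be: rpc ClientStreaming(stream Message) returns (stream MessageResponse) {}
  • That didn't work, I got: ``` grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.INTERNAL details = "Exception serializing request!" debug_error_string = "None"> ```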

Tags: python grpc grpc-python


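【Solution 1】:

It isn't entirely clear from your post what you expect to happen. You have defined two RPCs: ServerStreaming (a response-streaming RPC) and ClientStreaming (a request-streaming RPC). Neither of these RPCs uses the MessageResponse type (although your server tries to return it).

Based on the client and server code you wrote, I think you were expecting a bidirectionally-streaming RPC, defined like this: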
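As for the error itself, my reading is that stub.ServerStreaming is generated for a single Message request and serializes it with Message.SerializeToString, so handing it a generator fails before anything is sent. The sketch below imitates that step without gRPC. FakeMessage and serialize_request are hypothetical stand-ins I made up for illustration, not part of grpc or the generated code.

```python
class FakeMessage:
    """Hypothetical stand-in for streaming_pb2.Message (not the real protobuf class)."""

    def __init__(self, message):
        self.message = message

    def SerializeToString(self):
        return self.message.encode("utf-8")


def serialize_request(request):
    """Imitate a unary-request stub: serialize exactly one message.

    Returns None when serialization raises, which is the situation that
    "Exception serializing request!" reports.
    """
    try:
        # The serializer is called on whatever object the caller passed in.
        return FakeMessage.SerializeToString(request)
    except Exception:
        return None


def generate_messages():
    yield FakeMessage("First message")
    yield FakeMessage("Second message")


single_ok = serialize_request(FakeMessage("First message"))
generator_fails = serialize_request(generate_messages())

print(single_ok)        # a single message serializes to bytes
print(generator_fails)  # a generator is not a message, so this is None
```

So with the proto from the question, the generator would have to go to stub.ClientStreaming, and stub.ServerStreaming would take one Message.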

streaming.proto

syntax = "proto3";

package streaming;

service Streaming {
  rpc ClientStreaming(stream Message) returns (stream MessageResponse) {}
}

message Message{
  string message = 1;
}

message MessageResponse{
  string message = 1;
  bool received = 2;
}

You can then iterate over the requests and yield responses from the server:

from concurrent import futures
import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2

class StreamingService(streaming_pb2_grpc.StreamingServicer):
    def ClientStreaming(self, request_iterator, context):
        s = ""
        for message in request_iterator:
            print(f"Received: {message.message!r}")
            s = ", ".join([s, message.message]) if s else message.message
            yield streaming_pb2.MessageResponse(message=s, received=True)

if __name__ == "__main__":
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    streaming_pb2_grpc.add_StreamingServicer_to_server(StreamingService(), server)
    server.add_insecure_port("[::]:8091")
    server.start()
    server.wait_for_termination()
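To see what this handler yields for each incoming request, the same accumulation logic can be run on its own. This is only a sketch: running_join is a hypothetical name, and SimpleNamespace stands in for the generated Message and MessageResponse classes.

```python
from types import SimpleNamespace


def running_join(request_iterator):
    """Same loop as the handler above: one response per request."""
    s = ""
    for message in request_iterator:
        # Append the new text to everything received so far.
        s = ", ".join([s, message.message]) if s else message.message
        # Assumption: SimpleNamespace replaces streaming_pb2.MessageResponse.
        yield SimpleNamespace(message=s, received=True)


requests = [
    SimpleNamespace(message=text)
    for text in ("First message", "Second message", "Third message")
]
replies = [reply.message for reply in running_join(requests)]

for reply in replies:
    print(reply)
```

Each reply carries the full text received so far, so the third reply contains all three messages.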

The client can then send its messages from an iterable and iterate over the responses:

import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2


def generate_messages():
    messages = [
        "First message",
        "Second message",
        "Third message",
        "Fourth message",
        "Fifth message",
    ]
    for msg in messages:
        msg = streaming_pb2.Message(message=msg)
        print(f"Sending {msg.message!r}")
        yield msg


def send_message(stub):
    responses = stub.ClientStreaming(generate_messages())
    for response in responses:
        print(f"Received {response.message!r}")


if __name__ == "__main__":
    with grpc.insecure_channel("localhost:8091") as channel:
        stub = streaming_pb2_grpc.StreamingStub(channel)
        send_message(stub)
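If you would rather keep ClientStreaming as declared in the question (stream Message in, one Message out), the handler should consume the whole request iterator and return a single message instead of yielding. Here is a minimal sketch of that shape. SimpleNamespace is my stand-in for streaming_pb2.Message, and the function is written as a plain function rather than a servicer method so that it runs without the generated modules.

```python
from types import SimpleNamespace


def client_streaming(request_iterator, context=None):
    """Request-streaming shape: many requests in, exactly one response out."""
    parts = []
    for request in request_iterator:
        # Each item is a message object, so read its field rather than
        # concatenating the object itself onto a string.
        parts.append(request.message)
    # Assumption: SimpleNamespace replaces streaming_pb2.Message here.
    return SimpleNamespace(message=", ".join(parts))


requests = (
    SimpleNamespace(message=text)
    for text in ["First message", "Second message"]
)
response = client_streaming(requests)
print(response.message)
```

In a real servicer this would be the ClientStreaming(self, request_iterator, context) method returning streaming_pb2.Message(...). On the client, stub.ClientStreaming(generate_messages()) would then give back one response object rather than something to loop over.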
