【问题标题】:How to create gRPC streaming interceptors using C#?如何使用 C# 创建 gRPC 流式拦截器?
【发布时间】:2020-06-04 14:35:19
【问题描述】:

我已经知道如何创建一元 RPC 拦截器,但我不知道如何创建流式 RPC 拦截器。 这是我迄今为止所拥有的:

public override async Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(
        IAsyncStreamReader<TRequest> requestStream, ServerCallContext context,
        ClientStreamingServerMethod<TRequest, TResponse> continuation)
    {
        Console.WriteLine("ClientStreaming");
        var response = await base.ClientStreamingServerHandler(requestStream, context, continuation);
        return response;
    }

每次启动客户端流时,此代码都会截断控制台日志,我只是不知道如何控制台记录每条传入的客户端消息。

亲切的问候杰西

【问题讨论】:

  • 我已经为另一个方向做了这个,服务器流与客户端。我现在不在开发机器附近,但如果有帮助可以发布
  • @RobGoodwin 在这一点上任何事情都会有所帮助,所以请 :)。

标签: c# .net grpc interceptor grpc-dotnet


【解决方案1】:

我最近遇到了类似的情况,以下是我使用拦截器解决它的方法。我们需要通过我们的 gRPC API 测量消息吞吐量/消息大小。一元调用非常简单,但遇到了关于流媒体的问题。以下是我为拦截流所做的工作(在我的情况下,服务器应该与您的场景类似)。

第一段代码是你已经拥有的拦截器方法(这个是服务器流)

    public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request, 
                                                                                                    ClientInterceptorContext<TRequest, TResponse> context, 
                                                                                                    AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
  {
     // Add the outgoing message size to the metrics
     mCollector.Add(mInterceptorName,context.Method.ServiceName, context.Method.Name, NetworkMetricsCollectionService.DIRECTION.OUT, request as IMessage);

     // This call returns the server stream, among other things and the server stream reader
     // is returned for the client to consume.
     var prelimResponse = base.AsyncServerStreamingCall(request, context, continuation);

     // Add the result message size to the metrics
     mCollector.Add(mInterceptorName, context.Method.ServiceName, context.Method.Name, NetworkMetricsCollectionService.DIRECTION.IN, prelimResponse as IMessage);
     // Wrap the response stream object with our implementation that will log the size and then
     // proxy that to the client.

     var response = new AsyncServerStreamingCall<TResponse>(new AsyncStreamReaderWrapper<TResponse>(prelimResponse.ResponseStream,
                                                                                                    mInterceptorName,
                                                                                                    context.Method.ServiceName, 
                                                                                                    context.Method.Name,mCollector), 
                                                                                                    prelimResponse.ResponseHeadersAsync, 
                                                                                                    prelimResponse.GetStatus, 
                                                                                                    prelimResponse.GetTrailers, 
                                                                                                    prelimResponse.Dispose);
     // return the wrapped stream to the client
     return response;
  }

AsyncServerStreamReaderWrapper 实现接收对象、测量、记录其大小,然后将其传递给客户端。这个包装器是必需的,因为 Stream 阅读器只能有一个消费者,如果我有多个阅读器会产生错误,这是有道理的。

   /// <summary>
   /// Wrapper class around the gRPC AsyncStreamReader class that allows retrieval of the object 
   /// before handing off to the client for the purpose of throughput measurements and metrics
   /// collection
   /// </summary>
   /// <typeparam name="T">type of object contained within the stream</typeparam>
   public class AsyncStreamReaderWrapper<T> : IAsyncStreamReader<T>
   {
      private IAsyncStreamReader<T> mInnerImplementation = null;
      private NetworkMetricsCollectionService mCollector = null;
      private string mId = string.Empty;
      private string mService = string.Empty;
      private string mMethod = string.Empty;

      public T Current => mInnerImplementation.Current;

      /// <summary>
      /// Advances the reader to the next element in the sequence, returning the result asynchronously.
      /// </summary>
      /// <param name="cancellationToken">Cancellation token that can be used to cancel the 
      /// operation.</param>
      /// <returns>Task containing the result of the operation: true if the reader was successfully
      /// advanced to the next element; false if the reader has passed the end of the sequence.</returns>
      public async Task<bool> MoveNext(CancellationToken cancellationToken)
      {
         bool result = await mInnerImplementation.MoveNext(cancellationToken);
         if (result)
         {
            mCollector.Add(mId,mService, mMethod, NetworkMetricsCollectionService.DIRECTION.IN, Current as IMessage);
         }
         return result;
      }

      /// <summary>
      /// Parameterized Constructor
      /// </summary>
      /// <param name="aInnerStream">inner stream reader to wrap</param>
      /// <param name="aService">service name for metrics reporting</param>
      /// <param name="aMethod">method name for metrics reporting</param>
      /// <param name="aCollector">metrics collector</param>
      public AsyncStreamReaderWrapper(IAsyncStreamReader<T> aInnerStream, string aId, string aService, string aMethod, NetworkMetricsCollectionService aCollector)
      {
         mInnerImplementation = aInnerStream;
         mId = aId;
         mService = aService;
         mMethod = aMethod;
         mCollector = aCollector;
      }
   }

我知道这不是您正在寻找的确切场景,但我相信您的实现将是相似的,但使用 StreamWriter 而不是 StreamReader 并且没有用于尝试测量消息大小的位。

【讨论】:

    【解决方案2】:

    您还可以查看 GRPC 示例 - https://github.com/grpc/grpc-dotnet/tree/master/examples

    Github中有很多gRPC场景

    【讨论】:

    • 虽然此链接可能会回答问题,但最好在此处包含答案的基本部分并提供链接以供参考。如果链接页面发生更改,仅链接答案可能会失效。 - From Review
    猜你喜欢
    • 1970-01-01
    • 2021-05-11
    • 2022-07-12
    • 2010-10-13
    • 1970-01-01
    • 2023-01-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多