【问题标题】:Observer pattern using gRPC - C#使用 gRPC 的观察者模式 - C#
【发布时间】:2020-05-04 09:33:13
【问题描述】:

对不起,如果这是一个愚蠢的问题,但我在互联网上没有找到任何有用的信息。

有没有人尝试过在 C# 中使用 gRPC 作为通信来实现观察者模式? 如果是,请给我链接。

非常感谢提前和最好的问候。

【问题讨论】:

    标签: c# grpc observer-pattern


    【解决方案1】:

    我已经实现了一个客户端便利类包装器,以将服务器流式调用转换为我正在工作的项目的常规事件。不确定这是否是您所追求的。这是一个简单的 gRPC 服务器,它每秒将时间作为字符串发布一次。

    syntax = "proto3";
    package SimpleTime;
    
    service SimpleTimeService
    {
       rpc MonitorTime(EmptyRequest) returns (stream TimeResponse);
    }
    
    message EmptyRequest{}
    
    message TimeResponse
    {
       string time = 1;
    }
    

    服务器实现,它每秒循环一次,返回当前时间的字符串表示,直到取消,如下所示

    public override async Task MonitorTime(EmptyRequest request, IServerStreamWriter<TimeResponse> responseStream, ServerCallContext context)
    {
       try
       {
          while (!context.CancellationToken.IsCancellationRequested)
          {
             var response = new TimeResponse
             {
                Time = DateTime.Now.ToString()
             };
             await responseStream.WriteAsync(response);
             await Task.Delay(1000);
          }
       }
       catch (Exception)
       { 
          Console.WriteLine("Exception on Server");
       }
    }
    

    对于客户端,我创建了一个包含 gRPC 客户端的类,并将服务器流式 MonitorTime 调用的结果公开为普通的 ole .net 事件。

       public class SimpleTimeEventClient
       {
          private SimpleTime.SimpleTimeService.SimpleTimeServiceClient mClient = null;
          private CancellationTokenSource mCancellationTokenSource = null;
          private Task mMonitorTask = null;
          public event EventHandler<string> OnTimeReceived;
    
          public SimpleTimeEventClient()
          {
             Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure);
             mClient = new SimpleTime.SimpleTimeService.SimpleTimeServiceClient(channel);
          }
    
          public void Startup()
          {
             mCancellationTokenSource = new CancellationTokenSource();
             mMonitorTask = Task.Run(() => MonitorTimeServer(mCancellationTokenSource.Token));
          }
    
          public void Shutdown()
          {
             mCancellationTokenSource.Cancel();
             mMonitorTask.Wait(10000);
          }
    
          private async Task MonitorTimeServer(CancellationToken token)
          {
             try
             {
                using (var call = mClient.MonitorTime(new SimpleTime.EmptyRequest()))
                {
                   while(await call.ResponseStream.MoveNext(token))
                   {
                      var timeResult = call.ResponseStream.Current;
                      OnTimeReceived?.Invoke(this, timeResult.Time);
                   }
                }
             }
             catch(Exception e)
             {
                Console.WriteLine($"Exception encountered in MonitorTimeServer:{e.Message}");
             }
          }
       }
    

    现在创建客户端并订阅事件。

      static void Main(string[] args)
      {
         SimpleTimeEventClient client = new SimpleTimeEventClient();
         client.OnTimeReceived += OnTimeReceivedEventHandler;
         client.Startup();
         Console.WriteLine("Press any key to exit");
         Console.ReadKey();
         client.Shutdown();
    
      }
    
      private static void OnTimeReceivedEventHandler(object sender, string e)
      {
         Console.WriteLine($"Time: {e}");
      }
    

    运行时产生的结果

    我省略了很多错误检查,以使示例更小。我所做的一件事是为 gRPC 接口提供许多服务器流调用,调用客户端可能感兴趣也可能不感兴趣,是实现事件访问器(添加、删除)以仅在有客户端时调用服务器端流方法订阅了包装的事件。希望对您有所帮助

    【讨论】:

    • 哇,感谢您提供的漂亮示例代码!它对我理解机制有很大帮助。在维护订阅者列表时,我只需要根据自己的需要进行调整。非常感谢!
    • 您知道是否可以从流式传输任务之外发送数据?将其视为数据更改的外部事件,我只想在更改事件发生时但通过此服务器端流传输这些新数据。这可能吗?
    • 我想您可以向 AsyncStream 重载添加方法或其他内容,但这需要客户端了解新的流类型
    猜你喜欢
    • 2021-11-22
    • 1970-01-01
    • 2013-12-30
    • 1970-01-01
    • 2016-02-20
    • 2023-04-10
    • 1970-01-01
    • 1970-01-01
    • 2013-10-28
    相关资源
    最近更新 更多