【问题标题】:Using System.Reactive deserialize messages使用 System.Reactive 反序列化消息
【发布时间】:2012-02-25 13:42:15
【问题描述】:

我目前有一个程序可以侦听网络流并在反序列化新消息时触发事件。

while(true)
{
  byte[] lengthBytes = new byte[10];
  networkStream.Read(lengthBytes, 0, 10);
  int messageLength = Int32.Parse(Encoding.UTF8.GetString(lengthBytes));
  var messageBytes = new byte[messageLength + 10];
  Array.Copy(lengthBytes, messageBytes, 10);
  int bytesReadTotal = 10;
  while (bytesReadTotal < 10 + messageLength)
    bytesReadTotal += networkStream.Read(messageBytes, bytesReadTotal, messageLength - bytesReadTotal + 10);
  OnNewMessage(new MessageEventArgs(messageFactory.GetMessage(messageBytes)));
}

我想使用响应式扩展来重写它,以便使用IObservable&lt;Message&gt; 代替事件。这可以使用

Observable.FromEvent<EventHandler<MessageEventArgs>, MessageEventArgs>(
  (h) => NewMessage += h,
  (h) => NewMessage -= h)
    .Select(  (e) => { return e.Message; });

但是,我更愿意改用 System.Reactive 重写监听过程。我的出发点(来自here)是

Func<byte[], int, int, IObservable<int>> read;   
read = Observable.FromAsyncPattern<byte[], int, int, int>(
networkStream.BeginRead,
networkStream.EndRead);

允许

byte[] lengthBytes = new byte[10];
read(lengthBytes, 0, lengthBytes.Length).Subscribe(
{
  (bytesRead) => ;
});

我正在努力寻找如何继续。有人有实现吗?

【问题讨论】:

    标签: system.reactive


    【解决方案1】:

    我想出了以下方法,但我觉得不创建类和使用 Subject&lt;T&gt; 应该是可能的(例如,通过将标头数据包投影到消息对象的正文数据包,但问题是EndRead() 不返回字节数组,而是返回读取的字节数。所以你需要一个对象或至少在某个时候有一个闭包)。

    class Message
    {
        public string Text { get; set; }
    }
    
    class MessageStream : IObservable<Message>
    {
        private readonly Subject<Message> messages = new Subject<Message>();
    
        public void Start()
        {
            // Get your real network stream here.
            var stream  = Console.OpenStandardInput();
            GetNextMessage( stream );
        }
    
        private void GetNextMessage(Stream stream)
        {
            var header = new byte[10];
            var read = Observable.FromAsyncPattern<byte [], int, int, int>( stream.BeginRead, stream.EndRead );
            read( header, 0, 10 ).Subscribe( b =>
            {
                var bodyLength = BitConverter.ToInt32( header, 0 );
                var body = new byte[bodyLength];
                read( body, 0, bodyLength ).Subscribe( b2 =>
                {
                    var message = new Message() {Text = Encoding.UTF8.GetString( body )};
                    messages.OnNext( message );
                    GetNextMessage( stream );
                } );
            } );
        }
    
        public IDisposable Subscribe( IObserver<Message> observer )
        {
            return messages.Subscribe( observer );
        }
    }
    

    【讨论】:

    • 运行一段时间后不会导致 StackOverflowException 吗?
    【解决方案2】:

    由于Observable.FromAsyncPattern 只进行一次异步调用,您需要创建一个函数来多次调用它。这应该可以帮助您入门,但可能还有很大的改进空间。它假定您可以使用相同的参数重复进行异步调用,并假定 selector 将处理由此产生的任何问题。

    Function FromRepeatedAsyncPattern(Of T1, T2, T3, TCallResult, TResult)(
                 begin As Func(Of T1, T2, T3, AsyncCallback, Object, IAsyncResult),
                 [end] As Func(Of IAsyncResult, TCallResult),
                 selector As Func(Of TCallResult, TResult),
                 isComplete As Func(Of TCallResult, Boolean)
                ) As Func(Of T1, T2, T3, IObservable(Of TResult))
        Return Function(a1, a2, a3) Observable.Create(Of TResult)(
            Function(obs)
                Dim serial As New SerialDisposable()
                Dim fac = Observable.FromAsyncPattern(begin, [end])
                Dim onNext As Action(Of TCallResult) = Nothing
                'this function will restart the subscription and will be
                'called every time a value is found
                Dim subscribe As Func(Of IDisposable) =
                    Function()
                        'note that we are REUSING the arguments, the
                        'selector should handle this appropriately
                        Return fac(a1, a2, a3).Subscribe(onNext,
                                                         Sub(ex)
                                                             obs.OnError(ex)
                                                             serial.Dispose()
                                                         End Sub)
                    End Function
                'set up the OnNext handler to restart the observer 
                'every time it completes
                onNext = Sub(v)
                             obs.OnNext(selector(v))
                             'subscriber disposed, do not check for completion
                             'or resubscribe
                             If serial.IsDisposed Then Exit Sub
                             If isComplete(v) Then
                                 obs.OnCompleted()
                                 serial.Dispose()
                             Else
                                 'using the scheduler lets the OnNext complete before
                                 'making the next async call.
                                 'you could parameterize the scheduler, but it may not be
                                 'helpful, and it won't work if Immediate is passed.
                                 Scheduler.CurrentThread.Schedule(Sub() serial.Disposable = subscribe())
                             End If
                         End Sub
                'start the first subscription
                serial.Disposable = subscribe()
                Return serial
            End Function)
    End Function
    

    从这里,您可以像这样获得IObservable(Of Byte)

    Dim buffer(4096 - 1) As Byte
    Dim obsFac = FromRepeatedAsyncPattern(Of Byte(), Integer, Integer, Integer, Byte())(
                     AddressOf stream.BeginRead, AddressOf stream.EndRead,
                     Function(numRead)
                         If numRead < 0 Then Throw New ArgumentException("Invalid number read")
                         Console.WriteLine("Position after read: " & stream.Position.ToString())
                         Dim ret(numRead - 1) As Byte
                         Array.Copy(buffer, ret, numRead)
                         Return ret
                     End Function,
                     Function(numRead) numRead <= 0)
    'this will be an observable of the chunk size you specify
    Dim obs = obsFac(buffer, 0, buffer.Length)
    

    从那里,您将需要某种累加器函数,该函数接收字节数组并在找到完整消息时输出它们。这种函数的骨架可能如下所示:

    Public Function Accumulate(source As IObservable(Of Byte())) As IObservable(Of Message)
        Return Observable.Create(Of message)(
            Function(obs)
                Dim accumulator As New List(Of Byte)
                Return source.Subscribe(
                    Sub(buffer)
                        'do some logic to build a packet here
                        accumulator.AddRange(buffer)
                        If True Then
                            obs.OnNext(New message())
                            'reset accumulator
                        End If
                    End Sub,
                    AddressOf obs.OnError,
                    AddressOf obs.OnCompleted)
            End Function)
    End Function
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-11-15
      • 2023-03-16
      • 1970-01-01
      • 2012-05-16
      • 2019-07-12
      相关资源
      最近更新 更多