【问题标题】:Observable Network IO Parsing可观察网络 IO 解析
【发布时间】:2012-05-01 19:02:08
【问题描述】:

我正在尝试使用 Rx 从 TCPClient 接收流中读取数据并将数据解析为字符串的 IObservable,由换行符“\r\n”分隔以下是我从套接字流接收的方式...

var messages = new Subject<string>();

var functionReceiveSocketData =
            Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
            (client.Client.BeginReceive, client.Client.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
        {
            var rs = new byte[buffer.Length];
            bs.CopyTo(rs, 0);
            return rs;
        };

Observable
    .Defer(() =>
            {
                var buffer = new byte[50];
                return
                    from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
                select copy(buffer, n);
            }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));

这是我用来解析字符串的方法。这目前不起作用...

obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
            );

消息主题以块的形式接收消息,因此我尝试连接它们并测试连接的字符串是否包含换行符,从而指示缓冲区关闭并输出缓冲的块。不知道为什么它不起作用。似乎我只从 obsStrings 中得到了第一块。

所以我正在寻找两件事。我想简化 io 流的阅读并消除消息主题的使用。其次,我想让我的字符串解析工作。我已经对此进行了一段时间的破解,但无法提出可行的解决方案。我是 Rx 的初学者。

编辑:这是问题解决后的成品......

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
            .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
            .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
            .Where(x => x.EndsWith("\r\n"))
            .Select(buffered => String.Join("", buffered))
            .Select(a => a.Replace("\n", ""));

“ReceiveUntilCompleted”是 RXX 项目的扩展。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:
    messages
        .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
        .Where(x => x.EndsWith("\r\n"))
    

    【讨论】:

    • 我发现我不需要 .Buffer(1);最后。
    【解决方案2】:

    你可以试试Select,而不是Subscribe 并使用Subject

    .Repeat().Select(x =&gt; System.Text.Encoding.UTF8.GetString(x));

    现在假设这一切都进入了一个名为 messages 的新 observable,你的下一个问题是在这一行中

    var obsStrings = messages.Buffer<string,string>(() =>  
                    messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
                );
    

    您同时使用BufferScan,并试图在两者中做同样的事情!请注意,Buffer 需要一个关闭选择器。

    你真正想要的是:

    var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("\r\n")))
                             .Select(buffered => String.Join(buffered));
    

    这为 Buffered 提供了一个关于何时关闭窗口(当它包含 \r\n 时)的可观察值,并为 Select 提供了要连接的缓冲量。这会产生一个新的可观察到的拆分字符串。

    一个问题是您仍然可以将新行放在块的中间,这会导致问题。一个简单的想法是观察字符而不是完整的字符串块,例如:

    obsStrings.Repeat().SelectMany(x =&gt; System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());

    然后您可以通过messages.Where(c =&gt; c != '\r') 跳过\r 并将缓冲区更改为:

    var obsStrings = messages.Buffer(() => messages.Where(x => x == '\n')))
                             .Select(buffered => String.Join("", buffered));
    

    【讨论】:

    • Buffer 电话中,您需要messages.Where(x =&gt; x == '\n') 而不是messages.Where(x =&gt; x != '\n')。也就是说,它会在每一行打破缓冲区。
    • 仍然从这个 oberservable 中得到不连贯的块。我想知道这是否是线程问题,在另一个线程上进行测试并且缓冲继续或提前中断,因此缓冲变得不确定。
    • Buffer 命令不会提前中断,它不是基于时间或类似的东西。如果没有更多信息或“破坏”的示例,很难知道如何提供帮助。
    猜你喜欢
    • 2010-12-01
    • 2020-02-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-02-27
    • 2019-08-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多