【问题标题】:Rx Buffer Memory LeakRx 缓冲区内存泄漏
【发布时间】:2016-06-21 02:44:01
【问题描述】:

我对 Rx Buffer 运算符有一个奇怪的问题,我找不到合适的解决方案,也不知道我做错了什么。如果第 9 行的Buffer 没有使用EventLoopScheduler,它会在一段时间后没有从 (item) 推送的项目时开始泄漏内存?

第 1 行的 item 是一个 IObservable<Entity>,它将从 TCP 套接字检索到的已解析数据推送到下游。使Buffer 使用EventLoopScheduler 可以解决问题,但会降低整体系统性能。

如何在不强制Buffer 运算符使用EventLoopScheduler 的情况下解决此内存泄漏?

var groupedItems = items
    .GroupBy(entity => entity._type)
    .Select(o => new {Type = o.Key, Categories = o.GroupBy(entity => entity._key)});

var ev = new EventLoopScheduler();

var collections = from item in groupedItems
    from category in item.Categories
    from entities in category.Buffer(intervalTime, intervalSize, /* ev */)
    where entities.Any()
    select new LogCollection(item.Type, category.Key, entities);

collections.Buffer(TimeSpan.FromSeconds(1)).Where(o => o.Any()).Subscribe(Insert);

更新:

经过一番调查,Buffer 运算符似乎不是问题,只是它在EventLoopScheduler 上安排时“解决”了问题。绝望之下,我发布了关键代码片段,因为我对 Rx 还很陌生,我不知道我是否正确使用了范例 - 所以如果我滥用它,请纠正我! :)

背景知识:应用程序通过 TCP 套接字检索二进制数据,并在经过一些转换后将其插入数据库。

接收

客户端可以连接到服务器,客户端发送的数据将被转换。如果约定中出现任何异常,它将捕获异常并断开客户端。

public IObservable<LogEntity> StartListening(IDataConverter converter) 
{
    return Observable.Create<LogEntity>(observer => 
    {
        return _endPoint.ToListenerObservable(_backlog).Subscribe(client => 
        {
            var stream = client.ToClientObservable(_bufferSize, _waitHandle);

            converter.Convert(stream)
            .Catch<LogEntity, Exception>(exception =>
            {
                client.Close(); // dc client
                return Observable.Empty<LogEntity>();
            })
            .Subscribe(observer.OnNext);
        });
    });
}

下面是负责读取发送到服务器的数据的代码。 WaitHandleEventWaitHandle 的包装器,如果数据库脱机,将阻塞以避免数据在系统中累积。 (WaitHandle 被阻塞并且没有检索任何数据时会观察到问题

public static IObservable<ArraySegment<byte>> ToClientObservable(this TcpClient client, int size, WaitHandle waitHandle)
{
    return client.GetStream().ToStreamObservable(size, waitHandle);
}

public static IObservable<ArraySegment<byte>> ToStreamObservable(this Stream stream, int size, WaitHandle waitHandle)
{
    return Observable.Create<ArraySegment<byte>>(async (observer, token) =>
    {
        var buffer = new byte[size];

        try
        {
            while (!token.IsCancellationRequested)
            {
                waitHandle.BlockingWait();
                var received = await stream.ReadAsync(buffer, 0, size, token);
                if (received == 0) break;
                observer.OnNext(new ArraySegment<byte>(buffer, 0, received));
            }
            observer.OnCompleted();
        }
        catch (Exception error)
        {
            observer.OnError(error);
        }
    });
}

转换器

转换器使用Scan 运算符来解析数据流。它内部可能会发生异常。目前,异常将传播到StartListing 方法,发送错误数据的客户端将在该方法中断开连接。

public IObservable<LogMessage> Convert(IObservable<ArraySegment<byte>> bytes)
{
    return bytes.Scan(
        new
        {
            Leftovers = new byte[0],
            Logs = new List<LogMessage>(),
        },
        (saved, current) =>
        {
            // Parse bytes
            // Exception here if invalid data retrieved

            return new
            {
                Leftovers = data.ToArray(),
                Logs = logs,
            };
        })
        .SelectMany(o => o.Logs);
}

你们能看到任何可能导致内存泄漏的东西吗?这基本上是所有负责检索数据的代码,在将其发送到转换阶段之前对其进行转换(第一个问题)。此外,我使用 dotMemory 工具确认了内存泄漏。

【问题讨论】:

  • 如果您能告诉我们您是如何确定存在内存泄漏的,并提供一个证明该问题的minimal reproducible example,将会很有用。
  • @Enigmativity:仅通过查看任务管理器即可观察到内存泄漏。在大约 5 秒内未检索任何数据后,内存开始增长。似乎它用完了任务池中所有可用的任务。我觉得奇怪的是EventLoopScheduler 解决了这个问题。如果有用的话,我真的很想分享更多代码,但我不知道从哪里开始,也许实时会话会有所帮助?我知道你是这方面的专家:)
  • 回答这个问题的问题是minimal reproducible example 真的很有帮助。你认为你可以创建一个控制台应用程序,并输入足够的代码来复制问题吗?我将首先放入您的大部分代码,然后,一旦您有泄漏,然后开始简化并提取代码,直到内存泄漏消失之前 - 然后发布该代码。我们需要能够复制、粘贴然后运行代码。
  • @Enigmativity:作为对李的回答的评论。我几乎绝望地将其余代码添加到我原来的问题中,因为我似乎无法弄清楚我做错了什么,或者我是否在滥用 Rx 范式。希望添加的代码能带来一些清晰。
  • 在您更新的代码中,您已经调用了ToListenerObservable,但我看不到源代码。

标签: c# memory-leaks system.reactive reactive-programming


【解决方案1】:

您的示例代码有一些值得注意的地方。 首先,它不是@Enigmativity 指出的 MVCE,例如items 是什么类型,它的值,它们的属性/(字段?),与 LogCollection 相同。

其次,您似乎运行了过多的GroupBy 操作。这会创建可观察序列的 3 深度嵌套。我认为您只想GroupBy 一次,然后依靠匿名打字为您做正确的事情,即.GroupBy(entity =&gt; new { entity.Type, entity.Key})。我这样说是因为一旦你分组了两次,你似乎只是再次打开它。

第三,你缓冲两次。两次都检查空缓冲区。一次使用调度程序(也许)而另一个不使用?第二个缓冲区似乎是多余的。

第四,您似乎没有关闭任何GroupBy“窗口”。这意味着对于这些嵌套分组中的每一个,您都在创建独立的缓冲区。根据您的平台,每个都可以在线程/任务池上运行。因此,您可以在程序中释放无限制且未知的并发级别。因此,随着这些新组中的每一个都是使用 _type_key 的新组合创建的,您正在创建新的缓冲区接收器,它们将永远不会停止/处置/清理,并将继续消耗资源。

第五,我们不知道您的内存问题是否只是因为没有足够的内存压力来强制进行 GC,因此您看到内存压力攀升。

我认为您的查询可以简化为:

from item in items
group item by new { item.Type, item.Key} into grp
from buffer in grp.Buffer(intervalTime, intervalSize, scheduler)
where buffer.Any()
select new LogCollection(grp.Key.Type, grp.Key.Key, buffer);    

为了解决内存压力问题,我强烈建议您提供一些使组过期的方法。即使很简单,只是在一段时间后终止您的订阅,然后立即重新订阅(RetryPublish 可以在这里提供帮助)。否则,如果你得到一个只出现一次的类型/密钥对,你将支付一个组的价格,从而为整个订阅的生命周期支付缓冲。

最后,在查看内存压力问题时,我建议实际捕获或分析您的应用程序,而不是查看可能会向您发送大量虚假信息的任务管理器。尝试GC.GetTotalMemory(true) 或一些 WMI 挂钩,甚至只是跟踪 GC.CollectionCount 值。

【讨论】:

  • 首先,我要感谢你这么好的回答!您还用分组运算符教会了我一两件事。可悲的是,它并没有解决内存泄漏。绝望之下,我添加了一些用于检索和解析数据的代码片段。我对 Rx 相当陌生,问题可能是我滥用了范式。如果我做错了什么可能导致内存泄漏,请纠正我。我真的很想弄清楚这一点!提前感谢您的帮助!
猜你喜欢
  • 1970-01-01
  • 2015-06-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多