【发布时间】:2016-06-21 02:44:01
【问题描述】:
我对 Rx Buffer 运算符有一个奇怪的问题,我找不到合适的解决方案,也不知道我做错了什么。如果第 9 行的Buffer 没有使用EventLoopScheduler,它会在一段时间后没有从 (item) 推送的项目时开始泄漏内存?
第 1 行的 item 是一个 IObservable<Entity>,它将从 TCP 套接字检索到的已解析数据推送到下游。使Buffer 使用EventLoopScheduler 可以解决问题,但会降低整体系统性能。
如何在不强制Buffer 运算符使用EventLoopScheduler 的情况下解决此内存泄漏?
var groupedItems = items
.GroupBy(entity => entity._type)
.Select(o => new {Type = o.Key, Categories = o.GroupBy(entity => entity._key)});
var ev = new EventLoopScheduler();
var collections = from item in groupedItems
from category in item.Categories
from entities in category.Buffer(intervalTime, intervalSize, /* ev */)
where entities.Any()
select new LogCollection(item.Type, category.Key, entities);
collections.Buffer(TimeSpan.FromSeconds(1)).Where(o => o.Any()).Subscribe(Insert);
更新:
经过一番调查,Buffer 运算符似乎不是问题,只是它在EventLoopScheduler 上安排时“解决”了问题。绝望之下,我发布了关键代码片段,因为我对 Rx 还很陌生,我不知道我是否正确使用了范例 - 所以如果我滥用它,请纠正我! :)
背景知识:应用程序通过 TCP 套接字检索二进制数据,并在经过一些转换后将其插入数据库。
接收
客户端可以连接到服务器,客户端发送的数据将被转换。如果约定中出现任何异常,它将捕获异常并断开客户端。
public IObservable<LogEntity> StartListening(IDataConverter converter)
{
return Observable.Create<LogEntity>(observer =>
{
return _endPoint.ToListenerObservable(_backlog).Subscribe(client =>
{
var stream = client.ToClientObservable(_bufferSize, _waitHandle);
converter.Convert(stream)
.Catch<LogEntity, Exception>(exception =>
{
client.Close(); // dc client
return Observable.Empty<LogEntity>();
})
.Subscribe(observer.OnNext);
});
});
}
下面是负责读取发送到服务器的数据的代码。 WaitHandle 是EventWaitHandle 的包装器,如果数据库脱机,将阻塞以避免数据在系统中累积。 (当WaitHandle 被阻塞并且没有检索任何数据时会观察到问题)
public static IObservable<ArraySegment<byte>> ToClientObservable(this TcpClient client, int size, WaitHandle waitHandle)
{
return client.GetStream().ToStreamObservable(size, waitHandle);
}
public static IObservable<ArraySegment<byte>> ToStreamObservable(this Stream stream, int size, WaitHandle waitHandle)
{
return Observable.Create<ArraySegment<byte>>(async (observer, token) =>
{
var buffer = new byte[size];
try
{
while (!token.IsCancellationRequested)
{
waitHandle.BlockingWait();
var received = await stream.ReadAsync(buffer, 0, size, token);
if (received == 0) break;
observer.OnNext(new ArraySegment<byte>(buffer, 0, received));
}
observer.OnCompleted();
}
catch (Exception error)
{
observer.OnError(error);
}
});
}
转换器
转换器使用Scan 运算符来解析数据流。它内部可能会发生异常。目前,异常将传播到StartListing 方法,发送错误数据的客户端将在该方法中断开连接。
public IObservable<LogMessage> Convert(IObservable<ArraySegment<byte>> bytes)
{
return bytes.Scan(
new
{
Leftovers = new byte[0],
Logs = new List<LogMessage>(),
},
(saved, current) =>
{
// Parse bytes
// Exception here if invalid data retrieved
return new
{
Leftovers = data.ToArray(),
Logs = logs,
};
})
.SelectMany(o => o.Logs);
}
你们能看到任何可能导致内存泄漏的东西吗?这基本上是所有负责检索数据的代码,在将其发送到转换阶段之前对其进行转换(第一个问题)。此外,我使用 dotMemory 工具确认了内存泄漏。
【问题讨论】:
-
如果您能告诉我们您是如何确定存在内存泄漏的,并提供一个证明该问题的minimal reproducible example,将会很有用。
-
@Enigmativity:仅通过查看任务管理器即可观察到内存泄漏。在大约 5 秒内未检索任何数据后,内存开始增长。似乎它用完了任务池中所有可用的任务。我觉得奇怪的是
EventLoopScheduler解决了这个问题。如果有用的话,我真的很想分享更多代码,但我不知道从哪里开始,也许实时会话会有所帮助?我知道你是这方面的专家:) -
回答这个问题的问题是minimal reproducible example 真的很有帮助。你认为你可以创建一个控制台应用程序,并输入足够的代码来复制问题吗?我将首先放入您的大部分代码,然后,一旦您有泄漏,然后开始简化并提取代码,直到内存泄漏消失之前 - 然后发布该代码。我们需要能够复制、粘贴然后运行代码。
-
@Enigmativity:作为对李的回答的评论。我几乎绝望地将其余代码添加到我原来的问题中,因为我似乎无法弄清楚我做错了什么,或者我是否在滥用 Rx 范式。希望添加的代码能带来一些清晰。
-
在您更新的代码中,您已经调用了
ToListenerObservable,但我看不到源代码。
标签: c# memory-leaks system.reactive reactive-programming