【发布时间】:2014-11-20 17:19:40
【问题描述】:
我有一个紧密的循环,它贯穿大量购物车,这些购物车本身包含大约 10 个事件事件对象,并通过中间存储库(使用 GetEventStore.com 重新连接的 jOliver 公共域)以 JSON 格式将它们写入磁盘:
// create ~200,000 carts, each with ~5 events
List<Cart> testData = TestData.GenerateFrom(products);
foreach (var cart in testData)
{
count = count + (cart as IAggregate).GetUncommittedEvents().Count;
repository.Save(cart);
}
我看到磁盘说它是 100%,但吞吐量是“低”的(15MB/秒,每秒约 5,000 个事件)为什么会这样,我能想到的是:
-
由于这是单线程的,25% 的 CPU 使用率实际上是否意味着我使用的 1 个核心的 100%(以任何方式显示我的应用在 Visual Studio 中运行的特定核心)?
我是受 I/O 限制还是受 CPU 限制?如果我为每个 CPU 创建一个自己的线程池,我能否获得更好的性能?
为什么我可以以 ~120MB/秒的速度复制文件,但我的应用程序只能获得 15MB/秒的吞吐量?这是由于大量较小数据包的写入大小造成的吗?
还有什么我错过的吗?
我使用的代码来自 geteventstore 文档/博客:
public class GetEventStoreRepository : IRepository
{
private const string EventClrTypeHeader = "EventClrTypeName";
private const string AggregateClrTypeHeader = "AggregateClrTypeName";
private const string CommitIdHeader = "CommitId";
private const int WritePageSize = 500;
private const int ReadPageSize = 500;
IStreamNamingConvention streamNamingConvention;
private readonly IEventStoreConnection connection;
private static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };
public GetEventStoreRepository(IEventStoreConnection eventStoreConnection, IStreamNamingConvention namingConvention)
{
this.connection = eventStoreConnection;
this.streamNamingConvention = namingConvention;
}
public void Save(IAggregate aggregate)
{
this.Save(aggregate, Guid.NewGuid(), d => { });
}
public void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
{
var commitHeaders = new Dictionary<string, object>
{
{CommitIdHeader, commitId},
{AggregateClrTypeHeader, aggregate.GetType().AssemblyQualifiedName}
};
updateHeaders(commitHeaders);
var streamName = this.streamNamingConvention.GetStreamName(aggregate.GetType(), aggregate.Identity);
var newEvents = aggregate.GetUncommittedEvents().Cast<object>().ToList();
var originalVersion = aggregate.Version - newEvents.Count;
var expectedVersion = originalVersion == 0 ? ExpectedVersion.NoStream : originalVersion - 1;
var eventsToSave = newEvents.Select(e => ToEventData(Guid.NewGuid(), e, commitHeaders)).ToList();
if (eventsToSave.Count < WritePageSize)
{
this.connection.AppendToStreamAsync(streamName, expectedVersion, eventsToSave).Wait();
}
else
{
var startTransactionTask = this.connection.StartTransactionAsync(streamName, expectedVersion);
startTransactionTask.Wait();
var transaction = startTransactionTask.Result;
var position = 0;
while (position < eventsToSave.Count)
{
var pageEvents = eventsToSave.Skip(position).Take(WritePageSize);
var writeTask = transaction.WriteAsync(pageEvents);
writeTask.Wait();
position += WritePageSize;
}
var commitTask = transaction.CommitAsync();
commitTask.Wait();
}
aggregate.ClearUncommittedEvents();
}
private static EventData ToEventData(Guid eventId, object evnt, IDictionary<string, object> headers)
{
var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evnt, serializerSettings));
var eventHeaders = new Dictionary<string, object>(headers)
{
{
EventClrTypeHeader, evnt.GetType().AssemblyQualifiedName
}
};
var metadata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, serializerSettings));
var typeName = evnt.GetType().Name;
return new EventData(eventId, typeName, true, data, metadata);
}
}
【问题讨论】:
-
如果没有将数据实际保存到磁盘的代码,我们无话可说。仔细检查您是否正确缓冲并使用足够大的缓冲区。
-
好吧,我不知道 API,但您确实有一个
CommitAsync并在代码中等待,如果您的事件不是几个 kb 大,这听起来是个非常糟糕的主意。 -
每秒提交数千次微小写入,这对于 SSD 来说是一场噩梦。这也是一个磨损均衡的噩梦。每写出大约一百个字节,磁盘必须擦除一个 512kB 的页面,从旧扇区复制 511kB,并附加您正在写入的几个字节。
-
当您的程序烧毁 100% 核心时,它不会被磁盘卡住。这也是磁盘写入率低的原因。 JSON 并不是一种非常便宜的格式,有很多字符串处理。使用适当的分析器。
-
删除 WaitAsync 是一种方式,方式更快。需要阅读 c# 异步编程。关于 SSD 的有效点,但不要对发生的事情做噩梦 - 仅供参考,事件存储块文件在 256MB 的固定文件块中,因此它不会破坏 SSD!
标签: multithreading performance eventstoredb