【问题标题】:Tight Loop - Disk at 100%, Quad Core CPU @25% useage, only 15MBsec disk write speed紧密循环 - 磁盘 100%,四核 CPU @25% 使用率,只有 15 MB 秒的磁盘写入速度
【发布时间】:2014-11-20 17:19:40
【问题描述】:

我有一个紧密的循环,它贯穿大量购物车,这些购物车本身包含大约 10 个事件事件对象,并通过中间存储库(使用 GetEventStore.com 重新连接的 jOliver 公共域)以 JSON 格式将它们写入磁盘:

// create ~200,000 carts, each with ~5 events
List<Cart> testData = TestData.GenerateFrom(products);
foreach (var cart in testData)
{
    count = count + (cart as IAggregate).GetUncommittedEvents().Count;
    repository.Save(cart);
}

我看到磁盘说它是 100%,但吞吐量是“低”的(15MB/秒,每秒约 5,000 个事件)为什么会这样,我能想到的是:

  1. 由于这是单线程的,25% 的 CPU 使用率实际上是否意味着我使用的 1 个核心的 100%(以任何方式显示我的应用在 Visual Studio 中运行的特定核心)?

  2. 我是受 I/O 限制还是受 CPU 限制?如果我为每个 CPU 创建一个自己的线程池,我能否获得更好的性能?

  3. 为什么我可以以 ~120MB/秒的速度复制文件,但我的应用程序只能获得 15MB/秒的吞吐量?这是由于大量较小数据包的写入大小造成的吗?

还有什么我错过的吗?

我使用的代码来自 geteventstore 文档/博客:

public class GetEventStoreRepository : IRepository
{
    private const string EventClrTypeHeader = "EventClrTypeName";
    private const string AggregateClrTypeHeader = "AggregateClrTypeName";
    private const string CommitIdHeader = "CommitId";
    private const int WritePageSize = 500;
    private const int ReadPageSize = 500;

    IStreamNamingConvention streamNamingConvention;

    private readonly IEventStoreConnection connection;
    private static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };


    public GetEventStoreRepository(IEventStoreConnection eventStoreConnection, IStreamNamingConvention namingConvention)
    {
        this.connection = eventStoreConnection;
        this.streamNamingConvention = namingConvention;
    }

    public void Save(IAggregate aggregate)
    {
        this.Save(aggregate, Guid.NewGuid(), d => { });

    }

    public void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
    {
        var commitHeaders = new Dictionary<string, object>
                {
                    {CommitIdHeader, commitId},
                    {AggregateClrTypeHeader, aggregate.GetType().AssemblyQualifiedName}
                };
        updateHeaders(commitHeaders);

        var streamName = this.streamNamingConvention.GetStreamName(aggregate.GetType(), aggregate.Identity);
        var newEvents = aggregate.GetUncommittedEvents().Cast<object>().ToList();
        var originalVersion = aggregate.Version - newEvents.Count;
        var expectedVersion = originalVersion == 0 ? ExpectedVersion.NoStream : originalVersion - 1;
        var eventsToSave = newEvents.Select(e => ToEventData(Guid.NewGuid(), e, commitHeaders)).ToList();

        if (eventsToSave.Count < WritePageSize)
        {
            this.connection.AppendToStreamAsync(streamName, expectedVersion, eventsToSave).Wait();
        }
        else
        {
            var startTransactionTask = this.connection.StartTransactionAsync(streamName, expectedVersion);
            startTransactionTask.Wait();
            var transaction = startTransactionTask.Result;

            var position = 0;
            while (position < eventsToSave.Count)
            {
                var pageEvents = eventsToSave.Skip(position).Take(WritePageSize);
                var writeTask = transaction.WriteAsync(pageEvents);
                writeTask.Wait();
                position += WritePageSize;
            }

            var commitTask = transaction.CommitAsync();
            commitTask.Wait();
        }

        aggregate.ClearUncommittedEvents();
    }

    private static EventData ToEventData(Guid eventId, object evnt, IDictionary<string, object> headers)
    {
        var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evnt, serializerSettings));

        var eventHeaders = new Dictionary<string, object>(headers)
                {
                    {
                        EventClrTypeHeader, evnt.GetType().AssemblyQualifiedName
                    }
                };
        var metadata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, serializerSettings));
        var typeName = evnt.GetType().Name;

        return new EventData(eventId, typeName, true, data, metadata);
    }
}

【问题讨论】:

  • 如果没有将数据实际保存到磁盘的代码,我们无话可说。仔细检查您是否正确缓冲并使用足够大的缓冲区。
  • 好吧,我不知道 API,但您确实有一个 CommitAsync 并在代码中等待,如果您的事件不是几个 kb 大,这听起来是个非常糟糕的主意。
  • 每秒提交数千次微小写入,这对于 SSD 来说是一场噩梦。这也是一个磨损均衡的噩梦。每写出大约一百个字节,磁盘必须擦除一个 512kB 的页面,从旧扇区复制 511kB,并附加您正在写入的几个字节。
  • 当您的程序烧毁 100% 核心时,它不会被磁盘卡住。这也是磁盘写入率低的原因。 JSON 并不是一种非常便宜的格式,有很多字符串处理。使用适当的分析器。
  • 删除 WaitAsync 是一种方式,方式更快。需要阅读 c# 异步编程。关于 SSD 的有效点,但不要对发生的事情做噩梦 - 仅供参考,事件存储块文件在 256MB 的固定文件块中,因此它不会破坏 SSD!

标签: multithreading performance eventstoredb


【解决方案1】:

它在 cmets 中被部分提及,但为了加强这一点,因为您在上述代码中完全单线程工作(尽管您使用异步,但您只是在等待它们,因此有效地工作同步)您正在受苦来自上下文切换和 EventStore 协议来回的延迟和开销。要么真的走异步路线,但避免等待异步线程,而是将其并行化(EventStore 喜欢并行化,因为它可以批处理多个写入),或者自己进行批处理并一次发送例如 20 个事件。

【讨论】:

    猜你喜欢
    • 2015-03-23
    • 1970-01-01
    • 2018-05-25
    • 2023-01-14
    • 2014-01-05
    • 1970-01-01
    • 2017-01-16
    • 2014-12-20
    • 1970-01-01
    相关资源
    最近更新 更多