Tight Loop - Disk at 100%, Quad Core CPU @25% useage, only 15MBsec disk write speed

154 Views Asked by At

I have a tight loop which runs through a load of carts, which themselves contain around 10 events event objects and writes them to the disk in JSON via an intermediate repository (jOliver common domain rewired with GetEventStore.com):

// create ~200,000 carts, each with ~5 events
List<Cart> testData = TestData.GenerateFrom(products);
foreach (var cart in testData)
{
    count = count + (cart as IAggregate).GetUncommittedEvents().Count;
    repository.Save(cart);
}

I see the disk says it is as 100%, but the throughout is 'low' (15MB/sec, ~5,000 events per second) why is this, things i can think of are:

  1. Since this is single threaded does the 25% CPU usage actually mean 100% of the 1 core that I am on (any way to show specific core my app is running on in Visual Studio)?

  2. Am i constrained by I/O, or by CPU? Can I expect better performance if i create my own thread pool one for each CPU?

  3. How come I can copy a file at ~120MB/sec, but I can only get throughput of 15MB/sec in my app? Is this due to the write size of lots of smaller packets?

Anything else I have missed?

throughput

The code I am using is from the geteventstore docs/blog:

public class GetEventStoreRepository : IRepository
{
    private const string EventClrTypeHeader = "EventClrTypeName";
    private const string AggregateClrTypeHeader = "AggregateClrTypeName";
    private const string CommitIdHeader = "CommitId";
    private const int WritePageSize = 500;
    private const int ReadPageSize = 500;

    IStreamNamingConvention streamNamingConvention;

    private readonly IEventStoreConnection connection;
    private static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };


    public GetEventStoreRepository(IEventStoreConnection eventStoreConnection, IStreamNamingConvention namingConvention)
    {
        this.connection = eventStoreConnection;
        this.streamNamingConvention = namingConvention;
    }

    public void Save(IAggregate aggregate)
    {
        this.Save(aggregate, Guid.NewGuid(), d => { });

    }

    public void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
    {
        var commitHeaders = new Dictionary<string, object>
                {
                    {CommitIdHeader, commitId},
                    {AggregateClrTypeHeader, aggregate.GetType().AssemblyQualifiedName}
                };
        updateHeaders(commitHeaders);

        var streamName = this.streamNamingConvention.GetStreamName(aggregate.GetType(), aggregate.Identity);
        var newEvents = aggregate.GetUncommittedEvents().Cast<object>().ToList();
        var originalVersion = aggregate.Version - newEvents.Count;
        var expectedVersion = originalVersion == 0 ? ExpectedVersion.NoStream : originalVersion - 1;
        var eventsToSave = newEvents.Select(e => ToEventData(Guid.NewGuid(), e, commitHeaders)).ToList();

        if (eventsToSave.Count < WritePageSize)
        {
            this.connection.AppendToStreamAsync(streamName, expectedVersion, eventsToSave).Wait();
        }
        else
        {
            var startTransactionTask = this.connection.StartTransactionAsync(streamName, expectedVersion);
            startTransactionTask.Wait();
            var transaction = startTransactionTask.Result;

            var position = 0;
            while (position < eventsToSave.Count)
            {
                var pageEvents = eventsToSave.Skip(position).Take(WritePageSize);
                var writeTask = transaction.WriteAsync(pageEvents);
                writeTask.Wait();
                position += WritePageSize;
            }

            var commitTask = transaction.CommitAsync();
            commitTask.Wait();
        }

        aggregate.ClearUncommittedEvents();
    }

    private static EventData ToEventData(Guid eventId, object evnt, IDictionary<string, object> headers)
    {
        var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evnt, serializerSettings));

        var eventHeaders = new Dictionary<string, object>(headers)
                {
                    {
                        EventClrTypeHeader, evnt.GetType().AssemblyQualifiedName
                    }
                };
        var metadata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, serializerSettings));
        var typeName = evnt.GetType().Name;

        return new EventData(eventId, typeName, true, data, metadata);
    }
}
1

There are 1 best solutions below

0
On BEST ANSWER

It was partially mentioned in the comments, but to enhance on that, as you are working fully single-threaded in the mentioned code (though you use async, you are just waiting for them, so effectively working sync) you are suffering from latency and overhead for context switching and EventStore protocol back and forth. Either really go the async route, but avoid waiting on the async threads and rather parallelize it (EventStore likes parallelization because it can batch multiple writes) or do batching yourself and send, for example, 20 events at a time.