As promised in my previous post, in this article I examine practical aspects related to DDD and, in particular to CQRS and Event Sourcing patterns.

The main goal of my experiment is to implement an aggregate according to the Event Sourcing paradigm, and to create a separate read model to feed the pages of a Web application.

Before presenting the example, I am going to briefly introduce the main architectural patterns that have been used since DDD launch.

Getting familiar with the use of CQRS and Event Sourcing was not easy. However, I was able to succeed thanks to the foundational skills built through Avanscoperta’s trainings, a massive dose of readings (Vaugh Vernon’s red book and several posts), and an adequate number of attempts always supported by tests.

The full source code I’m going to mention with examples in the post, is available on GitHub.

The journey continues. In the meantime, I treasure the learnings, and I enjoy the view of the horizon.

Architectures evolution

Evans’ blue book was published in 2003 and since then different architectural styles have emerged. During this evolution, DDD guidelines have always remained valid, except for some technicalities necessary for the implementation of the various architectures.

Layered architecture

Layered architecture is the original architecture used by Evans to isolate the domain logic from the other responsibilities of an application. The standard layers in which to split an application are:

  • UI, is the layer responsible for displaying information to the users and interpreting their commands;
  • Application, defines the possible scenarios of the application use cases. It coordinates the domain objects and takes care of managing all the activities necessary for the correct functioning of the application. For example, security, transactions, etc.;
  • Domain, contains everything related to the domain logic. The state of the domain objects is managed by this layer even if the persistence of the objects themselves is delegated to the infrastructure. This is the layer where the DDD guidelines are applied;
  • Infrastructure, does not contain neither domain logic nor application logic, but provides the technical implementations that serve the other layers to work. For example, persistence, transaction management, etc.

The fundamental rule of this architecture is that each layer can depend only on those below. The only way for the underlying layers to communicate with the upper layers is through the use of Observer and Mediator patterns.

The clear separation of the domain model in a layer distinct from the others allows you to develop and test business rules in total autonomy compared to the rest of the application.

The only cons of this approach is given by the dependence of the domain layer on the infrastructure layer, even if this could be well mitigated by the use of clear and well-defined interfaces.

Hexagonal architecture

The Hexagonal architecture by Alistair Cockburn, solves the problems of the previous architecture and introduces a more symmetrical application model. Basically in this architecture there is no longer a top and a bottom, but only the concepts of inside and the outside.

The inside consists of what used to be the application layer and the domain layer, simply defined as “application” by Cockburn. In other words, all that implements the use cases of business’s interest. The inside communicates with the outside through “ports” defined by the application itself. Examples of ports can be the public application API, the data access interface, and the domain event publishing interface.

The outside consists of everything that interacts with the application or, vice versa, with which the latter interacts. The interaction always occurs through adapters which either adapt the external signals to the application’s API, or implement the interfaces necessary for the correct operation of the latter. Examples of adapters could be the controllers that interpret HTTP requests and invoke application APIs or SQL implementation of data access interfaces.

Using DDD, Cockburn’s inside is commonly divided into two concentric levels:

  • the internal, consisting of the domain model that implements the business rules;
  • and the external one consists of the application services which define the usage scenarios.

This architecture allows to develop the application in total autonomy with respect to the external conditions (UI, database, etc.), allowing to execute it, and test it, in various configurations: with UI, from command line, etc.

The Hexagonal architecture is widely adopted and is the foundation of all the architectures that came after, including the patterns described below.

CQRS

In the traditional approaches, some described above, a single data model is used together with the related services, for writing and reading operations. This often leads to the creation of suboptimal models that often expose lots of information. Furthermore, this can lead to an excessive coupling between the model and the client code that uses it.

Command Query Responsibility Segregation (CQRS) is an architectural pattern which separates the responsibility for modifying data (Command) from reading them (Query). The formalization of this approach is generally attributed to Greg Young.

The use of two different models for writing and reading operations, in scope of CQRS, allows instead to design and optimize each model for its responsibilities. In addition to this, the use of distinct models also allows the selection of the most appropriate technologies. For example, we could decide to use different persistence mechanisms: for reading could be more appropriate to use a relational DB, while in writing we would prefer a NoSQL.

As soon as the reading and writing models are separated, the infrastructure could easily scale to best fit the needs. It often happens that the number of writings in a system is much lower than the readings. Therefore, using separate models and technologies will let scale differently the infrastructure related to the reading, in respect to the writing one.

Obviously the two models must be synchronized to ensure that the read information are consistent with the written ones. The consistency could not be immediate but must be eventually achieved. In DDD, synchronization occurs through the Domain Event generated by the aggregates.

The use of Domain Events to synchronize the reading model with the writing model has only one cons: generally the event emission and persistence are not transactional. This is due to the fact that the message management system and the persistence system are separate systems (e.g. RabbitMQ and SQL Server).

Event sourcing

Event sourcing (ES) overcomes the problem described above in an extremely elegant way. Let’s see how.

Normally to persist an aggregate at a given moment, its state is saved to a database. Using ES instead, what is saved is not the current state of the aggregate but the sequence of Domain Events that led the aggregate into its current state. To load an aggregate it is sufficient to read all the events associated with it and to replay them.

In this way, every time an aggregate is modified, a new record is appended to the flow of the events that represent it instead of updating an existing record. This approach is very efficient because it eliminates the risk of concurrent locking on table records and, consequently, the possibility of deadlocks between different writing threads.

This type of persistence is called event store. You can create your own event store as described in Vaughn Vernon’s book, using a relational database, or, as we’ll see later, you can use an existing product that provides this specific functionality.

Back to the problem of non-transactionality mentioned above, using an event store you have a single place where the Domain Events and, implicitly, the status of the aggregates are saved. It is therefore sufficient to read the unpublished events from the event store to avoid risks of inconsistency between the writing and reading models.

The application

Let’s have hands on. To deep dive on the architectural patterns briefly described above, and experiment them practically, I decided to develop a test application with the aim of implementing an aggregate according to the Event Sourcing paradigm, and to create a separate reading model that could be used to feed the pages of the Web application.

Moreover, I wanted to use Event Store in order to persist events. As the name suggests, Event Store is an event store developed, among the others, by Greg Young.

The application allows you to create carts and add products by specifying the quantity. The use cases are very simple:

  • it is be possible to create a cart by supplying the customer to whom it belongs;
  • it is be possible to add a product to the cart specifying the quantity, provided that the product is not already in the cart;
  • it is possible to change the quantity of a product already in a cart;
  • finally, for each product it should not be possible to add more than 50 units.

I developed the application using ASP.NET Core, C# and Docker.

Model an aggregate following Event Sourcing

To deal with the problem described above, I started from modeling the Cart aggregate. Following a classic approach, I started thinking about the classes’ properties that I would have to persist on the database using Entity Framework but, after an initial indecision, I changed the approach.

I started thinking about the domain events that the aggregate should have generated for the use cases described above, and I started with the creation of a new cart.

Extract from CartTest.cs
1
2
3
4
5
6
7
8
9
10
11
[Fact]
public void GivenNoCartExistsWhenCreateOneThenCartCreatedEvent()
{
    var cart = new Cart(DefaultCartId, DefaultCustomerId);

    AssertSingleUncommittedEvent<CartCreatedEvent>(cart, @event =>
    {
      Assert.Equal(DefaultCartId, @event.AggregateId);
      Assert.Equal(DefaultCustomerId, @event.CustomerId);
    });
}

As can be seen from the test, when I create a new cart instance, providing its own identifier and the identifier of its customer, I expect only one event of type CartCreatedEvent to be published, and that the latter contains the correct information. The AssertSingleUncommittedEvent method is a utility method of the test class to check the event generated by the aggregate and not yet committed.

Following the Event Sourcing paradigm, every action performed on an aggregate, even its creation, is divided into three conceptual steps:

  1. verification of the input parameters and the status of the aggregate in order to check feasibility of the action;
  2. if the previous checks are positive, publishing of the events triggered by the action;
  3. updating of the aggregate’s state according to the events mentioned above.

Keeping steps 2 and 3 separated, as explained later, is necessary when we need to recreate an object starting from the events present on the event store. With these guidelines, to meet the above test, I created the following class.

Extract from Cart.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Cart : AggregateBase<CartId>
{
    private CustomerId CustomerId { get; set; }

    public Cart(CartId cartId, CustomerId customerId) : this()
    {
        if (cartId == null) throw new ArgumentNullException(nameof(cartId));
        if (customerId == null) throw new ArgumentNullException(nameof(customerId));
        RaiseEvent(new CartCreatedEvent(cartId, customerId));
    }

    internal void Apply(CartCreatedEvent ev)
    {
        Id = ev.AggregateId;
        CustomerId = ev.CustomerId;
    }
}

The code reflects the three steps described above: verification (lines 7 and 8), events publishing (line 9), and status update (Apply method).

In order to make the reading of the Cart class easier and to avoid duplications, I created the AggregateBase base class that basically acts as a Layer Supertype for the domain aggregates. The base class is described in more detail later.

Likewise to what was done for the creation of a new cart, to implement the use case of adding a product to the cart, I started from the definition of the test.

Extract from CartTest.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[Fact]
public void GivenACartWhenAddAProductThenProductAddedEvent()
{
    var cart = new Cart(DefaultCartId, DefaultCustomerId);
    ClearUncommittedEvents(cart);

    cart.AddProduct(DefaultProductId, 2);

    AssertSingleUncommittedEvent<ProductAddedEvent>(cart, @event =>
    {
        Assert.Equal(DefaultProductId, @event.ProductId);
        Assert.Equal(2, @event.Quantity);
        Assert.Equal(DefaultCartId, @event.AggregateId);
        Assert.Equal(0, @event.AggregateVersion);
    });
}
The only difference from the previous test is the use of the ClearUncommittedEvents utility method to clear the list of the uncommitted events. Apart from this detail, also in this case the test is easily readable: when I add a product to the cart I expect the corresponding event to be published and to contain the data describing the operation just happened.

It is important to note that I did not make any statement about the status (properties or fields) of the Cart class. In fact, I do not care how the class internally manages its status, but simply want to make sure that the correct events are issued. In fact, these allow me to build a reading model for my web application.

This is the perfect application of the “encapsulation” principle. I will never be able to thank Ziobrando enough for having found the perfect metaphor for such a simple and often underestimated concept that could be appreciated only by those who speak Italian: Black Knight. This is a good reason to take some Italian classes!

The status of the aggregate is fundamental for its correct functioning. To make sure that the aggregate manage the status correctly, it was sufficient to add the following test.

Extract from CartTest.cs
1
2
3
4
5
6
7
8
9
10
11
[Fact]
public void GivenACartWithAProductWhenAddingTheSameProductThenThrowsCartException()
{
    var cart = new Cart(DefaultCartId, DefaultCustomerId);

    cart.AddProduct(DefaultProductId, 2);
    ClearUncommittedEvents(cart);

    Assert.Throws<CartException>(() => { cart.AddProduct(DefaultProductId, 1); });
    Assert.Empty(GetUncommittedEventsOf(cart));
}
If Cart did not correctly manage its status, it would not be able to check whether it already contains the product or not.

Given the tests above, the implementation of the expected behavior was the following.

Extract from Cart.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Cart : AggregateBase<CartId>
{
  private List<CartItem> Items { get; set; }

  public void AddProduct(ProductId productId, int quantity)
  {
    if (productId == null)
    {
      throw new ArgumentNullException(nameof(productId));
    }
    if (ContainsProduct(productId))
    {
      throw new CartException($"Product {productId} already added");
    }
    RaiseEvent(new ProductAddedEvent(productId, quantity));
  }

  internal void Apply(ProductAddedEvent ev)
  {
    Items.Add(new CartItem(ev.ProductId, ev.Quantity));
  }

  private bool ContainsProduct(ProductId productId)
  {
    return Items.Any(x => x.ProductId == productId);
  }
}
Extract from CartItem.cs
1
2
3
4
5
6
7
8
9
10
11
12
public class CartItem
{
  public CartItem(ProductId productId, int quantity)
  {
    ProductId = productId;
    Quantity = quantity;
  }

  public ProductId ProductId { get; }

  public int Quantity { get; }
}
Note that the cart maintains a list of added items, but the list is not accessible outside the class. The list is only useful for the implementation of expected behaviors and must not be used in any way by the client code. I would like to highlight that for the implementation of last use case, it would not have been necessary to keep the quantities in the list. It would have been enough to maintain a list of product identifiers. The quantity is there just because it is useful to implement the other use cases described above.

Since the article is already quite full of concepts, I will avoid describing the implementation of the other use cases. I leave it to those interested in consulting the code on GitHub.

AggregateBase

AggregateBase implements two basic interfaces. The first is IAggregate which states that each aggregate must have an identifier.

IAggregate.cs
1
2
3
4
public interface IAggregate<TId>
{
    TId Id { get; }
}

The second interface defines the signatures of the methods needed to work with the Event Sourcing paradigm. Each aggregate must have a version (Version) in order to manage potential writing conflicts. It must also be possible to apply domain events (ApplyEvent), this is useful for load an aggregate from the event store. Finally, obtaining the uncommitted events (GetUncommittedEvents), and clear it (ClearUncommittedEvents) must be possible.

IEventSourcingAggregate.cs
1
2
3
4
5
6
7
internal interface IEventSourcingAggregate<TAggregateId>
{
    long Version { get; }
    void ApplyEvent(IDomainEvent<TAggregateId> @event, long version);
    IEnumerable<IDomainEvent<TAggregateId>> GetUncommittedEvents();
    void ClearUncommittedEvents();
}
The interface is declared internal because it is visible only inside the assembly which contains the domain objects. This is an implementation detail that does not matter to client code.

The AggregateBase class implements the above interfaces pretty smoothly. There are two methods that deserve further examination: ApplyEvent and RaiseEvent.

ApplyEvent - in order avoid duplication, the ApplyEvent method verifies that the event to be applied is not among those uncommitted. In case the event is actually to be applied, line 18 allows to invoke the specific method to apply the domain event.

In the case of the Cart class and CartCreatedEvent event, using the dynamic keyword allows the AggregateBase class to dynamically invoke the internal void Apply(CartCreatedEvent ev) method of the Cart class at runtime.

This benchmark shows that the dynamic invocation of methods, although slower than switch with pattern matching, is still much faster than other options.

RaiseEvent - this method guarantees to assign the aggregate’s correct version and the identifier to the publishing event, simplifying the code of the Cart class. Moreover, before appending the event to the uncommitted ones, it applies the event itself to the aggregate so that the state of the latter is consistent with the published events.

AggregateBase.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public abstract class AggregateBase<TId> : 
  IAggregate<TId>, IEventSourcingAggregate<TId>
{
  public const long NewAggregateVersion = -1;

  private readonly ICollection<IDomainEvent<TId>> _uncommittedEvents = 
    new LinkedList<IDomainEvent<TId>>();
  private long _version = NewAggregateVersion;

  public TId Id { get; protected set;  }

  long IEventSourcingAggregate<TId>.Version => _version;

  void IEventSourcingAggregate<TId>.ApplyEvent(IDomainEvent<TId> @event, long version)
  {
    if (!_uncommittedEvents.Any(x => Equals(x.EventId, @event.EventId)))
    {
      ((dynamic)this).Apply((dynamic)@event);
      _version = version;
    }
  }

  void IEventSourcingAggregate<TId>.ClearUncommittedEvents()
    => _uncommittedEvents.Clear();

  IEnumerable<IDomainEvent<TId>> IEventSourcingAggregate<TId>.GetUncommittedEvents()
    => _uncommittedEvents.AsEnumerable();

  protected void RaiseEvent<TEvent>(TEvent @event)
      where TEvent: DomainEventBase<TId>
  {
    IDomainEvent<TId> eventWithAggregate = @event.WithAggregate(
      Equals(Id, default(TId)) ? @event.AggregateId : Id,
      _version);

    ((IEventSourcingAggregate<TId>)this).ApplyEvent(eventWithAggregate, _version + 1);
    _uncommittedEvents.Add(@event);
  }
}

Persistence

Following a classic DDD approach, to access the aggregates and to persist them, I would define an IRepository interface in the domain model with these classic operations: GetByID, Remove, and Save, plus any specialized search methods. After that I would implement this interface in some supporting assembly. This implementation would be dependent on some persistence mechanism and this dependence would certainly be made explicit by the name of the implementation itself, e.g. EntityFrameworkRepository.

Using Event Sourcing the interface definition, and its implementation, take a different shape. First of all, the IRepository interface is simpler.

IRepository.cs
1
2
3
4
5
6
7
public interface IRepository<TAggregate, TAggregateId>
  where TAggregate: IAggregate<TAggregateId>
{
  Task<TAggregate> GetByIdAsync(TAggregateId id);

  Task SaveAsync(TAggregate aggregate);
}
Searching on an event store by attribute is complex and definitely not well performing. In addition, event stores are append-only persistence mechanisms, therefore previously written events can not be deleted. For these reasons the only suitable operations are the recovery of an aggregate through its identifier, and the saving of an aggregate, whether new or pre-existing.

Concerning the implementation, the repository must be aware at least about the methods of the IEventSourcingAggregate interface in order to access the domain uncommitted events of an aggregate, and to apply the events retrieved from the event store to an aggregate. For this reason it is more natural to put part of the repository implementation in the same assembly that contains the domain model.

EventSourcingRepository.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class EventSourcingRepository<TAggregate, TAggregateId> :
    IRepository<TAggregate, TAggregateId>
  where TAggregate : AggregateBase<TAggregateId>, IAggregate<TAggregateId>
  where TAggregateId : IAggregateId
{
  private readonly IEventStore eventStore;
  private readonly ITransientDomainEventPublisher publisher;

  public EventSourcingRepository(IEventStore eventStore,
    ITransientDomainEventPublisher publisher)
  {
    this.eventStore = eventStore;
    this.publisher = publisher;
  }

  public async Task<TAggregate> GetByIdAsync(TAggregateId id)
  {
    try
    {
      var aggregate = CreateEmptyAggregate();
      IEventSourcingAggregate<TAggregateId> aggregatePersistence = aggregate;

      foreach (var @event in await eventStore.ReadEventsAsync(id))
      {
        aggregatePersistence.ApplyEvent(@event.DomainEvent, @event.EventNumber);
      }
      return aggregate;
    }
    catch (EventStoreAggregateNotFoundException)
    {
      return null;
    }
    catch (EventStoreCommunicationException ex)
    {
      throw new RepositoryException("Unable to access persistence layer", ex);
    }
  }

  public async Task SaveAsync(TAggregate aggregate)
  {
    try
    {
      IEventSourcingAggregate<TAggregateId> aggregatePersistence = aggregate;

      foreach (var @event in aggregatePersistence.GetUncommittedEvents())
      {
        await eventStore.AppendEventAsync(@event);
        await publisher.PublishAsync((dynamic)@event);
      }
      aggregatePersistence.ClearUncommittedEvents();
    }
    catch (EventStoreCommunicationException ex)
    {
      throw new RepositoryException("Unable to access persistence layer", ex);
    }
  }

  private TAggregate CreateEmptyAggregate()
  {
    return (TAggregate)typeof(TAggregate)
      .GetConstructor(
        BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public, 
        null, new Type[0], new ParameterModifier[0])
      .Invoke(new object[0]);
  }
}

The repository implementation piece that remains outside the domain model is the one related to the specific used event store. For this purpose I introduced the IEventStore interface. The implementation of this interface is explained later.

IEventStore.cs
1
2
3
4
5
6
7
8
public interface IEventStore
{
  Task<IEnumerable<Event<TAggregateId>>> ReadEventsAsync<TAggregateId>(TAggregateId id)
    where TAggregateId: IAggregateId;

  Task<AppendResult> AppendEventAsync<TAggregateId>(IDomainEvent<TAggregateId> @event)
    where TAggregateId: IAggregateId;
}

Back to the EventSourcingRepository class, you can see how the GetByIdAsync method applies all events retrieved from the event store to an empty instance of the aggregate using the ApplyEvent method. The empty aggregate instance is created using the CreateEmptyAggregate method. This method makes use of C# reflection to create an aggregate using its default constructor. I decided to use reflection because I want the aggregates to be able to declare the default constructor as private, as in the case of the Cart class. This is because exposing the default constructor or not must be an aggregate design choice, not an architectural constraint.

Regarding SaveAsync method, it deals with recovering the uncommitted events from the aggregate, save them on the event store and, if all work well, remove them from the aggregate.

I also decided to use this method to publish events internally to the application through the ITransientDomainEventPublisher interface. The class that implements this interface is nothing more than a lightweight publisher as described in chapter 8 (Domain Events) of Vaugh Vernon’s red book. Unlike the book, in my case the events are published by the repository instead of the aggregates. For two reasons: the aggregate code is simpler, and the SaveAsync method is the only one in which we are sure that the events have been saved and there are no conflicts.

Event Store

For this experiment I used Event Store to persist the Event Sourcing model. In order to setup the environment I included eventstore/eventstore Docker image in the docker-compose.yml file of my project. I used the basic settings since I did not have specific needs.

In order to communicate with the Event Store server, I installed the NuGet package EventStore.ClientAPI.NetCore, version 4.0.3-rc. Since I haven’t ever used this product I created some tests to check the behavior of the library and the server. After that, I implemented the IEventStore interface described above.

Event Store provides several interesting features like projection. In order to use them, Event Store requires that the event are persisted in JSON format. Since other event store could rely on other encodings, I decided to implement serialization and deserialization of events at this level.

For the sake of conciseness, I did not place the implementation here. It could be found on GitHub.

CQRS and read model

As already mentioned above, using Event Sourcing for the domain model, we miss a read model to show information to the user. CQRS comes to the rescue.

Following the guidelines of this pattern, I created a simple read model which satisfy the Web application needs.

EventSourcingCQRS.ReadModel\Cart\Cart.cs
1
2
3
4
5
6
7
public class Cart : IReadEntity
{
  public string Id { get; set; }
  public int TotalItems { get; set; }
  public string CustomerId { get; set; }
  public string CustomerName { get; set; }
}
EventSourcingCQRS.ReadModel\Cart\CartItem.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CartItem : IReadEntity
{
    public string Id { get; private set; }
    public string CartId { get; set; }
    public string ProductId { get; set; }
    public string ProductName { get; set; }
    public int Quantity { get; set; }

    public static CartItem CreateFor(string cartId, string productId)
    {
      return new CartItem
      {
        Id = IdFor(cartId, productId),
        CartId = cartId,
        ProductId = productId
      };
    }

    public static string IdFor(string cartId, string productId)
    {
      return $"{productId}@{cartId}";
    }
}

This model have to be synchronized with the write model. For this reason I created the CartUpdater class which, handling the domain events published by the write model, is in charge of updating the read model.

CartUpdater.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class CartUpdater : IDomainEventHandler<CartId, CartCreatedEvent>,
  IDomainEventHandler<CartId, ProductAddedEvent>,
  IDomainEventHandler<CartId, ProductQuantityChangedEvent>
{
  private readonly IReadOnlyRepository<Customer.Customer> customerRepository;
  private readonly IReadOnlyRepository<Product.Product> productRepository;
  private readonly IRepository<Cart.Cart> cartRepository;
  private readonly IRepository<Cart.CartItem> cartItemRepository;

  public CartUpdater(IReadOnlyRepository<Customer.Customer> customerRepository,
    IReadOnlyRepository<Product.Product> productRepository,
    IRepository<Cart.Cart> cartRepository,
    IRepository<Cart.CartItem> cartItemRepository)
  {
    this.customerRepository = customerRepository;
    this.productRepository = productRepository;
    this.cartRepository = cartRepository;
    this.cartItemRepository = cartItemRepository;
  }

  public async Task HandleAsync(CartCreatedEvent @event)
  {
    var customer = await customerRepository.GetByIdAsync(
      @event.CustomerId.IdAsString());

    await cartRepository.InsertAsync(new Cart.Cart
      {
        Id = @event.AggregateId.IdAsString(),
        CustomerId = customer.Id,
        CustomerName = customer.Name,
        TotalItems = 0
      });
  }

  public async Task HandleAsync(ProductAddedEvent @event)
  {
    var product = await productRepository.GetByIdAsync(@event.ProductId.IdAsString());
    var cart = await cartRepository.GetByIdAsync(@event.AggregateId.IdAsString());
    var cartItem = Cart.CartItem.CreateFor(
      @event.AggregateId.IdAsString(),
      @event.ProductId.IdAsString());

    cartItem.ProductName = product.Name;
    cartItem.Quantity = @event.Quantity;
    cart.TotalItems += @event.Quantity;
    await cartRepository.UpdateAsync(cart);
    await cartItemRepository.InsertAsync(cartItem);
  }

  public async Task HandleAsync(ProductQuantityChangedEvent @event)
  {
    var cartItemId = Cart.CartItem.IdFor(
      @event.AggregateId.IdAsString(),
      @event.ProductId.IdAsString());
    var cartItem = (await cartItemRepository
      .FindAllAsync(x => x.Id == cartItemId))
      .Single();
    var cart = await cartRepository.GetByIdAsync(@event.AggregateId.IdAsString());

    cart.TotalItems += @event.NewQuantity - @event.OldQuantity;
    cartItem.Quantity = @event.NewQuantity;

    await cartRepository.UpdateAsync(cart);
    await cartItemRepository.UpdateAsync(cartItem);
  }
}
IRepository and IReadOnlyRepositoy interfaces differ from the IRepository interface described in Repository paragraph. These interfaces belong to the EventSourcingCQRS.ReadModel project and they are used to access the read model. The only interaction between CartUpdater class and the write model is through the domain events published by the latter.

Incidentally, the implementation of IRepository and IReadOnlyRepositoy uses MongoDB and it does not have access to the Event Store.

An issue that often arise when using CQRS is the delay between the actions on the write model and the update of the read model. Since this is just an example, I decide to solve this problem in a fast way: I registered CartUpdater class as consumer of the lightweight publisher described above. In this way I can guarantee that the read model is updated even before the SaveAsync method (see Repository paragraph) is completed.

This is not the right approach to solve the delay issue since it decrease the scalability of the system since it bounds the operations on the domain model to the update of the read model. Nonetheless, if you are working on an existing application and you are introducing these patterns, this could be a feasible way to do it gradually.

Back to the code, who is in charge of coordinating the action on the write model and the update of the read model is an application service called CartWriter. See it below.

CartWriter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class CartWriter : ICartWriter
{
  private readonly IRepository<Cart, CartId> cartRepository;
  private readonly ITransientDomainEventSubscriber subscriber;
  private readonly IEnumerable<IDomainEventHandler<CartId, CartCreatedEvent>> cartCreatedEventHandlers;
  private readonly IEnumerable<IDomainEventHandler<CartId, ProductAddedEvent>> productAddedEventHandlers;
  private readonly IEnumerable<IDomainEventHandler<CartId, ProductQuantityChangedEvent>> productQuantityChangedEventHandlers;

  public CartWriter(IRepository<Cart, CartId> cartRepository, ITransientDomainEventSubscriber subscriber,
      IEnumerable<IDomainEventHandler<CartId, CartCreatedEvent>> cartCreatedEventHandlers,
      IEnumerable<IDomainEventHandler<CartId, ProductAddedEvent>> productAddedEventHandlers,
      IEnumerable<IDomainEventHandler<CartId, ProductQuantityChangedEvent>> productQuantityChangedEventHandlers)
  {
    this.cartRepository = cartRepository;
    this.subscriber = subscriber;
    this.cartCreatedEventHandlers = cartCreatedEventHandlers;
    this.productAddedEventHandlers = productAddedEventHandlers;
    this.productQuantityChangedEventHandlers = productQuantityChangedEventHandlers;
  }

  public async Task AddProductAsync(string cartId, string productId, int quantity)
  {
    var cart = await cartRepository.GetByIdAsync(new CartId(cartId));

    subscriber.Subscribe<ProductAddedEvent>(
      async @event => await HandleAsync(productAddedEventHandlers, @event));
    cart.AddProduct(new ProductId(productId), quantity);
    await cartRepository.SaveAsync(cart);
  }

  public async Task ChangeProductQuantityAsync(
      string cartId, string productId, int quantity)
  {
    var cart = await cartRepository.GetByIdAsync(new CartId(cartId));

    subscriber.Subscribe<ProductQuantityChangedEvent>(
      async @event => await HandleAsync(productQuantityChangedEventHandlers, @event));
    cart.ChangeProductQuantity(new ProductId(productId), quantity);
    await cartRepository.SaveAsync(cart);
  }

  public async Task CreateAsync(string customerId)
  {
    var cart = new Cart(CartId.NewCartId(), new CustomerId(customerId));

    subscriber.Subscribe<CartCreatedEvent>(
      async @event => await HandleAsync(cartCreatedEventHandlers, @event));
    await cartRepository.SaveAsync(cart);
  }

  public async Task HandleAsync<T>(
      IEnumerable<IDomainEventHandler<CartId, T>> handlers, T @event)
    where T : IDomainEvent<CartId>
  {
    foreach (var handler in handlers)
    {
      await handler.HandleAsync(@event);
    }
  }
}
As you can see from the code, the CartWriter service did not know anything about CartUpdater class. The latter is just injected ad implementation of IDomainEventHandler interface. CartWriter just subscribes all the injected handlers.

The ITransientDomainEventSubscriber interface is the counterpart of ITransientDomainEventPublisher interface seen above. Both the interfaces are implemented by the TransientDomainEventPubSub class, which is charge of dispatching all the published events to the corresponding subscribers. I called this class transient since it ensures that the subscriptions stay valid only a given execution context. In this way the handlers registered during and operation are not invoked in the subsequent ones.

The CartWriter service seen above is simply the facade of our domain model. For the sake of symmetry, There is also a facade for the read model, called CartReader.

CartReader.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CartReader : ICartReader
{
  private readonly IReadOnlyRepository<Cart> cartRepository;
  private readonly IReadOnlyRepository<CartItem> cartItemRepository;

  public CartReader(IReadOnlyRepository<Cart> cartRepository,
      IReadOnlyRepository<CartItem> cartItemRepository)
  {
    this.cartRepository = cartRepository;
    this.cartItemRepository = cartItemRepository;
  }

  public async Task<IEnumerable<Cart>> FindAllAsync(
      Expression<Func<Cart, bool>> predicate)
  {
    return await cartRepository.FindAllAsync(predicate);
  }

  public async Task<Cart> GetByIdAsync(string id)
  {
    return await cartRepository.GetByIdAsync(id);
  }

  public async Task<IEnumerable<CartItem>> GetItemsOfAsync(string cartId)
  {
    return await cartItemRepository.FindAllAsync(x => x.CartId == cartId);
  }
}
This class uses the repositories of the read model in order to returns data to the client code.

Summing up, in order to implement the CQRS pattern, and show data to the users, I created a read model (Cart and CartItem) which is synchronized with the domain model through CartUpdater and I provided to application services (CartWriter and CartReader) which cleanly separate writing operations from reading operations.

The power of this approach is that you can create as many read model as you need, based on the features to be developed. Moreover, if an existing read model comes out to be wrong or inadequate, e.g. due to changed requirements, you can delete and create a new one. The latter can be populated by replaying all the domain events available into the event store.

Conclusions

The post is definitely the longest I’ve written so far, however the subject deserved all the effort and time I dedicated, and much more. The code I produced during the experiment is more than what I could present, and it is available on GitHub.

Event Sourcing is a different paradigm from what normally a developer is used to, and requires to change approach. Once done, it is simple to use. There are aspects that deserve further investigation but those were not in scope: snapshots management, publication of events to other applications, and correction of errors just to name the most important.

CQRS becomes essential if you want to use Event Sourcing, but it can also be used alone. To use its full potential, it is necessary to correctly manage possible delays in updating the read model. For this reason, its adoption must be evaluated case-by-case, however it is very flexible and the potentials I have experienced are certainly interesting.