As promised in my previous post, in this article I examine practical aspects related to DDD and, in particular to CQRS and Event Sourcing patterns.
The main goal of my experiment is to implement an aggregate according to the Event Sourcing paradigm, and to create a separate read model to feed the pages of a Web application.
Before presenting the example, I am going to briefly introduce the main architectural patterns that have been used since DDD launch.
Getting familiar with the use of CQRS and Event Sourcing was not easy. However, I was able to succeed thanks to the foundational skills built through Avanscoperta’s training, a massive dose of readings (Vaugh Vernon’s red book and several posts), and an adequate number of attempts always supported by tests.
The full source code I’m going to mention with examples in the post, is available on GitHub.
The journey continues. In the meantime, I treasure the learnings, and I enjoy the view of the horizon.
Architectures evolution
Evans’ blue book was published in 2003 and since then different architectural styles have emerged. During this evolution, DDD guidelines have always remained valid, except for some technicalities necessary for the implementation of the various architectures.
Layered architecture
Layered architecture is the original architecture used by Evans to isolate the domain logic from the other responsibilities of an application. The standard layers in which to split an application are:
- UI, is the layer responsible for displaying information to the users and interpreting their commands;
- Application, defines the possible scenarios of the application use cases. It coordinates the domain objects and takes care of managing all the activities necessary for the correct functioning of the application. For example, security, transactions, etc.;
- Domain, contains everything related to the domain logic. The state of the domain objects is managed by this layer even if the persistence of the objects themselves is delegated to the infrastructure. This is the layer where the DDD guidelines are applied;
- Infrastructure, does not contain neither domain logic nor application logic, but provides the technical implementations that serve the other layers to work. For example, persistence, transaction management, etc.
The fundamental rule of this architecture is that each layer can depend only on those below. The only way for the underlying layers to communicate with the upper layers is through the use of Observer and Mediator patterns.
The clear separation of the domain model in a layer distinct from the others allows you to develop and test business rules in total autonomy compared to the rest of the application.
The only cons of this approach is given by the dependence of the domain layer on the infrastructure layer, even if this could be well mitigated by the use of clear and well-defined interfaces.
Hexagonal architecture
The Hexagonal architecture by Alistair Cockburn, solves the problems of the previous architecture and introduces a more symmetrical application model. Basically in this architecture there is no longer a top and a bottom, but only the concepts of inside and the outside.
The inside consists of what used to be the application layer and the domain layer, simply defined as “application” by Cockburn. In other words, all that implements the use cases of business’s interest. The inside communicates with the outside through “ports” defined by the application itself. Examples of ports can be the public application API, the data access interface, and the domain event publishing interface.
The outside consists of everything that interacts with the application or, vice versa, with which the latter interacts. The interaction always occurs through adapters which either adapt the external signals to the application’s API, or implement the interfaces necessary for the correct operation of the latter. Examples of adapters could be the controllers that interpret HTTP requests and invoke application APIs or SQL implementation of data access interfaces.
Using DDD, Cockburn’s inside is commonly divided into two concentric levels:
- the internal, consisting of the domain model that implements the business rules;
- and the external one consists of the application services which define the usage scenarios.
This architecture allows to develop the application in total autonomy with respect to the external conditions (UI, database, etc.), allowing to execute it, and test it, in various configurations: with UI, from command line, etc.
The Hexagonal architecture is widely adopted and is the foundation of all the architectures that came after, including the patterns described below.
CQRS
In the traditional approaches, some described above, a single data model is used together with the related services, for writing and reading operations. This often leads to the creation of suboptimal models that often expose lots of information. Furthermore, this can lead to an excessive coupling between the model and the client code that uses it.
Command Query Responsibility Segregation (CQRS) is an architectural pattern which separates the responsibility for modifying data (Command) from reading them (Query). The formalization of this approach is generally attributed to Greg Young.
The use of two different models for writing and reading operations, in scope of CQRS, allows instead to design and optimize each model for its responsibilities. In addition to this, the use of distinct models also allows the selection of the most appropriate technologies. For example, we could decide to use different persistence mechanisms: for reading could be more appropriate to use a relational DB, while in writing we would prefer a NoSQL.
As soon as the reading and writing models are separated, the infrastructure could easily scale to best fit the needs. It often happens that the number of writings in a system is much lower than the readings. Therefore, using separate models and technologies will let scale differently the infrastructure related to the reading, in respect to the writing one.
Obviously the two models must be synchronized to ensure that the read information are consistent with the written ones. The consistency could not be immediate but must be eventually achieved. In DDD, synchronization occurs through the Domain Event generated by the aggregates.
The use of Domain Events to synchronize the reading model with the writing model has only one cons: generally the event emission and persistence are not transactional. This is due to the fact that the message management system and the persistence system are separate systems (e.g. RabbitMQ and SQL Server).
Event sourcing
Event sourcing (ES) overcomes the problem described above in an extremely elegant way. Let’s see how.
Normally to persist an aggregate at a given moment, its state is saved to a database. Using ES instead, what is saved is not the current state of the aggregate but the sequence of Domain Events that led the aggregate into its current state. To load an aggregate it is sufficient to read all the events associated with it and to replay them.
In this way, every time an aggregate is modified, a new record is appended to the flow of the events that represent it instead of updating an existing record. This approach is very efficient because it eliminates the risk of concurrent locking on table records and, consequently, the possibility of deadlocks between different writing threads.
This type of persistence is called event store. You can create your own event store as described in Vaughn Vernon’s book, using a relational database, or, as we’ll see later, you can use an existing product that provides this specific functionality.
Back to the problem of non-transactionality mentioned above, using an event store you have a single place where the Domain Events and, implicitly, the status of the aggregates are saved. It is therefore sufficient to read the unpublished events from the event store to avoid risks of inconsistency between the writing and reading models.
The application
Let’s have hands on. To deep dive on the architectural patterns briefly described above, and experiment them practically, I decided to develop a test application with the aim of implementing an aggregate according to the Event Sourcing paradigm, and to create a separate reading model that could be used to feed the pages of the Web application.
Moreover, I wanted to use Event Store in order to persist events. As the name suggests, Event Store is an event store developed, among the others, by Greg Young.
The application allows you to create carts and add products by specifying the quantity. The use cases are very simple:
- it is be possible to create a cart by supplying the customer to whom it belongs;
- it is be possible to add a product to the cart specifying the quantity, provided that the product is not already in the cart;
- it is possible to change the quantity of a product already in a cart;
- finally, for each product it should not be possible to add more than 50 units.
I developed the application using ASP.NET Core, C# and Docker.
Model an aggregate following Event Sourcing
To deal with the problem described above, I started from modeling the Cart
aggregate.
Following a classic approach, I started thinking about the classes’ properties
that I would have to persist on the database using Entity Framework but, after an
initial indecision, I changed the approach.
I started thinking about the domain events that the aggregate should have generated for the use cases described above, and I started with the creation of a new cart.
1 |
[Fact] public void GivenNoCartExistsWhenCreateOneThenCartCreatedEvent() { var cart = new Cart(DefaultCartId, DefaultCustomerId); AssertSingleUncommittedEvent<CartCreatedEvent>(cart, @event => { Assert.Equal(DefaultCartId, @event.AggregateId); Assert.Equal(DefaultCustomerId, @event.CustomerId); }); } |
As can be seen from the test, when I create a new cart instance, providing its own
identifier and the identifier of its customer, I expect only one event of type CartCreatedEvent
to be published, and that the latter contains the correct information. The AssertSingleUncommittedEvent
method is a utility method of the test class to check the event generated by the
aggregate and not yet committed.
Following the Event Sourcing paradigm, every action performed on an aggregate, even its creation, is divided into three conceptual steps:
- verification of the input parameters and the status of the aggregate in order to check feasibility of the action;
- if the previous checks are positive, publishing of the events triggered by the action;
- updating of the aggregate’s state according to the events mentioned above.
Keeping steps 2 and 3 separated, as explained later, is necessary when we need to recreate an object starting from the events present on the event store. With these guidelines, to meet the above test, I created the following class.
1 |
public class Cart : AggregateBase<CartId> { private CustomerId CustomerId { get; set; } public Cart(CartId cartId, CustomerId customerId) : this() { if (cartId == null) throw new ArgumentNullException(nameof(cartId)); if (customerId == null) throw new ArgumentNullException(nameof(customerId)); RaiseEvent(new CartCreatedEvent(cartId, customerId)); } internal void Apply(CartCreatedEvent ev) { Id = ev.AggregateId; CustomerId = ev.CustomerId; } } |
The code reflects the three steps described above: verification (lines 7 and 8),
events publishing (line 9), and status update (Apply
method).
In order to make the reading of the Cart
class easier and to avoid duplications,
I created the AggregateBase
base class that basically acts as a Layer Supertype
for the domain aggregates. The base class is described in more detail later.
Likewise to what was done for the creation of a new cart, to implement the use case
of adding a product to the cart, I started from the definition of the test.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[Fact]
public void GivenACartWhenAddAProductThenProductAddedEvent()
{
var cart = new Cart(DefaultCartId, DefaultCustomerId);
ClearUncommittedEvents(cart);
cart.AddProduct(DefaultProductId, 2);
AssertSingleUncommittedEvent<ProductAddedEvent>(cart, @event =>
{
Assert.Equal(DefaultProductId, @event.ProductId);
Assert.Equal(2, @event.Quantity);
Assert.Equal(DefaultCartId, @event.AggregateId);
Assert.Equal(0, @event.AggregateVersion);
});
}
ClearUncommittedEvents
utility method to clear the list of the uncommitted events. Apart from this detail,
also in this case the test is easily readable: when I add a product to the cart
I expect the corresponding event to be published and to contain the data describing
the operation just happened.
It is important to note that I did not make any statement about the status (properties
or fields) of the Cart
class. In fact, I do not care how the class internally manages
its status, but simply want to make sure that the correct events are issued. In fact,
these allow me to build a reading model for my web application.
This is the perfect application of the “encapsulation” principle. I will never be able to thank Ziobrando enough for having found the perfect metaphor for such a simple and often underestimated concept that could be appreciated only by those who speak Italian: Black Knight. This is a good reason to take some Italian classes!
The status of the aggregate is fundamental for its correct functioning. To make sure
that the aggregate manage the status correctly, it was sufficient to add the following
test.
1
2
3
4
5
6
7
8
9
10
11
[Fact]
public void GivenACartWithAProductWhenAddingTheSameProductThenThrowsCartException()
{
var cart = new Cart(DefaultCartId, DefaultCustomerId);
cart.AddProduct(DefaultProductId, 2);
ClearUncommittedEvents(cart);
Assert.Throws<CartException>(() => { cart.AddProduct(DefaultProductId, 1); });
Assert.Empty(GetUncommittedEventsOf(cart));
}
Cart
did not correctly manage its status, it would not be able to check whether
it already contains the product or not.
Given the tests above, the implementation of the expected behavior was the following.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Cart : AggregateBase<CartId>
{
private List<CartItem> Items { get; set; }
public void AddProduct(ProductId productId, int quantity)
{
if (productId == null)
{
throw new ArgumentNullException(nameof(productId));
}
if (ContainsProduct(productId))
{
throw new CartException($"Product {productId} already added");
}
RaiseEvent(new ProductAddedEvent(productId, quantity));
}
internal void Apply(ProductAddedEvent ev)
{
Items.Add(new CartItem(ev.ProductId, ev.Quantity));
}
private bool ContainsProduct(ProductId productId)
{
return Items.Any(x => x.ProductId == productId);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public class CartItem
{
public CartItem(ProductId productId, int quantity)
{
ProductId = productId;
Quantity = quantity;
}
public ProductId ProductId { get; }
public int Quantity { get; }
}
Since the article is already quite full of concepts, I will avoid describing the implementation of the other use cases. I leave it to those interested in consulting the code on GitHub.
AggregateBase
AggregateBase
implements two basic interfaces. The first is IAggregate
which
states that each aggregate must have an identifier.
1
2
3
4
public interface IAggregate<TId>
{
TId Id { get; }
}
The second interface defines the signatures of the methods needed to work with the
Event Sourcing paradigm. Each aggregate must have a version (Version
) in order
to manage potential writing conflicts. It must also be possible to apply domain events
(ApplyEvent
), this is useful for load an aggregate from the event store. Finally,
obtaining the uncommitted events (GetUncommittedEvents
), and clear it (ClearUncommittedEvents
)
must be possible.
1
2
3
4
5
6
7
internal interface IEventSourcingAggregate<TAggregateId>
{
long Version { get; }
void ApplyEvent(IDomainEvent<TAggregateId> @event, long version);
IEnumerable<IDomainEvent<TAggregateId>> GetUncommittedEvents();
void ClearUncommittedEvents();
}
internal
because it is visible only inside the assembly
which contains the domain objects. This is an implementation detail that does not
matter to client code.
The AggregateBase
class implements the above interfaces pretty smoothly. There
are two methods that deserve further examination: ApplyEvent
and RaiseEvent
.
ApplyEvent - in order avoid duplication, the ApplyEvent
method verifies that
the event to be applied is not among those uncommitted. In case the event is actually
to be applied, line 18 allows to invoke the specific method to apply the domain event.
In the case of the Car
t class and CartCreatedEvent
event, using the dynamic
keyword allows the AggregateBase
class to dynamically invoke the
internal void Apply(CartCreatedEvent ev)
method of the Cart
class at runtime.
This benchmark
shows that the dynamic invocation of methods, although slower than switch
with
pattern matching, is still much faster than other options.
RaiseEvent - this method guarantees to assign the aggregate’s correct version
and the identifier to the publishing event, simplifying the code of the Cart
class.
Moreover, before appending the event to the uncommitted ones, it applies the event
itself to the aggregate so that the state of the latter is consistent with the published
events.
1 |
public abstract class AggregateBase<TId> : IAggregate<TId>, IEventSourcingAggregate<TId> { public const long NewAggregateVersion = -1; private readonly ICollection<IDomainEvent<TId>> _uncommittedEvents = new LinkedList<IDomainEvent<TId>>(); private long _version = NewAggregateVersion; public TId Id { get; protected set; } long IEventSourcingAggregate<TId>.Version => _version; void IEventSourcingAggregate<TId>.ApplyEvent(IDomainEvent<TId> @event, long version) { if (!_uncommittedEvents.Any(x => Equals(x.EventId, @event.EventId))) { ((dynamic)this).Apply((dynamic)@event); _version = version; } } void IEventSourcingAggregate<TId>.ClearUncommittedEvents() => _uncommittedEvents.Clear(); IEnumerable<IDomainEvent<TId>> IEventSourcingAggregate<TId>.GetUncommittedEvents() => _uncommittedEvents.AsEnumerable(); protected void RaiseEvent<TEvent>(TEvent @event) where TEvent: DomainEventBase<TId> { IDomainEvent<TId> eventWithAggregate = @event.WithAggregate( Equals(Id, default(TId)) ? @event.AggregateId : Id, _version); ((IEventSourcingAggregate<TId>)this).ApplyEvent(eventWithAggregate, _version + 1); _uncommittedEvents.Add(@event); } } |
Persistence
Following a classic DDD approach, to access the aggregates and to persist them, I
would define an IRepository
interface in the domain model with these classic operations:
GetByID
, Remove
, and Save
, plus any specialized search methods. After that
I would implement this interface in some supporting assembly. This implementation
would be dependent on some persistence mechanism and this dependence would certainly
be made explicit by the name of the implementation itself, e.g. EntityFrameworkRepository
.
Using Event Sourcing the interface definition, and its implementation, take a different
shape. First of all, the IRepository
interface is simpler.
1
2
3
4
5
6
7
public interface IRepository<TAggregate, TAggregateId>
where TAggregate: IAggregate<TAggregateId>
{
Task<TAggregate> GetByIdAsync(TAggregateId id);
Task SaveAsync(TAggregate aggregate);
}
Concerning the implementation, the repository must be aware at least about the methods
of the IEventSourcingAggregate
interface in order to access the domain uncommitted
events of an aggregate, and to apply the events retrieved from the event store to
an aggregate. For this reason it is more natural to put part of the repository implementation
in the same assembly that contains the domain model.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class EventSourcingRepository<TAggregate, TAggregateId> :
IRepository<TAggregate, TAggregateId>
where TAggregate : AggregateBase<TAggregateId>, IAggregate<TAggregateId>
where TAggregateId : IAggregateId
{
private readonly IEventStore eventStore;
private readonly ITransientDomainEventPublisher publisher;
public EventSourcingRepository(IEventStore eventStore,
ITransientDomainEventPublisher publisher)
{
this.eventStore = eventStore;
this.publisher = publisher;
}
public async Task<TAggregate> GetByIdAsync(TAggregateId id)
{
try
{
var aggregate = CreateEmptyAggregate();
IEventSourcingAggregate<TAggregateId> aggregatePersistence = aggregate;
foreach (var @event in await eventStore.ReadEventsAsync(id))
{
aggregatePersistence.ApplyEvent(@event.DomainEvent, @event.EventNumber);
}
return aggregate;
}
catch (EventStoreAggregateNotFoundException)
{
return null;
}
catch (EventStoreCommunicationException ex)
{
throw new RepositoryException("Unable to access persistence layer", ex);
}
}
public async Task SaveAsync(TAggregate aggregate)
{
try
{
IEventSourcingAggregate<TAggregateId> aggregatePersistence = aggregate;
foreach (var @event in aggregatePersistence.GetUncommittedEvents())
{
await eventStore.AppendEventAsync(@event);
await publisher.PublishAsync((dynamic)@event);
}
aggregatePersistence.ClearUncommittedEvents();
}
catch (EventStoreCommunicationException ex)
{
throw new RepositoryException("Unable to access persistence layer", ex);
}
}
private TAggregate CreateEmptyAggregate()
{
return (TAggregate)typeof(TAggregate)
.GetConstructor(
BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public,
null, new Type[0], new ParameterModifier[0])
.Invoke(new object[0]);
}
}
The repository implementation piece that remains outside the domain model is the
one related to the specific used event store. For this purpose I introduced the IEventStore
interface. The implementation of this interface is explained later.
1
2
3
4
5
6
7
8
public interface IEventStore
{
Task<IEnumerable<Event<TAggregateId>>> ReadEventsAsync<TAggregateId>(TAggregateId id)
where TAggregateId: IAggregateId;
Task<AppendResult> AppendEventAsync<TAggregateId>(IDomainEvent<TAggregateId> @event)
where TAggregateId: IAggregateId;
}
Back to the EventSourcingRepository
class, you can see how the GetByIdAsync
method
applies all events retrieved from the event store to an empty instance of the aggregate
using the ApplyEvent
method. The empty aggregate instance is created using the
CreateEmptyAggregate
method. This method makes use of C# reflection to create an
aggregate using its default constructor. I decided to use reflection because I want
the aggregates to be able to declare the default constructor as private, as in the
case of the Cart
class. This is because exposing the default constructor or not
must be an aggregate design choice, not an architectural constraint.
Regarding SaveAsync
method, it deals with recovering the uncommitted events from
the aggregate, save them on the event store and, if all work well, remove them from
the aggregate.
I also decided to use this method to publish events internally to the application
through the ITransientDomainEventPublisher
interface. The class that implements
this interface is nothing more than a lightweight publisher as described in chapter
8 (Domain Events) of Vaugh Vernon’s red book.
Unlike the book, in my case the events are published by the repository instead of
the aggregates. For two reasons: the aggregate code is simpler, and the SaveAsync
method is the only one in which we are sure that the events have been saved and there
are no conflicts.
Event Store
For this experiment I used Event Store to persist the
Event Sourcing model. In order to setup the environment I included eventstore/eventstore
Docker image in the docker-compose.yml
file of my project. I used the basic settings
since I did not have specific needs.
In order to communicate with the Event Store server, I installed the NuGet package
EventStore.ClientAPI.NetCore
, version 4.0.3-rc. Since I haven’t ever used this
product I created some tests to check the behavior of the library and the server.
After that, I implemented the IEventStore
interface described above.
Event Store provides several interesting features like projection. In order to use them, Event Store requires that the event are persisted in JSON format. Since other event store could rely on other encodings, I decided to implement serialization and deserialization of events at this level.
For the sake of conciseness, I did not place the implementation here. It could be found on GitHub.
CQRS and read model
As already mentioned above, using Event Sourcing for the domain model, we miss a read model to show information to the user. CQRS comes to the rescue.
Following the guidelines of this pattern, I created a simple read model which satisfy
the Web application needs.
1
2
3
4
5
6
7
public class Cart : IReadEntity
{
public string Id { get; set; }
public int TotalItems { get; set; }
public string CustomerId { get; set; }
public string CustomerName { get; set; }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CartItem : IReadEntity
{
public string Id { get; private set; }
public string CartId { get; set; }
public string ProductId { get; set; }
public string ProductName { get; set; }
public int Quantity { get; set; }
public static CartItem CreateFor(string cartId, string productId)
{
return new CartItem
{
Id = IdFor(cartId, productId),
CartId = cartId,
ProductId = productId
};
}
public static string IdFor(string cartId, string productId)
{
return $"{productId}@{cartId}";
}
}
This model have to be synchronized with the write model. For this reason I created
the CartUpdater
class which, handling the domain events published by the write
model, is in charge of updating the read model.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class CartUpdater : IDomainEventHandler<CartId, CartCreatedEvent>,
IDomainEventHandler<CartId, ProductAddedEvent>,
IDomainEventHandler<CartId, ProductQuantityChangedEvent>
{
private readonly IReadOnlyRepository<Customer.Customer> customerRepository;
private readonly IReadOnlyRepository<Product.Product> productRepository;
private readonly IRepository<Cart.Cart> cartRepository;
private readonly IRepository<Cart.CartItem> cartItemRepository;
public CartUpdater(IReadOnlyRepository<Customer.Customer> customerRepository,
IReadOnlyRepository<Product.Product> productRepository,
IRepository<Cart.Cart> cartRepository,
IRepository<Cart.CartItem> cartItemRepository)
{
this.customerRepository = customerRepository;
this.productRepository = productRepository;
this.cartRepository = cartRepository;
this.cartItemRepository = cartItemRepository;
}
public async Task HandleAsync(CartCreatedEvent @event)
{
var customer = await customerRepository.GetByIdAsync(
@event.CustomerId.IdAsString());
await cartRepository.InsertAsync(new Cart.Cart
{
Id = @event.AggregateId.IdAsString(),
CustomerId = customer.Id,
CustomerName = customer.Name,
TotalItems = 0
});
}
public async Task HandleAsync(ProductAddedEvent @event)
{
var product = await productRepository.GetByIdAsync(@event.ProductId.IdAsString());
var cart = await cartRepository.GetByIdAsync(@event.AggregateId.IdAsString());
var cartItem = Cart.CartItem.CreateFor(
@event.AggregateId.IdAsString(),
@event.ProductId.IdAsString());
cartItem.ProductName = product.Name;
cartItem.Quantity = @event.Quantity;
cart.TotalItems += @event.Quantity;
await cartRepository.UpdateAsync(cart);
await cartItemRepository.InsertAsync(cartItem);
}
public async Task HandleAsync(ProductQuantityChangedEvent @event)
{
var cartItemId = Cart.CartItem.IdFor(
@event.AggregateId.IdAsString(),
@event.ProductId.IdAsString());
var cartItem = (await cartItemRepository
.FindAllAsync(x => x.Id == cartItemId))
.Single();
var cart = await cartRepository.GetByIdAsync(@event.AggregateId.IdAsString());
cart.TotalItems += @event.NewQuantity - @event.OldQuantity;
cartItem.Quantity = @event.NewQuantity;
await cartRepository.UpdateAsync(cart);
await cartItemRepository.UpdateAsync(cartItem);
}
}
IRepository
and IReadOnlyRepositoy
interfaces differ from the IRepository
interface
described in Repository paragraph. These interfaces belong to the EventSourcingCQRS.ReadModel
project and they are used to access the read model. The only interaction between
CartUpdater
class and the write model is through the domain events published by
the latter.
Incidentally, the implementation of IRepository
and IReadOnlyRepositoy
uses MongoDB
and it does not have access to the Event Store.
An issue that often arise when using CQRS is the delay between the actions on the
write model and the update of the read model. Since this is just an example, I decide
to solve this problem in a fast way: I registered CartUpdater
class as consumer
of the lightweight publisher described above. In this way I can guarantee that the
read model is updated even before the SaveAsync
method (see Repository paragraph)
is completed.
This is not the right approach to solve the delay issue since it decrease the scalability of the system since it bounds the operations on the domain model to the update of the read model. Nonetheless, if you are working on an existing application and you are introducing these patterns, this could be a feasible way to do it gradually.
Back to the code, who is in charge of coordinating the action on the write model
and the update of the read model is an application service called CartWriter
. See
it below.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class CartWriter : ICartWriter
{
private readonly IRepository<Cart, CartId> cartRepository;
private readonly ITransientDomainEventSubscriber subscriber;
private readonly IEnumerable<IDomainEventHandler<CartId, CartCreatedEvent>> cartCreatedEventHandlers;
private readonly IEnumerable<IDomainEventHandler<CartId, ProductAddedEvent>> productAddedEventHandlers;
private readonly IEnumerable<IDomainEventHandler<CartId, ProductQuantityChangedEvent>> productQuantityChangedEventHandlers;
public CartWriter(IRepository<Cart, CartId> cartRepository, ITransientDomainEventSubscriber subscriber,
IEnumerable<IDomainEventHandler<CartId, CartCreatedEvent>> cartCreatedEventHandlers,
IEnumerable<IDomainEventHandler<CartId, ProductAddedEvent>> productAddedEventHandlers,
IEnumerable<IDomainEventHandler<CartId, ProductQuantityChangedEvent>> productQuantityChangedEventHandlers)
{
this.cartRepository = cartRepository;
this.subscriber = subscriber;
this.cartCreatedEventHandlers = cartCreatedEventHandlers;
this.productAddedEventHandlers = productAddedEventHandlers;
this.productQuantityChangedEventHandlers = productQuantityChangedEventHandlers;
}
public async Task AddProductAsync(string cartId, string productId, int quantity)
{
var cart = await cartRepository.GetByIdAsync(new CartId(cartId));
subscriber.Subscribe<ProductAddedEvent>(
async @event => await HandleAsync(productAddedEventHandlers, @event));
cart.AddProduct(new ProductId(productId), quantity);
await cartRepository.SaveAsync(cart);
}
public async Task ChangeProductQuantityAsync(
string cartId, string productId, int quantity)
{
var cart = await cartRepository.GetByIdAsync(new CartId(cartId));
subscriber.Subscribe<ProductQuantityChangedEvent>(
async @event => await HandleAsync(productQuantityChangedEventHandlers, @event));
cart.ChangeProductQuantity(new ProductId(productId), quantity);
await cartRepository.SaveAsync(cart);
}
public async Task CreateAsync(string customerId)
{
var cart = new Cart(CartId.NewCartId(), new CustomerId(customerId));
subscriber.Subscribe<CartCreatedEvent>(
async @event => await HandleAsync(cartCreatedEventHandlers, @event));
await cartRepository.SaveAsync(cart);
}
public async Task HandleAsync<T>(
IEnumerable<IDomainEventHandler<CartId, T>> handlers, T @event)
where T : IDomainEvent<CartId>
{
foreach (var handler in handlers)
{
await handler.HandleAsync(@event);
}
}
}
CartWriter
service did not know anything about
CartUpdater
class. The latter is just injected ad implementation of IDomainEventHandler
interface. CartWriter
just subscribes all the injected handlers.
The ITransientDomainEventSubscriber
interface is the counterpart of ITransientDomainEventPublisher
interface seen above. Both the interfaces are implemented by the TransientDomainEventPubSub
class, which is charge of dispatching all the published events to the corresponding
subscribers. I called this class transient since it ensures that the subscriptions
stay valid only a given execution context. In this way the handlers registered during
and operation are not invoked in the subsequent ones.
The CartWriter
service seen above is simply the facade of our domain model. For
the sake of symmetry, There is also a facade for the read model, called CartReader
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CartReader : ICartReader
{
private readonly IReadOnlyRepository<Cart> cartRepository;
private readonly IReadOnlyRepository<CartItem> cartItemRepository;
public CartReader(IReadOnlyRepository<Cart> cartRepository,
IReadOnlyRepository<CartItem> cartItemRepository)
{
this.cartRepository = cartRepository;
this.cartItemRepository = cartItemRepository;
}
public async Task<IEnumerable<Cart>> FindAllAsync(
Expression<Func<Cart, bool>> predicate)
{
return await cartRepository.FindAllAsync(predicate);
}
public async Task<Cart> GetByIdAsync(string id)
{
return await cartRepository.GetByIdAsync(id);
}
public async Task<IEnumerable<CartItem>> GetItemsOfAsync(string cartId)
{
return await cartItemRepository.FindAllAsync(x => x.CartId == cartId);
}
}
Summing up, in order to implement the CQRS pattern, and show data to the users, I
created a read model (Cart
and CartItem
) which is synchronized with the domain
model through CartUpdater
and I provided to application services (CartWriter
and CartReader
) which cleanly separate writing operations from reading operations.
The power of this approach is that you can create as many read model as you need, based on the features to be developed. Moreover, if an existing read model comes out to be wrong or inadequate, e.g. due to changed requirements, you can delete and create a new one. The latter can be populated by replaying all the domain events available into the event store.
Conclusions
The post is definitely the longest I’ve written so far, however the subject deserved all the effort and time I dedicated, and much more. The code I produced during the experiment is more than what I could present, and it is available on GitHub.
Event Sourcing is a different paradigm from what normally a developer is used to, and requires to change approach. Once done, it is simple to use. There are aspects that deserve further investigation but those were not in scope: snapshots management, publication of events to other applications, and correction of errors just to name the most important.
CQRS becomes essential if you want to use Event Sourcing, but it can also be used alone. To use its full potential, it is necessary to correctly manage possible delays in updating the read model. For this reason, its adoption must be evaluated case-by-case, however it is very flexible and the potentials I have experienced are certainly interesting.