Come promesso al termine del post precedente, in questo articolo approfondirò gli aspetti pratici legati al DDD ed in particolare ai pattern CQRS ed Event Sourcing.
L’obiettivo principale dell’esperimento è quello di implementare un aggregato secondo il paradigma Event Sourcing e di creare un modello di lettura separato per alimentare le pagine di un’applicazione Web.
Prima di presentare l’esempio pratico farò una breve introduzione dei principali pattern architetturali che sono stati utilizzati dalla nascita del DDD.
Fare propria la mentalità necessaria per utilizzare al meglio CQRS e Event Sourcing non è stato facile. Devo però dire che alla fine ci sono riuscito grazie alle fondamenta gettate dal corso di Avanscoperta, da una massiccia dose di letture (il libro rosso di Vaugh Vernon e molti post), e da un numero adeguato di tentativi sempre supportati da test.
Il codice completo da cui sono presi gli esempi utilizzati nel resto del post è disponibile su GitHub.
Il viaggio continua. Nel frattempo faccio tesoro delle esperienze fatte e mi godo la vista del panorama all’orizzonte.
Evoluzione architetture
Il libro blu di Evans è stato pubblicato nel 2003 e da allora hanno fatto la loro comparsa diversi stili architetturali. Durante questa evoluzione le linee guida del DDD sono rimaste sempre valide al netto di alcuni tecnicismi necessari per l’implementazione delle varie architetture.
Layered architecture
L’architettura originale utilizzata da Evans per isolare la logica di dominio dalle altre responsabilità di un’applicazione è la layered architecture. I layer standard in cui suddividere un’applicazione sono:
- UI, è lo strato responsabile di visualizzare le informazioni all’utente e interpretare i comandi di quest’ultimo;
- Applicazione, definisce gli scenari di utilizzo dell’applicazione stessa. Coordina gli oggetti di dominio e si occupa di gestire tutte le attività necessarie al corretto funzionamento dell’applicazione. Ad esempio, sicurezza, transazioni, etc.;
- Dominio, contiene tutto ciò che riguarda la logica di dominio. Lo stato degli oggetti di dominio è gestito da questo strato anche se la persistenza degli oggetti stessi è delegata all’infrastruttura. Questo è lo strato dove vengono applicate le linee guida del DDD;
- Infrastruttura, non contiene né logica di dominio né logica applicativa, ma fornisce le implementazioni tecniche che servono agli altri strati per funzionare. Ad esempio, persistenza, gestione delle transazioni, etc.
La regola fondamentale di questa architettura è che ogni layer può dipendere soltanto da quelli sottostanti. L’unico modo per gli strati sottostanti di comunicare con quelli superiori è tramite l’utilizzo di pattern tipo Observer e Mediator.
La separazione netta del modello di dominio in uno strato distinto dagli altri consente di sviluppare e testare le regole di business in totale autonomia rispetto al resto dell’applicazione.
Unico neo di questo approccio è dato dalla dipendenza, seppur mitigata dall’uso di interfacce chiare e ben definite, dello strato di dominio da quello di infrastruttura.
Hexagonal architecture
La Hexagonal architecture di Alistair Cockburn, risolve i problemi dell’architettura precedente e introduce un modo più simmetrico di pensare all’applicazione. Sostanzialmente in questa architettura non c’è più un sopra e un sotto ma soltanto l’interno e l’esterno.
L’interno è costituito da quelli che prima erano lo strato applicativo e quello di dominio, definiti semplicemente “applicazione” da Cockburn. Ovvero tutto quello che implementa i casi d’uso di interesse del business. L’interno comunica con l’esterno tramite “porte” definite dall’applicazione stessa. Esempi di porte possono essere la API pubblica dell’applicazione, l’interfaccia di accesso ai dati e quella di pubblicazione degli eventi di dominio.
L’esterno è costituito da tutto ciò che interagisce con l’applicazione o, vice versa, con cui quest’ultima interagisce. L’interazione avviene sempre tramite adattatori i quali o adattano i segnali esterni alla API dell’applicazione o implementano le interfacce necessarie al corretto funzionamento di quest’ultima. Esempi di adattatori sono i controller che interpretano le richieste HTTP ed invocano le API dell’applicazione o l’implementazione SQL delle interfacce di accesso ai dati.
Quella che per Cockburn è l’applicazione in DDD viene comunemente suddivisa in due livelli concentrici:
- quello più interno costituito dal modello di dominio che implementa le regole di business;
- e quello più esterno costituito dagli application service i quali definiscono gli scenari di utilizzo.
Questa architettura consente di sviluppare l’applicazione in totale autonomia rispetto a tutto ciò che le sta intorno (UI, database, etc.), consentendo quindi di eseguirla, e testarla, in varie configurazioni: con UI, da riga di comando, etc.
La Hexagonal architecture è ampiamente adottata e sta alla base di tutte le architetture venute dopo, compresi i pattern descritti di seguito.
CQRS
Negli approcci tradizionali, alcuni descritti sopra, viene utilizzato un unico modello dati, e i relativi servizi, sia per le operazioni di scrittura che per quelle di lettura. Questo porta spesso alla creazione di modelli non ottimali e che espongono troppe informazioni. Inoltre, questo può portare ad un eccessivo accoppiamento fra il modello e il codice client che lo utilizza.
Il Command Query Responsibility Segregation (o per gli amici CQRS) è un pattern architetturale il quale, come dice il nome stesso, prevede di separare totalmente la responsabilità di modificare i dati (Command) da quella di leggerli (Query). La formalizzazione di questo approccio viene generalmente attribuita a Greg Young.
L’utilizzo di due modelli totalmente distinti per lo operazioni di scrittura e per quelle di lettura, previsto dal CQRS, consente invece di progettare e ottimizzare al meglio ogni modello per le operazioni di sua competenza. Oltre a questo, l’utilizzo di modelli distinti consente anche di selezionare le tecnologie più adeguate per le due esigenze. Ad esempio potremmo decidere di utilizzare meccanismi di persistenza diversi perché in lettura potrebbe essere più adeguato utilizzare un DB relazionale mentre in scrittura potremmo preferire un NoSQL.
Una volta separati i modelli di lettura e scrittura, possiamo anche scalare diversamente l’infrastruttura necessaria a supportarli. Spesso succede che il numero di scritture in un sistema è molto inferiore alle letture. Per cui, utilizzando modelli e tecnologie separate, nulla ci vieta di scalare di più l’infrastruttura di lettura rispetto a quella di scrittura.
Ovviamente i due modelli devono essere sincronizzati per garantire che le informazioni lette siano consistenti con quelle scritte. La consistenza non è detto che sia immediata ma deve comunque essere garantito che sarà raggiunta. In DDD la sincronizzazione avviene attraverso i Domain Event generati dagli aggregati.
L’uso dei Domain Event per sincronizzare il modello di lettura con quello di scrittura ha un solo punto debole: generalmente l’emissione di un evento e la persistenza non sono transazionali. Questo è dovuto dal fatto che il sistema di gestione dei messaggi e quello per la persistenza sono sistemi distinti (e.g. RabbitMQ e SQL Server).
Event sourcing
Event sourcing (ES) permette di affrontare il problema appena descritto in modo estremamente elegante. Vediamo come.
Normalmente per persistere un aggregato, la fotografia in un dato momento del suo stato viene salvata su un database. Utilizzando ES invece, ciò che viene salvato non è lo stato attuale dell’aggregato ma la sequenza dei Domain Event che hanno portato l’aggregato nel suo stato attuale. Per ricaricare un aggregato è sufficiente leggere tutti gli eventi ad esso associati e ricalcolare il suo stato a partire dagli eventi stessi.
In questo modo ogni volta che viene modificato un aggregato non si aggiorna un record esistente ma se ne appende uno nuovo al flusso degli eventi che lo rappresentano. Questo approccio è molto efficiente perché si annulla il rischio di lock concorrenti sui record di una tabella e, di conseguenza, le possibilità di deadlock fra thread di scrittura diversi.
Questo tipo di persistenza prende il nome di event store. È possibile crearsi il proprio event store come descritto nel libro di Vaughn Vernon, utilizzando un database relazionale, oppure, come vedremo in seguito, si può utilizzare un prodotto esistente che fornisce questa funzionalità specifica.
Tornando al problema della non transanzionalità accennato in precedenza, utilizzando un event store si ha un unico posto in cui vengono salvati i Domain Event e, implicitamente, lo stato degli aggregati. È quindi sufficiente leggere gli eventi da pubblicare dall’event store per evitare rischi di inconsistenza tra il modello di scrittura e quello di lettura.
L’applicazione
Passiamo alla pratica. Per approfondire i pattern architetturali brevemente descritti sopra e sperimentarli praticamente, ho deciso di sviluppare un’applicazione di prova con l’obiettivo di implementare un aggregato secondo il paradigma Event Sourcing e di creare un modello di lettura separato che potesse essere utilizzato per alimentare le pagine dell’applicazione Web.
Inoltre, ho voluto utilizzare Event Store per la parte di persistenza degli eventi. Come dice il nome stesso Event Store è un event store sviluppato, tra gli altri, da Greg Young.
L’applicazione consente di creare dei carrelli e di aggiungerci dei prodotti specificando la loro quantità. I casi d’uso sono molto semplici:
- deve essere possibile creare un carrello fornendo il cliente a cui appartiene;
- deve essere possibile aggiungere un prodotto al carrello specificando la quantità a patto che il prodotto non sia già presente nel carrello;
- deve essere possibile modificare la quantità di un prodotto già presente in un carrello;
- infine per ogni prodotto non deve essere possibile aggiungere più di 50 unità.
L’applicazione è sviluppata con ASP.NET Core, C# e Docker.
Modellare un aggregato secondo Event Sourcing
Come prima cosa, per affrontare il problema descritto sopra, sono partito dalla modellazione
dell’aggregato carrello (Cart
). Seguendo un approccio classico, ho iniziato a
pensare alle proprietà delle classi che avrei dovuto persistere sul database utilizzando
Entity Framework ma, dopo un primo momento di indecisione, ho cambiato strategia.
Ho quindi iniziato a pensare agli eventi di dominio che l’aggregato avrebbe dovuto generare nei vari casi d’uso descritti sopra e sono partito dalla creazione di un nuovo carello.
1 |
[Fact] public void GivenNoCartExistsWhenCreateOneThenCartCreatedEvent() { var cart = new Cart(DefaultCartId, DefaultCustomerId); AssertSingleUncommittedEvent<CartCreatedEvent>(cart, @event => { Assert.Equal(DefaultCartId, @event.AggregateId); Assert.Equal(DefaultCustomerId, @event.CustomerId); }); } |
Come si può capire dal test, quando creo una nuova istanza di carrello, passandogli
il proprio identificativo e quello del cliente a cui è associato, mi aspetto che
sia stato emesso un solo evento di tipo CartCreatedEvent
e che l’evento contenga
le informazioni corrette. Il metodo AssertSingleUncommittedEvent
è un metodo di
utilità della classe di test per verificare gli eventi generati dall’aggregato e
non ancora persistiti.
Seguendo il paradigma Event Sourcing ogni azione eseguita su un aggregato, anche la sua creazione, si divide in tre passi concettuali:
- la verifica dei parametri di ingresso e dello stato dell’aggregato al fine di verificare la fattibilità dell’azione stessa;
- in caso positivo dei controlli precedenti, l’emissione degli eventi scatenati dall’azione;
- l’aggiornamento dello stato dell’aggregato in funzione degli eventi di cui sopra.
La separazione dei passi 2 e 3, come vedremo in seguito, è necessaria quando dobbiamo ricreare un oggetto a partire dagli eventi presenti sull’event store. Con queste linee guida, per soddisfare il test sopra, ho creato la classe seguente.
1 |
public class Cart : AggregateBase<CartId> { private CustomerId CustomerId { get; set; } public Cart(CartId cartId, CustomerId customerId) : this() { if (cartId == null) throw new ArgumentNullException(nameof(cartId)); if (customerId == null) throw new ArgumentNullException(nameof(customerId)); RaiseEvent(new CartCreatedEvent(cartId, customerId)); } internal void Apply(CartCreatedEvent ev) { Id = ev.AggregateId; CustomerId = ev.CustomerId; } } |
Il codice sopra rispecchia esattamente i tre passi descritti in precedenza: controllo
(righe 7 e 8), emissione eventi (riga 9) e aggiornamento dello stato (metodo Apply
).
Al fine di rendere più semplice la lettura della classe Cart
e evitare ripetizioni
del codice ho creato la classe base AggregateBase
che sostanzialmente agisce da
Layer Supertype per gli
aggregati del dominio. La classe base è descritta più in dettaglio successivamente.
In modo del tutto analogo a quanto fatto per la creazione di un nuovo carrello, per
implementare il caso d’uso dell’aggiunta di un prodotto al carrello sono partito
dalla definizione del test.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[Fact]
public void GivenACartWhenAddAProductThenProductAddedEvent()
{
var cart = new Cart(DefaultCartId, DefaultCustomerId);
ClearUncommittedEvents(cart);
cart.AddProduct(DefaultProductId, 2);
AssertSingleUncommittedEvent<ProductAddedEvent>(cart, @event =>
{
Assert.Equal(DefaultProductId, @event.ProductId);
Assert.Equal(2, @event.Quantity);
Assert.Equal(DefaultCartId, @event.AggregateId);
Assert.Equal(0, @event.AggregateVersion);
});
}
ClearUncommittedEvents
per svuotare la lista degli eventi ancora da salvare. A
parte questo dettaglio, anche in questo caso la lettura del test è molto semplice:
quando aggiungo un prodotto al carrello mi aspetto che venga emesso l’evento corrispondente
contenente i dati necessari a descrivere l’operazione appena avvenuta.
È fondamentale notare che non ho fatto alcuna asserzione sullo stato (proprietà o
campi) della classe Cart
. Infatti in questo caso non mi interessa sapere come la
classe gestisce internamente il suo stato, ma semplicemente voglio accertarmi che
vengano emessi gli eventi corretti. Sono quest’ultimi infatti che mi consentiranno
in seguito di costruire un modello di lettura per la mia applicazione web.
Questa è la perfetta applicazione del principio del Cavaliere Nero, noto ai più come “incapsulamento”. Non potrò mai ringraziare abbastanza Ziobrando per aver trovato una metafora così esplicativa per un concetto così semplice e spesso sottovalutato.
Lo stato dell’aggregato è fondamentale per il corretto funzionamento di quest’ultimo.
Per accertarmi che l’aggregato gestisse correttamente lo stato è stato sufficiente
aggiungere il seguente test.
1
2
3
4
5
6
7
8
9
10
11
[Fact]
public void GivenACartWithAProductWhenAddingTheSameProductThenThrowsCartException()
{
var cart = new Cart(DefaultCartId, DefaultCustomerId);
cart.AddProduct(DefaultProductId, 2);
ClearUncommittedEvents(cart);
Assert.Throws<CartException>(() => { cart.AddProduct(DefaultProductId, 1); });
Assert.Empty(GetUncommittedEventsOf(cart));
}
Cart
non gestisse correttamente il proprio stato non avrebbe modo di controllare
se il prodotto è già presente nel carrello o meno.
Dati i test sopra, l’implementazione del comportamento atteso è stata la seguente:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Cart : AggregateBase<CartId>
{
private List<CartItem> Items { get; set; }
public void AddProduct(ProductId productId, int quantity)
{
if (productId == null)
{
throw new ArgumentNullException(nameof(productId));
}
if (ContainsProduct(productId))
{
throw new CartException($"Product {productId} already added");
}
RaiseEvent(new ProductAddedEvent(productId, quantity));
}
internal void Apply(ProductAddedEvent ev)
{
Items.Add(new CartItem(ev.ProductId, ev.Quantity));
}
private bool ContainsProduct(ProductId productId)
{
return Items.Any(x => x.ProductId == productId);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public class CartItem
{
public CartItem(ProductId productId, int quantity)
{
ProductId = productId;
Quantity = quantity;
}
public ProductId ProductId { get; }
public int Quantity { get; }
}
Dato che l’articolo è già piuttosto denso di concetti eviterò di descrivere l’implementazione degli altri casi d’uso e lascio a chi è interessato la consultazione del codice su GitHub.
AggregateBase
AggregateBase
implementa due interfacce basilari. La prima è IAggregate
la quale
stabilisce che ogni aggregato debba avere un identificativo.
1
2
3
4
public interface IAggregate<TId>
{
TId Id { get; }
}
La seconda interfaccia invece definisce le firme dei metodi necessari per funzionare
correttamente con il paradigma Event Sourcing. Ogni aggregato deve avere una versione
(Version
) per poter gestire eventuali conflitti di scrittura. Deve essere inoltre
possibile applicare un evento di dominio (ApplyEvent
), utile per caricare un aggregato
dall’event store. Deve essere infine possibile ottenere l’elenco degli eventi non
ancora salvati sull’event store (GetUncommittedEvents
) e svuotarlo (ClearUncommittedEvents
).
1
2
3
4
5
6
7
internal interface IEventSourcingAggregate<TAggregateId>
{
long Version { get; }
void ApplyEvent(IDomainEvent<TAggregateId> @event, long version);
IEnumerable<IDomainEvent<TAggregateId>> GetUncommittedEvents();
void ClearUncommittedEvents();
}
internal
perché sia visibile solo internamente all’assembly
che contiene gli oggetti di dominio. Questo è un dettaglio implementativo che non
è di interesse per chi utilizza gli aggregati.
La classe AggregateBase
implementa le interfacce sopra in modo abbastanza lineare.
Le implementazioni che meritano un approfondimento sono due: ApplyEvent
e RaiseEvent
.
ApplyEvent - per evitare duplicazioni, il metodo ApplyEvent
, verifica che l’evento
da applicare non sia fra quelli ancora da salvare. Nel caso in cui l’evento sia effettivamente
da applicare, il costrutto a riga 18 consente di invocare il metodo specifico per
applicare l’evento di dominio.
Nel caso della classe Cart
e dell’evento CartCreatedEvent
, l’uso della parola
chiave dynamic
consente alla classe AggregateBase
di invocare dinamicamente a
runtime il metodo internal void Apply(CartCreatedEvent ev)
della classe Cart
.
Questo benchmark
mostra come l’invocazione dinamica dei metodi, pur essendo più lenta rispetto all’uso
di switch
e pattern matching, sia comunque molto più veloce rispetto ad altre possibili
implementazioni.
RaiseEvent - questo metodo garantisce di assegnare la versione e l’identificativo
dell’aggregato corretti all’evento che viene emesso, semplificando così il codice
della classe Cart
. Inoltre, prima di accodare l’evento emesso a quelli da salvare,
applica l’evento stesso all’aggregato in modo che lo stato di quest’ultimo sia consistente
con gli eventi emessi.
1 |
public abstract class AggregateBase<TId> : IAggregate<TId>, IEventSourcingAggregate<TId> { public const long NewAggregateVersion = -1; private readonly ICollection<IDomainEvent<TId>> _uncommittedEvents = new LinkedList<IDomainEvent<TId>>(); private long _version = NewAggregateVersion; public TId Id { get; protected set; } long IEventSourcingAggregate<TId>.Version => _version; void IEventSourcingAggregate<TId>.ApplyEvent(IDomainEvent<TId> @event, long version) { if (!_uncommittedEvents.Any(x => Equals(x.EventId, @event.EventId))) { ((dynamic)this).Apply((dynamic)@event); _version = version; } } void IEventSourcingAggregate<TId>.ClearUncommittedEvents() => _uncommittedEvents.Clear(); IEnumerable<IDomainEvent<TId>> IEventSourcingAggregate<TId>.GetUncommittedEvents() => _uncommittedEvents.AsEnumerable(); protected void RaiseEvent<TEvent>(TEvent @event) where TEvent: DomainEventBase<TId> { IDomainEvent<TId> eventWithAggregate = @event.WithAggregate( Equals(Id, default(TId)) ? @event.AggregateId : Id, _version); ((IEventSourcingAggregate<TId>)this).ApplyEvent(eventWithAggregate, _version + 1); _uncommittedEvents.Add(@event); } } |
Persistenza
Seguendo un approccio classico al DDD, per poter accedere agli aggregati e per persisterli
definirei un’interfaccia IRepository
nel domain model con le classiche operazioni
GetByID
, Remove
, Save
ed eventuali metodi di ricerca più o meno specializzati.
Dopodiché implementerei tale interfaccia in qualche assembly di supporto. Tale implementazione
sarebbe dipendente a qualche meccanismo di persistenza e tale dipendenza sarebbe
sicuramente esplicitata dal nome dell’implementazione stessa, e.g. EntityFrameworkRepository
.
Utilizzando Event Sourcing la definizione dell’interfaccia, e la relativa implementazione,
assumono una forma diversa. Prima di tutto l’interfaccia IRepository
è più semplice.
1
2
3
4
5
6
7
public interface IRepository<TAggregate, TAggregateId>
where TAggregate: IAggregate<TAggregateId>
{
Task<TAggregate> GetByIdAsync(TAggregateId id);
Task SaveAsync(TAggregate aggregate);
}
Per quanto riguarda l’implementazione invece, il repository deve essere a conoscenza
almeno dei metodi dell’interfaccia IEventSourcingAggregate
per poter accedere agli
eventi di dominio emessi dall’aggregato e per poter passare gli eventi recuperati
dall’event store. Per questo motivo è più naturale mettere parte dell’implementazione
del repository nello stesso assembly che contiene il domain model.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class EventSourcingRepository<TAggregate, TAggregateId> :
IRepository<TAggregate, TAggregateId>
where TAggregate : AggregateBase<TAggregateId>, IAggregate<TAggregateId>
where TAggregateId : IAggregateId
{
private readonly IEventStore eventStore;
private readonly ITransientDomainEventPublisher publisher;
public EventSourcingRepository(IEventStore eventStore,
ITransientDomainEventPublisher publisher)
{
this.eventStore = eventStore;
this.publisher = publisher;
}
public async Task<TAggregate> GetByIdAsync(TAggregateId id)
{
try
{
var aggregate = CreateEmptyAggregate();
IEventSourcingAggregate<TAggregateId> aggregatePersistence = aggregate;
foreach (var @event in await eventStore.ReadEventsAsync(id))
{
aggregatePersistence.ApplyEvent(@event.DomainEvent, @event.EventNumber);
}
return aggregate;
}
catch (EventStoreAggregateNotFoundException)
{
return null;
}
catch (EventStoreCommunicationException ex)
{
throw new RepositoryException("Unable to access persistence layer", ex);
}
}
public async Task SaveAsync(TAggregate aggregate)
{
try
{
IEventSourcingAggregate<TAggregateId> aggregatePersistence = aggregate;
foreach (var @event in aggregatePersistence.GetUncommittedEvents())
{
await eventStore.AppendEventAsync(@event);
await publisher.PublishAsync((dynamic)@event);
}
aggregatePersistence.ClearUncommittedEvents();
}
catch (EventStoreCommunicationException ex)
{
throw new RepositoryException("Unable to access persistence layer", ex);
}
}
private TAggregate CreateEmptyAggregate()
{
return (TAggregate)typeof(TAggregate)
.GetConstructor(
BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public,
null, new Type[0], new ParameterModifier[0])
.Invoke(new object[0]);
}
}
La parte dell’implementazione del repository che resta fuori dal domain model è quella
relativa allo specifico event store utilizzato. A questo scopo ho introdotto l’interfaccia
IEventStore
. L’implementazione di tale interfaccia è spiegata più avanti.
1
2
3
4
5
6
7
8
public interface IEventStore
{
Task<IEnumerable<Event<TAggregateId>>> ReadEventsAsync<TAggregateId>(TAggregateId id)
where TAggregateId: IAggregateId;
Task<AppendResult> AppendEventAsync<TAggregateId>(IDomainEvent<TAggregateId> @event)
where TAggregateId: IAggregateId;
}
Tornando alla classe EventSourcingRepository
, si vede come il metodo GetByIdAsync
applichi tutti gli eventi recuperati dall’event store ad un’istanza vuota dell’aggregato
tramite il metodo ApplyEvent
. L’istanza vuota dell’aggregato viene creata utilizzando
il metodo CreateEmptyAggregate
. Tale metodo fa uso della reflection di C# per creare
un aggregato utilizzando il costruttore di default. Ho scelto di utilizzare la reflection
perché voglio che gli aggregati possano dichiarare il costruttore di default privato,
come nel caso della classe Cart
. Questo perché il fatto di esporre o meno il costruttore
di default è una scelta progettuale dell’aggregato, e non deve essere un’imposizione
dovuta alle scelte architetturali.
Per quanto riguarda il metodo SaveAsync
, si occupa di recuperare gli eventi non
ancora salvati dall’aggregato, di salvarli sull’event store e, se tutto va bene,
di rimuoverli dall’aggregato.
Ho scelto di utilizzare questo metodo anche per pubblicare gli eventi salvati internamente
all’applicazione tramite l’interfaccia ITransientDomainEventPublisher
. La classe
che implementa questa interfaccia non è altro che un lightweight publisher come
descritto nel libro rosso di Vaugh Vernon
al capitolo 8 “Domain Events”. A differenza di quanto spiegato nel libro, nella mia
applicazione di esempio gli eventi vengono pubblicati dal repository invece che dagli
aggregati. Questo per due motivi: il codice degli aggregati risulta più semplice
e il metodo SaveAsync
è l’unico in cui siamo sicuri che gli eventi sono stati veramente
salvati e che non ci sono stati conflitti di alcun genere.
Event Store
Per questo esperimento ho deciso di implementare la persistenza del modello Event
Sourcing utilizzando Event Store. Per utilizzarlo ho incluso
l’immagine Docker eventstore/eventstore
nel file docker-compose.yml
del mio progetto.
Ho utilizzato la configurazione di base perché per il mio esperimento ho avuto esigenze
particolari.
Per dialogare con il server Event Store ho installato nella mia soluzione il pacchetto
NuGet EventStore.ClientAPI.NetCore
, versione 4.0.3-rc. Non avendo mai utilizzato
questo prodotto ho scritto alcuni test per verificare il comportamento della libreria
e del server. Dopodiché ho implementato l’interfaccia IEventStore
descritta sopra.
Event Store offre molte funzionalità interessanti come le proiezioni. Per poterle utilizzare è necessario memorizzare gli eventi in formato JSON. Dato che altri event store potrebbero fare affidamento a formati diversi, ho deciso di implementare la serializzazione e deserializzazione degli eventi a questo livello.
Per essere più conciso non riporto qui l’implementazione che comunque è disponibile sul repository GitHub.
CQRS e modello di lettura
Come già detto più volte in precedenza, utilizzando Event Sourcing per implementare la logica di business non abbiamo più un modello di lettura da utilizzare per visualizzare i dati all’utente. Per risolvere questo problema viene in nostro soccorso CQRS.
Seguendo le linee guida di questo pattern ho creato un semplice modello si lettura
che si adattasse all’interfaccia web che volevo realizzare.
1
2
3
4
5
6
7
public class Cart : IReadEntity
{
public string Id { get; set; }
public int TotalItems { get; set; }
public string CustomerId { get; set; }
public string CustomerName { get; set; }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CartItem : IReadEntity
{
public string Id { get; private set; }
public string CartId { get; set; }
public string ProductId { get; set; }
public string ProductName { get; set; }
public int Quantity { get; set; }
public static CartItem CreateFor(string cartId, string productId)
{
return new CartItem
{
Id = IdFor(cartId, productId),
CartId = cartId,
ProductId = productId
};
}
public static string IdFor(string cartId, string productId)
{
return $"{productId}@{cartId}";
}
}
Questo modello deve essere tenuto sincronizzato con il modello di scrittura, e per
questo motivo ho creato la classe CartUpdater
la quale, gestendo gli eventi generati
dal modello di scrittura, si occupasse di aggiornare il modello di lettura.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class CartUpdater : IDomainEventHandler<CartId, CartCreatedEvent>,
IDomainEventHandler<CartId, ProductAddedEvent>,
IDomainEventHandler<CartId, ProductQuantityChangedEvent>
{
private readonly IReadOnlyRepository<Customer.Customer> customerRepository;
private readonly IReadOnlyRepository<Product.Product> productRepository;
private readonly IRepository<Cart.Cart> cartRepository;
private readonly IRepository<Cart.CartItem> cartItemRepository;
public CartUpdater(IReadOnlyRepository<Customer.Customer> customerRepository,
IReadOnlyRepository<Product.Product> productRepository,
IRepository<Cart.Cart> cartRepository,
IRepository<Cart.CartItem> cartItemRepository)
{
this.customerRepository = customerRepository;
this.productRepository = productRepository;
this.cartRepository = cartRepository;
this.cartItemRepository = cartItemRepository;
}
public async Task HandleAsync(CartCreatedEvent @event)
{
var customer = await customerRepository.GetByIdAsync(
@event.CustomerId.IdAsString());
await cartRepository.InsertAsync(new Cart.Cart
{
Id = @event.AggregateId.IdAsString(),
CustomerId = customer.Id,
CustomerName = customer.Name,
TotalItems = 0
});
}
public async Task HandleAsync(ProductAddedEvent @event)
{
var product = await productRepository.GetByIdAsync(@event.ProductId.IdAsString());
var cart = await cartRepository.GetByIdAsync(@event.AggregateId.IdAsString());
var cartItem = Cart.CartItem.CreateFor(
@event.AggregateId.IdAsString(),
@event.ProductId.IdAsString());
cartItem.ProductName = product.Name;
cartItem.Quantity = @event.Quantity;
cart.TotalItems += @event.Quantity;
await cartRepository.UpdateAsync(cart);
await cartItemRepository.InsertAsync(cartItem);
}
public async Task HandleAsync(ProductQuantityChangedEvent @event)
{
var cartItemId = Cart.CartItem.IdFor(
@event.AggregateId.IdAsString(),
@event.ProductId.IdAsString());
var cartItem = (await cartItemRepository
.FindAllAsync(x => x.Id == cartItemId))
.Single();
var cart = await cartRepository.GetByIdAsync(@event.AggregateId.IdAsString());
cart.TotalItems += @event.NewQuantity - @event.OldQuantity;
cartItem.Quantity = @event.NewQuantity;
await cartRepository.UpdateAsync(cart);
await cartItemRepository.UpdateAsync(cartItem);
}
}
IRepository
e IReadOnlyRepositoy
non hanno niente a che vedere
con l’interfaccia IRepository
descritta nel paragrafo Repository. Queste interfacce
infatti appartengono al progetto EventSourcingCQRS.ReadModel
e sono quelle utilizzate
per accedere al modello di lettura. L’unica interazione fra la classe CartUpdater
e il modello di scrittura avviene esclusivamente tramite gli eventi di dominio generati
da quest’ultimo.
Per inciso, l’implmentazione di IRepository
e IReadOnlyRepositoy
utilizzata in
questa applicazione di esempio utilizza MongoDB e niente
ha a che vedere con Event Store.
Un problema che spesso si ha quando si vuole utilizzare CQRS è il possibile ritardo
che si può avere nell’aggiornamento del modello di lettura a seguito di azioni su
quello di scrittura. Data la natura esemplificativa di questa applicazione ho deciso
di risolvere questo problema molto semplicemente: ho registrato CartUpdater
come
consumatore del lightweight publisher descritto sopra. Così facendo ho la garanzia
che l’aggiornamento del modello di lettura sia completato prima che il metodo SaveAsync
(vedi paragrafo Repository) sia concluso.
Questo non è certamente il modo corretto per gestire questo problema dato che riduce la scalabilità del sistema legando le operazioni sul modello di dominio all’aggiornamento del modello di lettura. Ciononostante, se si sta lavorando su un applicazione esistente e si vuole introdurre gradualmente questi pattern, questa è sicuramente una strada percorribile.
Tornando al codice, chi si occupa di coordinare le azioni sul modello di scrittura
con l’aggiornamento del modello di lettura è il servizio applicativo CartWriter
riportato qui sotto.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class CartWriter : ICartWriter
{
private readonly IRepository<Cart, CartId> cartRepository;
private readonly ITransientDomainEventSubscriber subscriber;
private readonly IEnumerable<IDomainEventHandler<CartId, CartCreatedEvent>> cartCreatedEventHandlers;
private readonly IEnumerable<IDomainEventHandler<CartId, ProductAddedEvent>> productAddedEventHandlers;
private readonly IEnumerable<IDomainEventHandler<CartId, ProductQuantityChangedEvent>> productQuantityChangedEventHandlers;
public CartWriter(IRepository<Cart, CartId> cartRepository, ITransientDomainEventSubscriber subscriber,
IEnumerable<IDomainEventHandler<CartId, CartCreatedEvent>> cartCreatedEventHandlers,
IEnumerable<IDomainEventHandler<CartId, ProductAddedEvent>> productAddedEventHandlers,
IEnumerable<IDomainEventHandler<CartId, ProductQuantityChangedEvent>> productQuantityChangedEventHandlers)
{
this.cartRepository = cartRepository;
this.subscriber = subscriber;
this.cartCreatedEventHandlers = cartCreatedEventHandlers;
this.productAddedEventHandlers = productAddedEventHandlers;
this.productQuantityChangedEventHandlers = productQuantityChangedEventHandlers;
}
public async Task AddProductAsync(string cartId, string productId, int quantity)
{
var cart = await cartRepository.GetByIdAsync(new CartId(cartId));
subscriber.Subscribe<ProductAddedEvent>(
async @event => await HandleAsync(productAddedEventHandlers, @event));
cart.AddProduct(new ProductId(productId), quantity);
await cartRepository.SaveAsync(cart);
}
public async Task ChangeProductQuantityAsync(
string cartId, string productId, int quantity)
{
var cart = await cartRepository.GetByIdAsync(new CartId(cartId));
subscriber.Subscribe<ProductQuantityChangedEvent>(
async @event => await HandleAsync(productQuantityChangedEventHandlers, @event));
cart.ChangeProductQuantity(new ProductId(productId), quantity);
await cartRepository.SaveAsync(cart);
}
public async Task CreateAsync(string customerId)
{
var cart = new Cart(CartId.NewCartId(), new CustomerId(customerId));
subscriber.Subscribe<CartCreatedEvent>(
async @event => await HandleAsync(cartCreatedEventHandlers, @event));
await cartRepository.SaveAsync(cart);
}
public async Task HandleAsync<T>(
IEnumerable<IDomainEventHandler<CartId, T>> handlers, T @event)
where T : IDomainEvent<CartId>
{
foreach (var handler in handlers)
{
await handler.HandleAsync(@event);
}
}
}
CartWriter
non conosce direttamente
la classe CartUpdater
. Quest’ultima viene iniettata come implementazione dell’interfaccia
IDomainEventHandler
. CartWriter
si occupa di registrare tutti gli handler per
la gestione degli eventi di dominio.
L’interfaccia ITransientDomainEventSubscriber
è la controparte dell’interfaccia
ITransientDomainEventPublisher
vista in precedenza. Entrambe le interfacce in realtà
sono implementate dalla classe TransientDomainEventPubSub
la quale si occupa di
recapitare a tutti i consumatori registrati gli eventi di dominio emessi. Questa
classe si chiama transient perché garantisce che le sottoscrizioni siano valide
solamente all’interno di un singolo contesto di esecuzione. Questo fa sì che gli
handler registrati durante un’operazione non vengano invocati dalle successive
operazioni.
Il servizio CartWriter
visto in precedenza non è altro che la facade del nostro
modello di scrittura. Per simmetria esiste una facade per il modello di lettura la
quale, senza troppa fantasia, si chiama CartReader
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CartReader : ICartReader
{
private readonly IReadOnlyRepository<Cart> cartRepository;
private readonly IReadOnlyRepository<CartItem> cartItemRepository;
public CartReader(IReadOnlyRepository<Cart> cartRepository,
IReadOnlyRepository<CartItem> cartItemRepository)
{
this.cartRepository = cartRepository;
this.cartItemRepository = cartItemRepository;
}
public async Task<IEnumerable<Cart>> FindAllAsync(
Expression<Func<Cart, bool>> predicate)
{
return await cartRepository.FindAllAsync(predicate);
}
public async Task<Cart> GetByIdAsync(string id)
{
return await cartRepository.GetByIdAsync(id);
}
public async Task<IEnumerable<CartItem>> GetItemsOfAsync(string cartId)
{
return await cartItemRepository.FindAllAsync(x => x.CartId == cartId);
}
}
Riassumendo, per implementare il pattern CQRS, e poter visualizzare i dati all’utente,
ho creato un modello di lettura (Cart
e CartItem
) che tengo sincronizzato con
il modello di dominio tramite CartUpdater
e ho fornito due servizi applicativi
(CartWriter
e CartReader
) i quali separano nettamente le operazioni di scrittura
da quelle di lettura.
La potenza di questo approccio sta nel fatto che posso creare quanti modelli di lettura voglio, in funzione delle caratteristiche che devo sviluppare. Oltre a questo, nel caso in cui un modello di lettura esistente non sia più adeguato, e.g. perché sono cambiati i requisiti, posso eliminarlo e crearne uno nuovo. Il nuovo modello verrebbe popolato andando a rieseguire gli eventi di dominio passati presenti nell’event store.
Conclusioni
Il post è sicuramente il più lungo che ho scritto fino ad oggi ma l’argomento meritava tutto lo spazio e il tempo che gli ho dedicato e anche molto di più. Il codice che ho prodotto durante l’esperimento è più di quello che sono riuscito a presentare ed è disponibile su GitHub.
Event Sourcing è un paradigma diverso da quello con cui uno sviluppatore è normalmente abituato a pensare e richiede un certo cambio di mentalità. Una volta fatto proprio risulta molto semplice. Ci sono aspetti che avrebbero meritato ulteriori approfondimenti ma che non hanno trovato spazio in questo post: gestione degli snapshot, pubblicazione degli eventi ad altre applicazioni e correzione degli errori solo per nominare quelle più importanti.
CQRS diventa indispensabile se si vuole usare Event Sourcing ma può essere utilizzato anche da solo. Per utilizzarlo al pieno delle sue possibilità è necessario gestire correttamente i possibili ritardi nell’aggiornamento del modello di lettura. Per questo motivo la sua adozione deve essere valutata caso per caso, ma è molto flessibile e le potenzialità che ho sperimentato sono sicuramente interessanti.