Come promesso al termine del post precedente, in questo articolo approfondirò gli aspetti pratici legati al DDD ed in particolare ai pattern CQRS ed Event Sourcing.

L’obiettivo principale dell’esperimento è quello di implementare un aggregato secondo il paradigma Event Sourcing e di creare un modello di lettura separato per alimentare le pagine di un’applicazione Web.

Prima di presentare l’esempio pratico farò una breve introduzione dei principali pattern architetturali che sono stati utilizzati dalla nascita del DDD.

Fare propria la mentalità necessaria per utilizzare al meglio CQRS e Event Sourcing non è stato facile. Devo però dire che alla fine ci sono riuscito grazie alle fondamenta gettate dal corso di Avanscoperta, da una massiccia dose di letture (il libro rosso di Vaugh Vernon e molti post), e da un numero adeguato di tentativi sempre supportati da test.

Il codice completo da cui sono presi gli esempi utilizzati nel resto del post è disponibile su GitHub.

Il viaggio continua. Nel frattempo faccio tesoro delle esperienze fatte e mi godo la vista del panorama all’orizzonte.

Evoluzione architetture

Il libro blu di Evans è stato pubblicato nel 2003 e da allora hanno fatto la loro comparsa diversi stili architetturali. Durante questa evoluzione le linee guida del DDD sono rimaste sempre valide al netto di alcuni tecnicismi necessari per l’implementazione delle varie architetture.

Layered architecture

L’architettura originale utilizzata da Evans per isolare la logica di dominio dalle altre responsabilità di un’applicazione è la layered architecture. I layer standard in cui suddividere un’applicazione sono:

  • UI, è lo strato responsabile di visualizzare le informazioni all’utente e interpretare i comandi di quest’ultimo;
  • Applicazione, definisce gli scenari di utilizzo dell’applicazione stessa. Coordina gli oggetti di dominio e si occupa di gestire tutte le attività necessarie al corretto funzionamento dell’applicazione. Ad esempio, sicurezza, transazioni, etc.;
  • Dominio, contiene tutto ciò che riguarda la logica di dominio. Lo stato degli oggetti di dominio è gestito da questo strato anche se la persistenza degli oggetti stessi è delegata all’infrastruttura. Questo è lo strato dove vengono applicate le linee guida del DDD;
  • Infrastruttura, non contiene né logica di dominio né logica applicativa, ma fornisce le implementazioni tecniche che servono agli altri strati per funzionare. Ad esempio, persistenza, gestione delle transazioni, etc.

La regola fondamentale di questa architettura è che ogni layer può dipendere soltanto da quelli sottostanti. L’unico modo per gli strati sottostanti di comunicare con quelli superiori è tramite l’utilizzo di pattern tipo Observer e Mediator.

La separazione netta del modello di dominio in uno strato distinto dagli altri consente di sviluppare e testare le regole di business in totale autonomia rispetto al resto dell’applicazione.

Unico neo di questo approccio è dato dalla dipendenza, seppur mitigata dall’uso di interfacce chiare e ben definite, dello strato di dominio da quello di infrastruttura.

Hexagonal architecture

La Hexagonal architecture di Alistair Cockburn, risolve i problemi dell’architettura precedente e introduce un modo più simmetrico di pensare all’applicazione. Sostanzialmente in questa architettura non c’è più un sopra e un sotto ma soltanto l’interno e l’esterno.

L’interno è costituito da quelli che prima erano lo strato applicativo e quello di dominio, definiti semplicemente “applicazione” da Cockburn. Ovvero tutto quello che implementa i casi d’uso di interesse del business. L’interno comunica con l’esterno tramite “porte” definite dall’applicazione stessa. Esempi di porte possono essere la API pubblica dell’applicazione, l’interfaccia di accesso ai dati e quella di pubblicazione degli eventi di dominio.

L’esterno è costituito da tutto ciò che interagisce con l’applicazione o, vice versa, con cui quest’ultima interagisce. L’interazione avviene sempre tramite adattatori i quali o adattano i segnali esterni alla API dell’applicazione o implementano le interfacce necessarie al corretto funzionamento di quest’ultima. Esempi di adattatori sono i controller che interpretano le richieste HTTP ed invocano le API dell’applicazione o l’implementazione SQL delle interfacce di accesso ai dati.

Quella che per Cockburn è l’applicazione in DDD viene comunemente suddivisa in due livelli concentrici:

  • quello più interno costituito dal modello di dominio che implementa le regole di business;
  • e quello più esterno costituito dagli application service i quali definiscono gli scenari di utilizzo.

Questa architettura consente di sviluppare l’applicazione in totale autonomia rispetto a tutto ciò che le sta intorno (UI, database, etc.), consentendo quindi di eseguirla, e testarla, in varie configurazioni: con UI, da riga di comando, etc.

La Hexagonal architecture è ampiamente adottata e sta alla base di tutte le architetture venute dopo, compresi i pattern descritti di seguito.

CQRS

Negli approcci tradizionali, alcuni descritti sopra, viene utilizzato un unico modello dati, e i relativi servizi, sia per le operazioni di scrittura che per quelle di lettura. Questo porta spesso alla creazione di modelli non ottimali e che espongono troppe informazioni. Inoltre, questo può portare ad un eccessivo accoppiamento fra il modello e il codice client che lo utilizza.

Il Command Query Responsibility Segregation (o per gli amici CQRS) è un pattern architetturale il quale, come dice il nome stesso, prevede di separare totalmente la responsabilità di modificare i dati (Command) da quella di leggerli (Query). La formalizzazione di questo approccio viene generalmente attribuita a Greg Young.

L’utilizzo di due modelli totalmente distinti per lo operazioni di scrittura e per quelle di lettura, previsto dal CQRS, consente invece di progettare e ottimizzare al meglio ogni modello per le operazioni di sua competenza. Oltre a questo, l’utilizzo di modelli distinti consente anche di selezionare le tecnologie più adeguate per le due esigenze. Ad esempio potremmo decidere di utilizzare meccanismi di persistenza diversi perché in lettura potrebbe essere più adeguato utilizzare un DB relazionale mentre in scrittura potremmo preferire un NoSQL.

Una volta separati i modelli di lettura e scrittura, possiamo anche scalare diversamente l’infrastruttura necessaria a supportarli. Spesso succede che il numero di scritture in un sistema è molto inferiore alle letture. Per cui, utilizzando modelli e tecnologie separate, nulla ci vieta di scalare di più l’infrastruttura di lettura rispetto a quella di scrittura.

Ovviamente i due modelli devono essere sincronizzati per garantire che le informazioni lette siano consistenti con quelle scritte. La consistenza non è detto che sia immediata ma deve comunque essere garantito che sarà raggiunta. In DDD la sincronizzazione avviene attraverso i Domain Event generati dagli aggregati.

L’uso dei Domain Event per sincronizzare il modello di lettura con quello di scrittura ha un solo punto debole: generalmente l’emissione di un evento e la persistenza non sono transazionali. Questo è dovuto dal fatto che il sistema di gestione dei messaggi e quello per la persistenza sono sistemi distinti (e.g. RabbitMQ e SQL Server).

Event sourcing

Event sourcing (ES) permette di affrontare il problema appena descritto in modo estremamente elegante. Vediamo come.

Normalmente per persistere un aggregato, la fotografia in un dato momento del suo stato viene salvata su un database. Utilizzando ES invece, ciò che viene salvato non è lo stato attuale dell’aggregato ma la sequenza dei Domain Event che hanno portato l’aggregato nel suo stato attuale. Per ricaricare un aggregato è sufficiente leggere tutti gli eventi ad esso associati e ricalcolare il suo stato a partire dagli eventi stessi.

In questo modo ogni volta che viene modificato un aggregato non si aggiorna un record esistente ma se ne appende uno nuovo al flusso degli eventi che lo rappresentano. Questo approccio è molto efficiente perché si annulla il rischio di lock concorrenti sui record di una tabella e, di conseguenza, le possibilità di deadlock fra thread di scrittura diversi.

Questo tipo di persistenza prende il nome di event store. È possibile crearsi il proprio event store come descritto nel libro di Vaughn Vernon, utilizzando un database relazionale, oppure, come vedremo in seguito, si può utilizzare un prodotto esistente che fornisce questa funzionalità specifica.

Tornando al problema della non transanzionalità accennato in precedenza, utilizzando un event store si ha un unico posto in cui vengono salvati i Domain Event e, implicitamente, lo stato degli aggregati. È quindi sufficiente leggere gli eventi da pubblicare dall’event store per evitare rischi di inconsistenza tra il modello di scrittura e quello di lettura.

L’applicazione

Passiamo alla pratica. Per approfondire i pattern architetturali brevemente descritti sopra e sperimentarli praticamente, ho deciso di sviluppare un’applicazione di prova con l’obiettivo di implementare un aggregato secondo il paradigma Event Sourcing e di creare un modello di lettura separato che potesse essere utilizzato per alimentare le pagine dell’applicazione Web.

Inoltre, ho voluto utilizzare Event Store per la parte di persistenza degli eventi. Come dice il nome stesso Event Store è un event store sviluppato, tra gli altri, da Greg Young.

L’applicazione consente di creare dei carrelli e di aggiungerci dei prodotti specificando la loro quantità. I casi d’uso sono molto semplici:

  • deve essere possibile creare un carrello fornendo il cliente a cui appartiene;
  • deve essere possibile aggiungere un prodotto al carrello specificando la quantità a patto che il prodotto non sia già presente nel carrello;
  • deve essere possibile modificare la quantità di un prodotto già presente in un carrello;
  • infine per ogni prodotto non deve essere possibile aggiungere più di 50 unità.

L’applicazione è sviluppata con ASP.NET Core, C# e Docker.

Modellare un aggregato secondo Event Sourcing

Come prima cosa, per affrontare il problema descritto sopra, sono partito dalla modellazione dell’aggregato carrello (Cart). Seguendo un approccio classico, ho iniziato a pensare alle proprietà delle classi che avrei dovuto persistere sul database utilizzando Entity Framework ma, dopo un primo momento di indecisione, ho cambiato strategia.

Ho quindi iniziato a pensare agli eventi di dominio che l’aggregato avrebbe dovuto generare nei vari casi d’uso descritti sopra e sono partito dalla creazione di un nuovo carello.

Estratto di CartTest.cs
1
2
3
4
5
6
7
8
9
10
11
[Fact]
public void GivenNoCartExistsWhenCreateOneThenCartCreatedEvent()
{
    var cart = new Cart(DefaultCartId, DefaultCustomerId);

    AssertSingleUncommittedEvent<CartCreatedEvent>(cart, @event =>
    {
      Assert.Equal(DefaultCartId, @event.AggregateId);
      Assert.Equal(DefaultCustomerId, @event.CustomerId);
    });
}

Come si può capire dal test, quando creo una nuova istanza di carrello, passandogli il proprio identificativo e quello del cliente a cui è associato, mi aspetto che sia stato emesso un solo evento di tipo CartCreatedEvent e che l’evento contenga le informazioni corrette. Il metodo AssertSingleUncommittedEvent è un metodo di utilità della classe di test per verificare gli eventi generati dall’aggregato e non ancora persistiti.

Seguendo il paradigma Event Sourcing ogni azione eseguita su un aggregato, anche la sua creazione, si divide in tre passi concettuali:

  1. la verifica dei parametri di ingresso e dello stato dell’aggregato al fine di verificare la fattibilità dell’azione stessa;
  2. in caso positivo dei controlli precedenti, l’emissione degli eventi scatenati dall’azione;
  3. l’aggiornamento dello stato dell’aggregato in funzione degli eventi di cui sopra.

La separazione dei passi 2 e 3, come vedremo in seguito, è necessaria quando dobbiamo ricreare un oggetto a partire dagli eventi presenti sull’event store. Con queste linee guida, per soddisfare il test sopra, ho creato la classe seguente.

Estratto di Cart.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Cart : AggregateBase<CartId>
{
    private CustomerId CustomerId { get; set; }

    public Cart(CartId cartId, CustomerId customerId) : this()
    {
        if (cartId == null) throw new ArgumentNullException(nameof(cartId));
        if (customerId == null) throw new ArgumentNullException(nameof(customerId));
        RaiseEvent(new CartCreatedEvent(cartId, customerId));
    }

    internal void Apply(CartCreatedEvent ev)
    {
        Id = ev.AggregateId;
        CustomerId = ev.CustomerId;
    }
}

Il codice sopra rispecchia esattamente i tre passi descritti in precedenza: controllo (righe 7 e 8), emissione eventi (riga 9) e aggiornamento dello stato (metodo Apply).

Al fine di rendere più semplice la lettura della classe Cart e evitare ripetizioni del codice ho creato la classe base AggregateBase che sostanzialmente agisce da Layer Supertype per gli aggregati del dominio. La classe base è descritta più in dettaglio successivamente.

In modo del tutto analogo a quanto fatto per la creazione di un nuovo carrello, per implementare il caso d’uso dell’aggiunta di un prodotto al carrello sono partito dalla definizione del test.

Estratto di CartTest.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[Fact]
public void GivenACartWhenAddAProductThenProductAddedEvent()
{
    var cart = new Cart(DefaultCartId, DefaultCustomerId);
    ClearUncommittedEvents(cart);

    cart.AddProduct(DefaultProductId, 2);

    AssertSingleUncommittedEvent<ProductAddedEvent>(cart, @event =>
    {
        Assert.Equal(DefaultProductId, @event.ProductId);
        Assert.Equal(2, @event.Quantity);
        Assert.Equal(DefaultCartId, @event.AggregateId);
        Assert.Equal(0, @event.AggregateVersion);
    });
}
L’unica differenza rispetto al test precedente è l’utilizzo del metodo di utilità ClearUncommittedEvents per svuotare la lista degli eventi ancora da salvare. A parte questo dettaglio, anche in questo caso la lettura del test è molto semplice: quando aggiungo un prodotto al carrello mi aspetto che venga emesso l’evento corrispondente contenente i dati necessari a descrivere l’operazione appena avvenuta.

È fondamentale notare che non ho fatto alcuna asserzione sullo stato (proprietà o campi) della classe Cart. Infatti in questo caso non mi interessa sapere come la classe gestisce internamente il suo stato, ma semplicemente voglio accertarmi che vengano emessi gli eventi corretti. Sono quest’ultimi infatti che mi consentiranno in seguito di costruire un modello di lettura per la mia applicazione web.

Questa è la perfetta applicazione del principio del Cavaliere Nero, noto ai più come “incapsulamento”. Non potrò mai ringraziare abbastanza Ziobrando per aver trovato una metafora così esplicativa per un concetto così semplice e spesso sottovalutato.

Lo stato dell’aggregato è fondamentale per il corretto funzionamento di quest’ultimo. Per accertarmi che l’aggregato gestisse correttamente lo stato è stato sufficiente aggiungere il seguente test.

Estratto di CartTest.cs
1
2
3
4
5
6
7
8
9
10
11
[Fact]
public void GivenACartWithAProductWhenAddingTheSameProductThenThrowsCartException()
{
    var cart = new Cart(DefaultCartId, DefaultCustomerId);

    cart.AddProduct(DefaultProductId, 2);
    ClearUncommittedEvents(cart);

    Assert.Throws<CartException>(() => { cart.AddProduct(DefaultProductId, 1); });
    Assert.Empty(GetUncommittedEventsOf(cart));
}
Se Cart non gestisse correttamente il proprio stato non avrebbe modo di controllare se il prodotto è già presente nel carrello o meno.

Dati i test sopra, l’implementazione del comportamento atteso è stata la seguente:

Estratto di Cart.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Cart : AggregateBase<CartId>
{
  private List<CartItem> Items { get; set; }

  public void AddProduct(ProductId productId, int quantity)
  {
    if (productId == null)
    {
      throw new ArgumentNullException(nameof(productId));
    }
    if (ContainsProduct(productId))
    {
      throw new CartException($"Product {productId} already added");
    }
    RaiseEvent(new ProductAddedEvent(productId, quantity));
  }

  internal void Apply(ProductAddedEvent ev)
  {
    Items.Add(new CartItem(ev.ProductId, ev.Quantity));
  }

  private bool ContainsProduct(ProductId productId)
  {
    return Items.Any(x => x.ProductId == productId);
  }
}
Estratto di CartItem.cs
1
2
3
4
5
6
7
8
9
10
11
12
public class CartItem
{
  public CartItem(ProductId productId, int quantity)
  {
    ProductId = productId;
    Quantity = quantity;
  }

  public ProductId ProductId { get; }

  public int Quantity { get; }
}
Come si vede il carrello mantiene una lista di oggetti aggiunti nel suo stato, ma la lista non è accessibile in alcun modo all’esterno perché è strettamente funzionale all’implementazione dei comportamenti attesi e non deve essere utilizzata in alcun modo dal codice client per consultazione. Vorrei far notare che per l’implementazione dei casi d’uso presi in considerazione fino ad ora, non sarebbe stato necessario mantenere le quantità nella lista. Sarebbe stato sufficiente mantenere una lista di identificativi di prodotto. La quantità è presente solo perché funzionale agli altri casi d’uso descritti sopra.

Dato che l’articolo è già piuttosto denso di concetti eviterò di descrivere l’implementazione degli altri casi d’uso e lascio a chi è interessato la consultazione del codice su GitHub.

AggregateBase

AggregateBase implementa due interfacce basilari. La prima è IAggregate la quale stabilisce che ogni aggregato debba avere un identificativo.

IAggregate.cs
1
2
3
4
public interface IAggregate<TId>
{
    TId Id { get; }
}

La seconda interfaccia invece definisce le firme dei metodi necessari per funzionare correttamente con il paradigma Event Sourcing. Ogni aggregato deve avere una versione (Version) per poter gestire eventuali conflitti di scrittura. Deve essere inoltre possibile applicare un evento di dominio (ApplyEvent), utile per caricare un aggregato dall’event store. Deve essere infine possibile ottenere l’elenco degli eventi non ancora salvati sull’event store (GetUncommittedEvents) e svuotarlo (ClearUncommittedEvents).

IEventSourcingAggregate.cs
1
2
3
4
5
6
7
internal interface IEventSourcingAggregate<TAggregateId>
{
    long Version { get; }
    void ApplyEvent(IDomainEvent<TAggregateId> @event, long version);
    IEnumerable<IDomainEvent<TAggregateId>> GetUncommittedEvents();
    void ClearUncommittedEvents();
}
L’interfaccia è dichiarata internal perché sia visibile solo internamente all’assembly che contiene gli oggetti di dominio. Questo è un dettaglio implementativo che non è di interesse per chi utilizza gli aggregati.

La classe AggregateBase implementa le interfacce sopra in modo abbastanza lineare. Le implementazioni che meritano un approfondimento sono due: ApplyEvent e RaiseEvent.

ApplyEvent - per evitare duplicazioni, il metodo ApplyEvent, verifica che l’evento da applicare non sia fra quelli ancora da salvare. Nel caso in cui l’evento sia effettivamente da applicare, il costrutto a riga 18 consente di invocare il metodo specifico per applicare l’evento di dominio.

Nel caso della classe Cart e dell’evento CartCreatedEvent, l’uso della parola chiave dynamic consente alla classe AggregateBase di invocare dinamicamente a runtime il metodo internal void Apply(CartCreatedEvent ev) della classe Cart.

Questo benchmark mostra come l’invocazione dinamica dei metodi, pur essendo più lenta rispetto all’uso di switch e pattern matching, sia comunque molto più veloce rispetto ad altre possibili implementazioni.

RaiseEvent - questo metodo garantisce di assegnare la versione e l’identificativo dell’aggregato corretti all’evento che viene emesso, semplificando così il codice della classe Cart. Inoltre, prima di accodare l’evento emesso a quelli da salvare, applica l’evento stesso all’aggregato in modo che lo stato di quest’ultimo sia consistente con gli eventi emessi.

AggregateBase.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public abstract class AggregateBase<TId> : 
  IAggregate<TId>, IEventSourcingAggregate<TId>
{
  public const long NewAggregateVersion = -1;

  private readonly ICollection<IDomainEvent<TId>> _uncommittedEvents = 
    new LinkedList<IDomainEvent<TId>>();
  private long _version = NewAggregateVersion;

  public TId Id { get; protected set;  }

  long IEventSourcingAggregate<TId>.Version => _version;

  void IEventSourcingAggregate<TId>.ApplyEvent(IDomainEvent<TId> @event, long version)
  {
    if (!_uncommittedEvents.Any(x => Equals(x.EventId, @event.EventId)))
    {
      ((dynamic)this).Apply((dynamic)@event);
      _version = version;
    }
  }

  void IEventSourcingAggregate<TId>.ClearUncommittedEvents()
    => _uncommittedEvents.Clear();

  IEnumerable<IDomainEvent<TId>> IEventSourcingAggregate<TId>.GetUncommittedEvents()
    => _uncommittedEvents.AsEnumerable();

  protected void RaiseEvent<TEvent>(TEvent @event)
      where TEvent: DomainEventBase<TId>
  {
    IDomainEvent<TId> eventWithAggregate = @event.WithAggregate(
      Equals(Id, default(TId)) ? @event.AggregateId : Id,
      _version);

    ((IEventSourcingAggregate<TId>)this).ApplyEvent(eventWithAggregate, _version + 1);
    _uncommittedEvents.Add(@event);
  }
}

Persistenza

Seguendo un approccio classico al DDD, per poter accedere agli aggregati e per persisterli definirei un’interfaccia IRepository nel domain model con le classiche operazioni GetByID, Remove, Save ed eventuali metodi di ricerca più o meno specializzati. Dopodiché implementerei tale interfaccia in qualche assembly di supporto. Tale implementazione sarebbe dipendente a qualche meccanismo di persistenza e tale dipendenza sarebbe sicuramente esplicitata dal nome dell’implementazione stessa, e.g. EntityFrameworkRepository.

Utilizzando Event Sourcing la definizione dell’interfaccia, e la relativa implementazione, assumono una forma diversa. Prima di tutto l’interfaccia IRepository è più semplice.

IRepository.cs
1
2
3
4
5
6
7
public interface IRepository<TAggregate, TAggregateId>
  where TAggregate: IAggregate<TAggregateId>
{
  Task<TAggregate> GetByIdAsync(TAggregateId id);

  Task SaveAsync(TAggregate aggregate);
}
Fare delle ricerche per attributo su un event store è complesso e decisamente poco performante. Inoltre gli event store sono meccanismi di persistenza in sola aggiunta per cui non è possibile cancellare gli eventi scritti in precedenza. Per questi motivi le uniche operazioni sensate diventano il recupero di un aggregato tramite identificativo e il salvataggio di un aggregato, sia esso nuovo o preesistente.

Per quanto riguarda l’implementazione invece, il repository deve essere a conoscenza almeno dei metodi dell’interfaccia IEventSourcingAggregate per poter accedere agli eventi di dominio emessi dall’aggregato e per poter passare gli eventi recuperati dall’event store. Per questo motivo è più naturale mettere parte dell’implementazione del repository nello stesso assembly che contiene il domain model.

EventSourcingRepository.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class EventSourcingRepository<TAggregate, TAggregateId> :
    IRepository<TAggregate, TAggregateId>
  where TAggregate : AggregateBase<TAggregateId>, IAggregate<TAggregateId>
  where TAggregateId : IAggregateId
{
  private readonly IEventStore eventStore;
  private readonly ITransientDomainEventPublisher publisher;

  public EventSourcingRepository(IEventStore eventStore,
    ITransientDomainEventPublisher publisher)
  {
    this.eventStore = eventStore;
    this.publisher = publisher;
  }

  public async Task<TAggregate> GetByIdAsync(TAggregateId id)
  {
    try
    {
      var aggregate = CreateEmptyAggregate();
      IEventSourcingAggregate<TAggregateId> aggregatePersistence = aggregate;

      foreach (var @event in await eventStore.ReadEventsAsync(id))
      {
        aggregatePersistence.ApplyEvent(@event.DomainEvent, @event.EventNumber);
      }
      return aggregate;
    }
    catch (EventStoreAggregateNotFoundException)
    {
      return null;
    }
    catch (EventStoreCommunicationException ex)
    {
      throw new RepositoryException("Unable to access persistence layer", ex);
    }
  }

  public async Task SaveAsync(TAggregate aggregate)
  {
    try
    {
      IEventSourcingAggregate<TAggregateId> aggregatePersistence = aggregate;

      foreach (var @event in aggregatePersistence.GetUncommittedEvents())
      {
        await eventStore.AppendEventAsync(@event);
        await publisher.PublishAsync((dynamic)@event);
      }
      aggregatePersistence.ClearUncommittedEvents();
    }
    catch (EventStoreCommunicationException ex)
    {
      throw new RepositoryException("Unable to access persistence layer", ex);
    }
  }

  private TAggregate CreateEmptyAggregate()
  {
    return (TAggregate)typeof(TAggregate)
      .GetConstructor(
        BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public, 
        null, new Type[0], new ParameterModifier[0])
      .Invoke(new object[0]);
  }
}

La parte dell’implementazione del repository che resta fuori dal domain model è quella relativa allo specifico event store utilizzato. A questo scopo ho introdotto l’interfaccia IEventStore. L’implementazione di tale interfaccia è spiegata più avanti.

IEventStore.cs
1
2
3
4
5
6
7
8
public interface IEventStore
{
  Task<IEnumerable<Event<TAggregateId>>> ReadEventsAsync<TAggregateId>(TAggregateId id)
    where TAggregateId: IAggregateId;

  Task<AppendResult> AppendEventAsync<TAggregateId>(IDomainEvent<TAggregateId> @event)
    where TAggregateId: IAggregateId;
}

Tornando alla classe EventSourcingRepository, si vede come il metodo GetByIdAsync applichi tutti gli eventi recuperati dall’event store ad un’istanza vuota dell’aggregato tramite il metodo ApplyEvent. L’istanza vuota dell’aggregato viene creata utilizzando il metodo CreateEmptyAggregate. Tale metodo fa uso della reflection di C# per creare un aggregato utilizzando il costruttore di default. Ho scelto di utilizzare la reflection perché voglio che gli aggregati possano dichiarare il costruttore di default privato, come nel caso della classe Cart. Questo perché il fatto di esporre o meno il costruttore di default è una scelta progettuale dell’aggregato, e non deve essere un’imposizione dovuta alle scelte architetturali.

Per quanto riguarda il metodo SaveAsync, si occupa di recuperare gli eventi non ancora salvati dall’aggregato, di salvarli sull’event store e, se tutto va bene, di rimuoverli dall’aggregato.

Ho scelto di utilizzare questo metodo anche per pubblicare gli eventi salvati internamente all’applicazione tramite l’interfaccia ITransientDomainEventPublisher. La classe che implementa questa interfaccia non è altro che un lightweight publisher come descritto nel libro rosso di Vaugh Vernon al capitolo 8 “Domain Events”. A differenza di quanto spiegato nel libro, nella mia applicazione di esempio gli eventi vengono pubblicati dal repository invece che dagli aggregati. Questo per due motivi: il codice degli aggregati risulta più semplice e il metodo SaveAsync è l’unico in cui siamo sicuri che gli eventi sono stati veramente salvati e che non ci sono stati conflitti di alcun genere.

Event Store

Per questo esperimento ho deciso di implementare la persistenza del modello Event Sourcing utilizzando Event Store. Per utilizzarlo ho incluso l’immagine Docker eventstore/eventstore nel file docker-compose.yml del mio progetto. Ho utilizzato la configurazione di base perché per il mio esperimento ho avuto esigenze particolari.

Per dialogare con il server Event Store ho installato nella mia soluzione il pacchetto NuGet EventStore.ClientAPI.NetCore, versione 4.0.3-rc. Non avendo mai utilizzato questo prodotto ho scritto alcuni test per verificare il comportamento della libreria e del server. Dopodiché ho implementato l’interfaccia IEventStore descritta sopra.

Event Store offre molte funzionalità interessanti come le proiezioni. Per poterle utilizzare è necessario memorizzare gli eventi in formato JSON. Dato che altri event store potrebbero fare affidamento a formati diversi, ho deciso di implementare la serializzazione e deserializzazione degli eventi a questo livello.

Per essere più conciso non riporto qui l’implementazione che comunque è disponibile sul repository GitHub.

CQRS e modello di lettura

Come già detto più volte in precedenza, utilizzando Event Sourcing per implementare la logica di business non abbiamo più un modello di lettura da utilizzare per visualizzare i dati all’utente. Per risolvere questo problema viene in nostro soccorso CQRS.

Seguendo le linee guida di questo pattern ho creato un semplice modello si lettura che si adattasse all’interfaccia web che volevo realizzare.

EventSourcingCQRS.ReadModel\Cart\Cart.cs
1
2
3
4
5
6
7
public class Cart : IReadEntity
{
  public string Id { get; set; }
  public int TotalItems { get; set; }
  public string CustomerId { get; set; }
  public string CustomerName { get; set; }
}
EventSourcingCQRS.ReadModel\Cart\CartItem.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CartItem : IReadEntity
{
    public string Id { get; private set; }
    public string CartId { get; set; }
    public string ProductId { get; set; }
    public string ProductName { get; set; }
    public int Quantity { get; set; }

    public static CartItem CreateFor(string cartId, string productId)
    {
      return new CartItem
      {
        Id = IdFor(cartId, productId),
        CartId = cartId,
        ProductId = productId
      };
    }

    public static string IdFor(string cartId, string productId)
    {
      return $"{productId}@{cartId}";
    }
}

Questo modello deve essere tenuto sincronizzato con il modello di scrittura, e per questo motivo ho creato la classe CartUpdater la quale, gestendo gli eventi generati dal modello di scrittura, si occupasse di aggiornare il modello di lettura.

CartUpdater.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class CartUpdater : IDomainEventHandler<CartId, CartCreatedEvent>,
  IDomainEventHandler<CartId, ProductAddedEvent>,
  IDomainEventHandler<CartId, ProductQuantityChangedEvent>
{
  private readonly IReadOnlyRepository<Customer.Customer> customerRepository;
  private readonly IReadOnlyRepository<Product.Product> productRepository;
  private readonly IRepository<Cart.Cart> cartRepository;
  private readonly IRepository<Cart.CartItem> cartItemRepository;

  public CartUpdater(IReadOnlyRepository<Customer.Customer> customerRepository,
    IReadOnlyRepository<Product.Product> productRepository,
    IRepository<Cart.Cart> cartRepository,
    IRepository<Cart.CartItem> cartItemRepository)
  {
    this.customerRepository = customerRepository;
    this.productRepository = productRepository;
    this.cartRepository = cartRepository;
    this.cartItemRepository = cartItemRepository;
  }

  public async Task HandleAsync(CartCreatedEvent @event)
  {
    var customer = await customerRepository.GetByIdAsync(
      @event.CustomerId.IdAsString());

    await cartRepository.InsertAsync(new Cart.Cart
      {
        Id = @event.AggregateId.IdAsString(),
        CustomerId = customer.Id,
        CustomerName = customer.Name,
        TotalItems = 0
      });
  }

  public async Task HandleAsync(ProductAddedEvent @event)
  {
    var product = await productRepository.GetByIdAsync(@event.ProductId.IdAsString());
    var cart = await cartRepository.GetByIdAsync(@event.AggregateId.IdAsString());
    var cartItem = Cart.CartItem.CreateFor(
      @event.AggregateId.IdAsString(),
      @event.ProductId.IdAsString());

    cartItem.ProductName = product.Name;
    cartItem.Quantity = @event.Quantity;
    cart.TotalItems += @event.Quantity;
    await cartRepository.UpdateAsync(cart);
    await cartItemRepository.InsertAsync(cartItem);
  }

  public async Task HandleAsync(ProductQuantityChangedEvent @event)
  {
    var cartItemId = Cart.CartItem.IdFor(
      @event.AggregateId.IdAsString(),
      @event.ProductId.IdAsString());
    var cartItem = (await cartItemRepository
      .FindAllAsync(x => x.Id == cartItemId))
      .Single();
    var cart = await cartRepository.GetByIdAsync(@event.AggregateId.IdAsString());

    cart.TotalItems += @event.NewQuantity - @event.OldQuantity;
    cartItem.Quantity = @event.NewQuantity;

    await cartRepository.UpdateAsync(cart);
    await cartItemRepository.UpdateAsync(cartItem);
  }
}
Le interfacce IRepository e IReadOnlyRepositoy non hanno niente a che vedere con l’interfaccia IRepository descritta nel paragrafo Repository. Queste interfacce infatti appartengono al progetto EventSourcingCQRS.ReadModel e sono quelle utilizzate per accedere al modello di lettura. L’unica interazione fra la classe CartUpdater e il modello di scrittura avviene esclusivamente tramite gli eventi di dominio generati da quest’ultimo.

Per inciso, l’implmentazione di IRepository e IReadOnlyRepositoy utilizzata in questa applicazione di esempio utilizza MongoDB e niente ha a che vedere con Event Store.

Un problema che spesso si ha quando si vuole utilizzare CQRS è il possibile ritardo che si può avere nell’aggiornamento del modello di lettura a seguito di azioni su quello di scrittura. Data la natura esemplificativa di questa applicazione ho deciso di risolvere questo problema molto semplicemente: ho registrato CartUpdater come consumatore del lightweight publisher descritto sopra. Così facendo ho la garanzia che l’aggiornamento del modello di lettura sia completato prima che il metodo SaveAsync (vedi paragrafo Repository) sia concluso.

Questo non è certamente il modo corretto per gestire questo problema dato che riduce la scalabilità del sistema legando le operazioni sul modello di dominio all’aggiornamento del modello di lettura. Ciononostante, se si sta lavorando su un applicazione esistente e si vuole introdurre gradualmente questi pattern, questa è sicuramente una strada percorribile.

Tornando al codice, chi si occupa di coordinare le azioni sul modello di scrittura con l’aggiornamento del modello di lettura è il servizio applicativo CartWriter riportato qui sotto.

CartWriter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class CartWriter : ICartWriter
{
  private readonly IRepository<Cart, CartId> cartRepository;
  private readonly ITransientDomainEventSubscriber subscriber;
  private readonly IEnumerable<IDomainEventHandler<CartId, CartCreatedEvent>> cartCreatedEventHandlers;
  private readonly IEnumerable<IDomainEventHandler<CartId, ProductAddedEvent>> productAddedEventHandlers;
  private readonly IEnumerable<IDomainEventHandler<CartId, ProductQuantityChangedEvent>> productQuantityChangedEventHandlers;

  public CartWriter(IRepository<Cart, CartId> cartRepository, ITransientDomainEventSubscriber subscriber,
      IEnumerable<IDomainEventHandler<CartId, CartCreatedEvent>> cartCreatedEventHandlers,
      IEnumerable<IDomainEventHandler<CartId, ProductAddedEvent>> productAddedEventHandlers,
      IEnumerable<IDomainEventHandler<CartId, ProductQuantityChangedEvent>> productQuantityChangedEventHandlers)
  {
    this.cartRepository = cartRepository;
    this.subscriber = subscriber;
    this.cartCreatedEventHandlers = cartCreatedEventHandlers;
    this.productAddedEventHandlers = productAddedEventHandlers;
    this.productQuantityChangedEventHandlers = productQuantityChangedEventHandlers;
  }

  public async Task AddProductAsync(string cartId, string productId, int quantity)
  {
    var cart = await cartRepository.GetByIdAsync(new CartId(cartId));

    subscriber.Subscribe<ProductAddedEvent>(
      async @event => await HandleAsync(productAddedEventHandlers, @event));
    cart.AddProduct(new ProductId(productId), quantity);
    await cartRepository.SaveAsync(cart);
  }

  public async Task ChangeProductQuantityAsync(
      string cartId, string productId, int quantity)
  {
    var cart = await cartRepository.GetByIdAsync(new CartId(cartId));

    subscriber.Subscribe<ProductQuantityChangedEvent>(
      async @event => await HandleAsync(productQuantityChangedEventHandlers, @event));
    cart.ChangeProductQuantity(new ProductId(productId), quantity);
    await cartRepository.SaveAsync(cart);
  }

  public async Task CreateAsync(string customerId)
  {
    var cart = new Cart(CartId.NewCartId(), new CustomerId(customerId));

    subscriber.Subscribe<CartCreatedEvent>(
      async @event => await HandleAsync(cartCreatedEventHandlers, @event));
    await cartRepository.SaveAsync(cart);
  }

  public async Task HandleAsync<T>(
      IEnumerable<IDomainEventHandler<CartId, T>> handlers, T @event)
    where T : IDomainEvent<CartId>
  {
    foreach (var handler in handlers)
    {
      await handler.HandleAsync(@event);
    }
  }
}
Come si può vedere dal codice, il servizio CartWriter non conosce direttamente la classe CartUpdater. Quest’ultima viene iniettata come implementazione dell’interfaccia IDomainEventHandler. CartWriter si occupa di registrare tutti gli handler per la gestione degli eventi di dominio.

L’interfaccia ITransientDomainEventSubscriber è la controparte dell’interfaccia ITransientDomainEventPublisher vista in precedenza. Entrambe le interfacce in realtà sono implementate dalla classe TransientDomainEventPubSub la quale si occupa di recapitare a tutti i consumatori registrati gli eventi di dominio emessi. Questa classe si chiama transient perché garantisce che le sottoscrizioni siano valide solamente all’interno di un singolo contesto di esecuzione. Questo fa sì che gli handler registrati durante un’operazione non vengano invocati dalle successive operazioni.

Il servizio CartWriter visto in precedenza non è altro che la facade del nostro modello di scrittura. Per simmetria esiste una facade per il modello di lettura la quale, senza troppa fantasia, si chiama CartReader.

CartReader.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CartReader : ICartReader
{
  private readonly IReadOnlyRepository<Cart> cartRepository;
  private readonly IReadOnlyRepository<CartItem> cartItemRepository;

  public CartReader(IReadOnlyRepository<Cart> cartRepository,
      IReadOnlyRepository<CartItem> cartItemRepository)
  {
    this.cartRepository = cartRepository;
    this.cartItemRepository = cartItemRepository;
  }

  public async Task<IEnumerable<Cart>> FindAllAsync(
      Expression<Func<Cart, bool>> predicate)
  {
    return await cartRepository.FindAllAsync(predicate);
  }

  public async Task<Cart> GetByIdAsync(string id)
  {
    return await cartRepository.GetByIdAsync(id);
  }

  public async Task<IEnumerable<CartItem>> GetItemsOfAsync(string cartId)
  {
    return await cartItemRepository.FindAllAsync(x => x.CartId == cartId);
  }
}
Questa classe utilizza i repository del modello di lettura per ritornare le informazioni al client.

Riassumendo, per implementare il pattern CQRS, e poter visualizzare i dati all’utente, ho creato un modello di lettura (Cart e CartItem) che tengo sincronizzato con il modello di dominio tramite CartUpdater e ho fornito due servizi applicativi (CartWriter e CartReader) i quali separano nettamente le operazioni di scrittura da quelle di lettura.

La potenza di questo approccio sta nel fatto che posso creare quanti modelli di lettura voglio, in funzione delle caratteristiche che devo sviluppare. Oltre a questo, nel caso in cui un modello di lettura esistente non sia più adeguato, e.g. perché sono cambiati i requisiti, posso eliminarlo e crearne uno nuovo. Il nuovo modello verrebbe popolato andando a rieseguire gli eventi di dominio passati presenti nell’event store.

Conclusioni

Il post è sicuramente il più lungo che ho scritto fino ad oggi ma l’argomento meritava tutto lo spazio e il tempo che gli ho dedicato e anche molto di più. Il codice che ho prodotto durante l’esperimento è più di quello che sono riuscito a presentare ed è disponibile su GitHub.

Event Sourcing è un paradigma diverso da quello con cui uno sviluppatore è normalmente abituato a pensare e richiede un certo cambio di mentalità. Una volta fatto proprio risulta molto semplice. Ci sono aspetti che avrebbero meritato ulteriori approfondimenti ma che non hanno trovato spazio in questo post: gestione degli snapshot, pubblicazione degli eventi ad altre applicazioni e correzione degli errori solo per nominare quelle più importanti.

CQRS diventa indispensabile se si vuole usare Event Sourcing ma può essere utilizzato anche da solo. Per utilizzarlo al pieno delle sue possibilità è necessario gestire correttamente i possibili ritardi nell’aggiornamento del modello di lettura. Per questo motivo la sua adozione deve essere valutata caso per caso, ma è molto flessibile e le potenzialità che ho sperimentato sono sicuramente interessanti.