.NET Domain Events Using MediatR

Category

DDD

Published on
Authors

Background

Domain Events are a concept in Domain Driven Design (DDD) that allow us to decouple our entities. More technically, this is usually done in a pub/sub fashion. This means that one entity can raise an event that says ‘Hey, I did this this!' (e.g. AuthorCreated, OrderSubmitted), and other entities can have features that subscribe to those events and listen for them so they can then react however our business rules deem necessary.

You can have domain events that publish external messages to a message broker like RabbitMQ as we've seen in some of my previous blog posts, or you can have internal messages to communicate to other entities within a particular boundary. Generally, these internal events are uses to synchronize state between entities or do some infrastructure related task like sending an email notification. Depending on the context, latter could be done with internal or external notifications.

As I've been getting more into DDD, I've had more and more instances where I've wanted to add internal Domain Events into my workflow, but didn't know exactly how I wanted to do it. As I was brainstorming options and seeing how others did it, I came across this post by Camron Frenzel and got inspired. The approach below is very similar to Camron's post, but uses a slightly different implementation that I figured I'd share.

Example Repo

Here is an example repo for reference if you want to take a look.

Capturing Domain Events

Let's start with a basic interface called IDomainEvent that inherits from MediatR's INotification. This just gives us our own explicit type of message that we can publish for MediatR to work with.

Then we have a BaseEntity that all our entities can inherit from. This entity has two properties:

  1. A list of IDomainEvents called DomainEvents that we will use to capture messages we want to publish.
  2. A primary key property called Id. This is not really of relevance for this functionality, but a common item for reference.

There's also a QueueDomainEvent method that we expose to add events to our list.

namespace RecipeManagement.Domain;

using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using MediatR;

public interface IDomainEvent : INotification { }

public abstract class BaseEntity
{
	[Key]
    public Guid Id { get; private set; } = Guid.NewGuid();

    [NotMapped]
    public List<IDomainEvent> DomainEvents { get; } = new List<IDomainEvent>();

    public void QueueDomainEvent(IDomainEvent @event)
    {
        DomainEvents.Add(@event);
    }
}

Building a Message

Next, let's make a message we can publish. For this example, I have an Author entity that I want to publish a message for each time it is created or updated. Something like this:

namespace RecipeManagement.Domain.Authors.DomainEvents;

public class AuthorAdded : IDomainEvent
{
    public Author Author { get; set; }
}

💡 If I were publishing an out of process message to something like RabbitMQ, I would usually keep it as small as possible and only send something like the entity id. This is primarily because you can only extend the message due to the unknown nature of who is using what items in the message. In this case, since we are using an in-memory process that is self contained in this particular boundary, this is less of a concern.

Capturing Domain Events

Let's say we have an Author entity that looks like this:

namespace RecipeManagement.Domain.Authors;

using SharedKernel.Dtos.RecipeManagement.Author;
using Mappings;
using Validators;
using AutoMapper;
using FluentValidation;
using System.Text.Json.Serialization;
using System.ComponentModel.DataAnnotations.Schema;
using System.Runtime.Serialization;
using DomainEvents;
using Sieve.Attributes;
using Recipes;

public class Author : BaseEntity
{
    [Sieve(CanFilter = true, CanSort = true)]
    public string Name { get; private set; }

    [JsonIgnore]
    [IgnoreDataMember]
    [ForeignKey("Recipe")]
    public Guid RecipeId { get; private set; }
    public Recipe Recipe { get; private set; }

    public static Author Create(AuthorForCreationDto authorForCreationDto)
    {
        new AuthorForCreationDtoValidator().ValidateAndThrow(authorForCreationDto);
        var mapper = new Mapper(new MapperConfiguration(cfg => {
            cfg.AddProfile<AuthorProfile>();
        }));
        var newAuthor = mapper.Map<Author>(authorForCreationDto);
        newAuthor.QueueDomainEvent(new AuthorAdded(){Author = newAuthor});

        return newAuthor;
    }

    public void Update(AuthorForUpdateDto authorForUpdateDto)
    {
        new AuthorForUpdateDtoValidator().ValidateAndThrow(authorForUpdateDto);
        var mapper = new Mapper(new MapperConfiguration(cfg => {
            cfg.AddProfile<AuthorProfile>();
        }));
        mapper.Map(authorForUpdateDto, this);
    }

    private Author() { } // For EF
}

The Create and Update factory methods could done through a variety of implementations. The item of note here is newAuthor.QueueDomainEvent(new AuthorAdded(){Author = newAuthor});. This adds the message to the DomainEvents property that our entity uses to store messages it wants to publish.

Publishing Your Events

At this point, we have a list of events, but no way to publish them! To do this, we're going to add a couple methods and overrides to our DbContext.

Let's start here:

namespace RecipeManagement.Databases;

using Domain.Authors;
using Domain.Ingredients;
using Domain;
using Services;
using Microsoft.EntityFrameworkCore;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using Microsoft.EntityFrameworkCore.Query;

public class RecipesDbContext : DbContext
{
    private readonly IMediator _mediator;

    public RecipesDbContext(
        DbContextOptions<RecipesDbContext> options, IMediator mediator) : base(options)
    {
        _mediator = mediator;
    }

    public DbSet<Author> Authors { get; set; }

    public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = new())
    {
        return await base.SaveChangesAsync(cancellationToken);
    }

    public override int SaveChanges()
		{
        return base.SaveChanges();
		}

    private async Task _dispatchDomainEvents()
    {
        var domainEventEntities = ChangeTracker.Entries<BaseEntity>()
            .Select(po => po.Entity)
            .Where(po => po.DomainEvents.Any())
            .ToArray();

        foreach (var entity in domainEventEntities)
        {
            foreach (var entityDomainEvent in entity.DomainEvents)
                await _mediator.Publish(entityDomainEvent);
        }
    }
}

From here, let's make a method that can loop through all our entities and publish all the messages that we've capture from our QueueDomainEvent operations.

  private async Task _dispatchDomainEvents()
  {
      var domainEventEntities = ChangeTracker.Entries<BaseEntity>()
          .Select(po => po.Entity)
          .Where(po => po.DomainEvents.Any())
          .ToArray();

      foreach (var entity in domainEventEntities)
      {
          foreach (var entityDomainEvent in entity.DomainEvents)
              await _mediator.Publish(entityDomainEvent);
      }
  }

Then we can add it to our save overloads. At the end, we have something like this:

namespace RecipeManagement.Databases;

using Domain.Authors;
using Domain;
using Services;
using Microsoft.EntityFrameworkCore;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using Microsoft.EntityFrameworkCore.Query;

public class RecipesDbContext : DbContext
{
    private readonly IMediator _mediator;

    public RecipesDbContext(
        DbContextOptions<RecipesDbContext> options, IMediator mediator) : base(options)
    {
        _mediator = mediator;
    }

    public DbSet<Author> Authors { get; set; }

    public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = new())
    {
		await _dispatchDomainEvents();
        return await base.SaveChangesAsync(cancellationToken);
    }

    public override int SaveChanges()
    {
		_dispatchDomainEvents().GetAwaiter().GetResult();
        return base.SaveChanges();
    }

    private async Task _preSaveChanges()
    {
        await _dispatchDomainEvents();
    }

    private async Task _dispatchDomainEvents()
    {
        var domainEventEntities = ChangeTracker.Entries<BaseEntity>()
            .Select(po => po.Entity)
            .Where(po => po.DomainEvents.Any())
            .ToArray();

        foreach (var entity in domainEventEntities)
        {
            foreach (var entityDomainEvent in entity.DomainEvents)
                await _mediator.Publish(entityDomainEvent);
        }
    }
}

Consuming Messages

So at this point we can capture messages and publish them for consumption, but we don't have anyone to consume them!

From here, we just need to use MediatR like we normally would. So something like this will get the job done.

namespace RecipeManagement.Domain.Authors.Features;

using System.Reflection.Metadata;
using DomainEvents;
using MediatR;

public class LogAuthor : INotificationHandler<AuthorAdded>
{
    private readonly ILogger<LogAuthor> _logger;

    public LogAuthor(ILogger<LogAuthor> logger)
    {
        _logger = logger;
    }

    public Task Handle(AuthorAdded notification, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Author added: {0}", notification.Author.Name);
				return Task.CompletedTask;
    }
}

Conclusion

And that's all it takes to set up domain events in your system!

🐦 As always, I'm available on Twitter or Discord if you have any questions or comments! Happy coding!