Details on how to publish and consume messages using MassTransit to communicate between bounded contexts.
Wrapt projects are broken up into distinct boundaries called bounded contexts. With this kind of separation you may need to communicate between bounded contexts for whatever reason.
To do this, you can either call another boundary in process using a REST request from your feature or out of process by queuing up a message in a message broker. Generally, this would be one bounded context sending out a notification that an event occurred and one or more additional bounded contexts reacting to that notification to perform some action in response. Some examples might be:
OrderRequested
notification being sent and and getting consumed by a BillOrder
consumer as well as a ConfirmOrder
consumer. You could even take this a step further and create a saga or routing slip to orchestrate the process.Patient
record in the Ordering
bounded context and send out a PatientUpdated
notification so that the Billing
bounded context can update the associated patient information in their database as well.💡 If you're not familiar with eventing and MassTransit, I've started a blog series on the topic that I recommend you check out.
Let's look at an example like #2 above and say that I have a project built with the below domain template. Of note for this example:
Ordering
bc as well as a matching consumer in my Billing
bcAddBus
property to make sure that I am adding my event bus to each bcDomainName: VsaLab
Messages:
- Name: IPatientUpdated
Properties:
- Name: PatientId
Type: guid
BoundedContexts:
- ProjectName: Ordering
Port: 1200
DbContext:
ContextName: OrderingDbContext
DatabaseName: OrderingDatabase
Provider: Postgres
Entities:
- Name: Patient
Properties:
- Name: ExternalId
Type: string
CanFilter: true
CanSort: true
- Name: InternalId
Type: string
CanFilter: true
CanSort: true
- Name: FirstName
Type: string
CanFilter: true
CanSort: true
- Name: LastName
Type: string
CanFilter: true
CanSort: true
- Name: Dob
Type: datetimeoffset?
CanFilter: true
CanSort: true
Environment:
BrokerSettings:
Username: specialguest # optional, but here to show how you can change them if you want
Password: specialguest # optional, but here to show how you can change them if you want
Bus:
AddBus: true
Producers:
- EndpointRegistrationMethodName: PatientUpdatedEndpoint
ExchangeName: patient-updated
MessageName: IPatientUpdated
ExchangeType: fanout
ProducerName: PatientUpdated
So the idea here is that, when we update a patient in Ordering
we want to send out a notification that we did that for other bounded contexts to know about. So Craftsman scaffolded out a producer for us in
VsaLab.Ordering.src.Ordering.WebApi.Features.Producers
, but we want to emit the event when we update a patient, so let's move the event publishing to our UpdatePatient
feature. To do this, I'm going to add an IPublishEndpoint
move the publishing code to my UpdatePatient
feature, and delete the producer that we don't need in this case.
The message publishing looks like this. All I'm doing here is adding the patient id from the command to an anonymous object to use as our message and then publishing that message.
var message = new
{
request.PatientId
};
await _publishEndpoint.Publish<IPatientUpdated>(message);
When we're done, our UpdatePatient
feature will look something like this:
namespace PatientManagement.Features.Patients;
using SharedKernel.Dtos.PatientManagement.Patient;
using SharedKernel.Exceptions;
using SharedKernel.Messages;
using PatientManagement.Databases;
using AutoMapper;
using MediatR;
using MassTransit;
using Microsoft.EntityFrameworkCore;
using System.Threading;
using System.Threading.Tasks;
public static class UpdatePatient
{
public class UpdatePatientCommand : IRequest<bool>
{
public Guid PatientId { get; set; }
public PatientForUpdateDto PatientToUpdate { get; set; }
public UpdatePatientCommand(Guid patient, PatientForUpdateDto patientToUpdate)
{
PatientId = patient;
PatientToUpdate = patientToUpdate;
}
}
public class Handler : IRequestHandler<UpdatePatientCommand, bool>
{
private readonly IPublishEndpoint _publishEndpoint;
private readonly PatientManagementDbContext _db;
private readonly IMapper _mapper;
public Handler(PatientManagementDbContext db, IMapper mapper, IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
_mapper = mapper;
_db = db;
}
public async Task<bool> Handle(UpdatePatientCommand request, CancellationToken cancellationToken)
{
var patientToUpdate = await _db.Patients
.FirstOrDefaultAsync(i => i.Id == request.Id, cancellationToken);
if (patientToUpdate == null)
throw new NotFoundException("Patient", request.Id);
patientToUpdate.Update(request.PatientToUpdate);
await _db.SaveChangesAsync(cancellationToken);
var message = new
{
request.PatientId
};
await _publishEndpoint.Publish<IPatientUpdated>(message);
return true;
}
}
}
Now, our consumer is already set up in our Billing
bc, so if we set both projects as startup projects and put a breakpoint in our consumer, you should see it get hit! This completes the connection to communicate between the bounded contexts.
From here, you can add whatever work needs to be done in the consumer. In this case, we are only sending the PatientId
, so you'd probably need to make a call back to Ordering
to get the entire set of patient information. Another option
could be to add all of the patient properties to the message and save yourself a call back to Ordering
for all the patient info.