Eventual Consistency via Domain Events and Azure Service Bus

I’m going to start this article by explaining what Eventual Consistency is, how it compares to Transactional Consistency, and when we should consider designing a solution with this approach. Next we’re going to look at Domain Events, see what their role is in relation to eventual consistency and how we can identify them in the domain language. Finally, based on a simple scenario, we’ll go through the implementation details of a proposed architecture that uses the Azure Service Bus messaging system.

Transactional vs Eventual

As the name implies, eventual consistency is about being consistent, well, eventually. In this context, eventually means at a later time. It’s easier to understand this concept when comparing it to transactional consistency, as in the following diagrams:

[Diagram: Transactional Consistency]

The first diagram shows a workflow that is probably very familiar to most developers. The Client (UI, API, etc.) executes a command on the system which internally runs all operations necessary to maintain the domain consistency (A and B here) inside a transaction. When the client gets the response one of the following is true:

  • Both Operation A and Operation B have succeeded
  • Both Operation A and Operation B have failed

Therefore, in transactional consistency workflows when the client gets a ‘success’ response it is guaranteed that all the necessary operations to maintain the domain consistency have been successfully executed.

It’s important to note that the ‘Domain consistency boundary’ here is not a solution component but rather a business-defined boundary. It outlines the ‘space’ within the domain where a set of business rules need to be enforced.

[Diagram: Eventual Consistency]

The workflow in the second diagram is probably more familiar to developers working on distributed systems. Again, the client executes a command on the system but this time only a part of all operations required to maintain the domain consistency are run (just A here) inside the transaction. Now when the client gets the response it means that one of the following is true:

  • Operation A has succeeded and Operation B is scheduled to run at a later time, i.e. eventually
  • Operation A has failed (and Operation B will not run)

In other words, in eventual consistency workflows, when the client gets a ‘success’ response it is only guaranteed that a part of the operations necessary to maintain the domain consistency have been successfully executed and the rest are scheduled to run at a later time.
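The difference between the two workflows can be sketched in a few lines of C#. This is a toy in-memory illustration, not the implementation described later in this article; the ConsistencyDemo class and all of its members are names made up for this example, and a simple queue of delegates stands in for the real scheduling mechanism.

```csharp
using System;
using System.Collections.Generic;

// Toy illustration only: all names here are hypothetical.
public class ConsistencyDemo
{
    public bool OperationADone { get; private set; }
    public bool OperationBDone { get; private set; }

    // Work that has been scheduled but not yet executed.
    private readonly Queue<Action> _pending = new Queue<Action>();

    public int PendingCount { get { return _pending.Count; } }

    // Transactional: A and B are both done before the client gets a response.
    public void RunTransactional()
    {
        OperationADone = true;
        OperationBDone = true;
    }

    // Eventual: only A is done; B is scheduled to run later.
    public void RunEventual()
    {
        OperationADone = true;
        _pending.Enqueue(() => OperationBDone = true);
    }

    // Stands in for a background process that runs the scheduled work.
    public void DrainPending()
    {
        while (_pending.Count > 0)
        {
            _pending.Dequeue().Invoke();
        }
    }
}
```

After RunEventual returns, OperationBDone is still false; it only becomes true once DrainPending has run, which is the window in which the domain is not yet consistent.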

It’s fair to say that, by comparison, transactional consistency seems more straightforward and easier to implement. In fact, it’s built into most frameworks. So why and when should we choose eventual consistency?

Looking again at the first diagram, the following scenarios come to mind:

  1. Operation B takes a long time to execute.
    E.g. calculating a large set of values, generating a report, etc. You don’t want to let the users of your website wait more than a few seconds until they get confirmation for their actions so you need these long-running operations to run in the background in order to optimize the front-end performance.
  2. Operation B is asynchronous by nature, i.e. depends on an asynchronous mechanism.
    E.g. sending an e-mail. This is the classic example. You’re already going to inform the user that an e-mail will be sent ‘shortly’ so why not extract this in a separate component which can also be reused and scaled independently.
  3. Operation B is performed against a different aggregate than Operation A but within the same bounded context.
    I will discuss this scenario later while going through the implementation details.
  4. Operation B is performed inside a different bounded context than Operation A.
    Based on Martin Fowler’s example, consider that updating the Name of the Customer of the Sales Context will require an update to the Name of Customer in the Support Context. This won’t happen at the same time, but eventually the bounded contexts will be synchronized, thus achieving domain consistency.

When working on real-world applications, scenario #1 will almost certainly force you towards eventual consistency at some point due to performance requirements. Scenario #2 may as well, although this is arguably partly a design decision because the response times of the APIs initiating the asynchronous mechanisms might be fast enough to favour the transactional route.

Reasons #3 and #4 are DDD specific and mainly about making an informed design decision. Most likely you can get away with going the transactional route in the beginning, but choosing eventual consistency in these scenarios will lead to a better design. It promotes smaller, loosely coupled components which perform specific operations. This will make the system easier to extend and scale, and it tends to pay off in performance too, because as the application grows you’re very likely to encounter scenario #1 at some stage.

Domain Events

Looking at the second diagram again it’s not clear how ‘Operation B’ is triggered. This is where domain events come into the picture. Consider the updated diagram:

[Diagram: Eventual Consistency with Domain Events]
 
This provides a little more detail and we can see that a domain event is published soon after ‘Operation A’ is run, and this happens within the boundaries of an aggregate which is encapsulated by a transaction. The domain event is then saved in a queue and later consumed by a process that needs it to run ‘Operation B’.

Therefore one of the roles of domain events is to facilitate eventual consistency. In this context they act as triggers and domain information containers.

Domain events were not formally defined as a DDD pattern when Eric Evans’ book was first released. The concept was introduced later and defined as:

A domain event is a full-fledged part of the domain model, a representation of something that happened in the domain. Ignore irrelevant domain activity while making explicit the events that the domain experts want to track or be notified of, or which are associated with state change in the other model objects.

Domain events can also be used to reconstruct a specific state of an aggregate when Event Sourcing is employed. In this particular context every client command will create a Domain Event.

This is not the case for ‘traditional’ approaches where only ‘meaningful’ domain events are captured as part of the domain model. So what is ‘meaningful’? One way to think about it is: “If I ignore the occurrence of event X are the domain business rules still going to be consistent?”. If the answer is No then you probably need to model it as a domain event.

As a tip, when trying to identify domain events, Vaughn Vernon also suggests paying attention to the following key phrases when talking to business experts:

  • “When…”
  • “If that happens”
  • “Inform me if…” and “Notify me if…”
  • “An occurrence of”

I should also mention that you could use domain events to synchronously update domain objects inside the same aggregate. Personally I find this to be a slightly over-engineered way to achieve transactional consistency but it is a valid approach.

Another synchronous approach is to use domain events to update another Aggregate within the same Transaction. Technically speaking this is wrong as it violates the single aggregate per transaction DDD rule but it could be used in situations when there is no messaging mechanism available. Doing this you can still get some design benefits by decoupling updates between aggregates but not the performance and scalability benefits.

The following implementation is going to focus on the ‘Eventual Consistency’ scenario which fully leverages the benefits brought by Domain Events.

A proposed design

Let’s consider the following approach (a simplified version of Vaughn Vernon’s approach outlined in his excellent book Implementing DDD):

[Diagram: Architecture]

First, a domain event is created inside the aggregate boundary. Either Entities or Domain Services are responsible for this.

Then the newly created domain event is published via the Events Publisher. When this happens the existing registered subscribers, in this case the Event Storing Subscriber, will receive the event, serialize it and persist it. Everything so far happens inside the same transaction.

Next the Event Forwarder (a background process) will read events chronologically and forward them to dedicated Message Queues.

Finally, various Event Consumers (background processes) will read the events from the dedicated queues, deserialize them and run the necessary operations to achieve domain consistency.

There are a few important things to mention about this design. First, you probably noticed that the events don’t go directly to the Message Queue but are first saved in the Model Store, which acts as a queue for the Event Forwarder. The reason is the assumption that the Model Store and the Message Queue don’t share the same transaction, which is true when using a SQL database and the Azure Service Bus messaging system. Without this intermediary step, we could successfully commit a transaction in the Model Store and then fail to save the Domain Event in the Message Queue, which would leave us with an inconsistent model because the event is lost.

Can’t the same thing happen between the Event Forwarder and the Message Queue? Technically yes, but now we’re in a much better position to handle this scenario. Because the Domain Event is now stored, we can first try to forward it to the Message Queue and mark the event as ‘Forwarded’ in the Model Store only if this succeeds. If forwarding fails, the event stays unmarked and is picked up again on the next pass.

What if the event is forwarded to the Message Queue but the Model Store update fails? In this scenario, we can rely on two things:

  1. Message de-duplication. Azure Service Bus provides this feature out of the box, and RabbitMQ can provide it through a community plugin. Duplicate messages are automatically removed based on a custom unique message identifier.
  2. Domain Event idempotence. This means that if the same Domain Event occurs more than once, the subsequent occurrences will not change the domain state set by the first occurrence. Having idempotent Domain Events is an ideal scenario but it requires extra work and sometimes it can lead to increased complexity and decreased performance. Therefore, carefully choosing a Messaging System that supports de-duplication is always a good idea.
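One common way to get idempotent handling is for the consumer to remember which events it has already applied. The following is a minimal sketch of that idea, not part of the sample solution; IdempotentHandler is a hypothetical name and the in-memory set stands in for what would normally be a table updated in the same transaction as the state change.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: applies each event at most once, keyed by event id.
public class IdempotentHandler
{
    // In a real system this would be persisted alongside the domain state.
    private readonly HashSet<Guid> _processedEventIds = new HashSet<Guid>();

    public int TimesApplied { get; private set; }

    // Returns true if the event was applied, false if it was a duplicate.
    public bool Handle(Guid eventId, Action apply)
    {
        if (_processedEventIds.Contains(eventId))
        {
            return false;
        }

        // Record the id only after the change succeeded, so a failed
        // attempt can be retried when the event is delivered again.
        apply();
        _processedEventIds.Add(eventId);
        TimesApplied++;
        return true;
    }
}
```

Some operations need no such bookkeeping. Setting a flag such as ‘IsDeleted’ to true, as in the scenario implemented below, gives the same result no matter how many times it runs.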

Implementation

Now that we clarified how things will work conceptually let’s have a look at the code. The complete solution can be found here: https://github.com/florindpreda/EventualConsistency.DomainEvents.AzureServiceBus

The scenario chosen to demonstrate eventual consistency is the same used when I wrote about the Unit of Work pattern. In fact, I will also reuse the same Unit of Work implementation for the transactional part.

To quickly re-iterate, we have two aggregates: Products and Product Reviews. When I delete a Product the associated Product Reviews need to be deleted as well (the delete here is a ‘soft’ or ‘logical’ delete, so rather than physically removing the records from the database, we’ll just update an ‘IsDeleted’ flag instead).

We’ll start with the DomainEvent class:

public abstract class DomainEvent
{
	public DateTime OcurrendOn { get; protected set; }

	public DomainEvent()
	{
		this.OcurrendOn = DateTime.UtcNow;
	}
}

This is the base class for all future events. It’s a good idea that all events have a time-stamp of their occurrence, primarily for logging and debugging scenarios.

Next we have the specific ProductDeleted event:

public class ProductDeleted : DomainEvent
{
	public Guid ProductId { get; protected set; }		

	public ProductDeleted(Guid productId)
	{
		this.ProductId = productId;			
	}
}

Domain Events should carry only the minimum amount of information their consumers need to run efficiently. For example, if an event consumer needs five IDs in order to run and just one of them is enough to find the other four, but only through complex and time-consuming queries, then it’s best to include all five IDs in the event body if they are easily available when the event is published. In this scenario, all we need in order to delete all product reviews is the ProductId.

As discussed, this event will be created and published from within the Products aggregate:

public class Product : Entity
{		
	public string Name { get; private set; }

	protected Product() {}

	public Product(string name)
	{		
		this.Name = name;
	}

	public override void Delete()
	{
		base.Delete();
		DomainEvents.Publisher.Publish<ProductDeleted>(new ProductDeleted(this.Id));			
	}
}

Let’s see what the Publisher looks like:

public class DomainEventPublisher
{
	private readonly IDictionary<Type, IList<IDomainEventSubscriber>> _subscribers = new Dictionary<Type, IList<IDomainEventSubscriber>>();

	public void Publish<T>(T domainEvent) where T : DomainEvent
	{			
		var eventSubscribers = _subscribers.SelectMany(s => s.Value)
											.Where(sb => sb.SubscribedToEventType() == domainEvent.GetType()
														|| sb.SubscribedToEventType() == typeof(DomainEvent)
													);

		foreach(var eventSubscriber in eventSubscribers)
		{
			eventSubscriber.Handle(domainEvent);
		}
	}		

	public void Subscribe<TEvent>(Action<DomainEvent> handle) where TEvent : DomainEvent
	{
		var subscriber = new DomainEventSubscriber(handle, typeof(TEvent));
		Subscribe(subscriber);
	}

	public void Subscribe(IDomainEventSubscriber domainEventSubscriber)
	{
		var eventType = domainEventSubscriber.SubscribedToEventType();			
		if (_subscribers.ContainsKey(eventType))
		{
			_subscribers[eventType].Add(domainEventSubscriber);
		}
		else
		{
			_subscribers[eventType] = new List<IDomainEventSubscriber>();
			_subscribers[eventType].Add(domainEventSubscriber);
		}
	}		
}
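The publisher relies on two types that are not listed in this article: the IDomainEventSubscriber interface and the delegate-based DomainEventSubscriber. Inferred purely from how the publisher calls them, they might look roughly like the sketch below. This is an assumption, not the code from the repository; the empty DomainEvent and the SampleEvent class are stand-ins added only so the sketch compiles on its own.

```csharp
using System;

// Stand-in for the article's DomainEvent base class (assumption, for compilation only).
public abstract class DomainEvent { }

// Hypothetical event used only to exercise the sketch.
public class SampleEvent : DomainEvent { }

// Assumed shape, inferred from the calls made by DomainEventPublisher.
public interface IDomainEventSubscriber
{
    void Handle(DomainEvent domainEvent);
    Type SubscribedToEventType();
}

// Assumed delegate-based subscriber, matching
// new DomainEventSubscriber(handle, typeof(TEvent)) in the publisher.
public class DomainEventSubscriber : IDomainEventSubscriber
{
    private readonly Action<DomainEvent> _handle;
    private readonly Type _eventType;

    public DomainEventSubscriber(Action<DomainEvent> handle, Type eventType)
    {
        if (handle == null) throw new ArgumentNullException("handle");
        if (eventType == null) throw new ArgumentNullException("eventType");

        _handle = handle;
        _eventType = eventType;
    }

    // Forwards the event to the delegate supplied at subscription time.
    public void Handle(DomainEvent domainEvent)
    {
        _handle(domainEvent);
    }

    // The publisher compares this against the published event's type.
    public Type SubscribedToEventType()
    {
        return _eventType;
    }
}
```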

The first thing to notice is that the Publisher keeps a list of Subscribers grouped by the event type they are ‘listening’ to. When an event is published, we get all the subscribers registered for that event type, or for all events, and execute their respective handlers. A Subscriber can be registered either for a specific event, like ProductDeleted, or for all events. The EventStoringSubscriber is the latter, as we want all events to be stored for future forwarding:

public class EventStoringSubscriber : IEventStoringSubscriber
{		
	private readonly IStoredEventRepository _storedEventRepository;
	private readonly IEventSerializer _eventSerializer;

	public EventStoringSubscriber(IStoredEventRepository storedEventRepository, IEventSerializer eventSerializer)
	{			
		_storedEventRepository = storedEventRepository;
		_eventSerializer = eventSerializer;
	}

	public void Handle(DomainEvent domainEvent)
	{
		var serializedBody = _eventSerializer.Serialize(domainEvent);
		var storedEvent = new StoredEvent(domainEvent.GetType().ToString(), domainEvent.OcurrendOn, serializedBody);
		_storedEventRepository.Add(storedEvent);
	}

	public Type SubscribedToEventType()
	{
		return typeof(DomainEvent);
	}
}

The EventStoringSubscriber serializes the event (JSON here), creates a StoredEvent and then persists it. Again, it’s important to mention that the StoredEventRepository operations run under the same transaction scope as the rest of the repositories involved when deleting a Product. The StoredEvent is a Domain Model class which provides a common interface for storing all events in a uniform manner:

public class StoredEvent : Entity
{
	public string TypeName { get; private set; }
	public DateTime OccurredOn { get; private set; }
	public string SerializedBody { get; private set; }
	public bool IsForwarded { get; private set; }

	protected StoredEvent() {}

	public StoredEvent(string typeName, DateTime occurredOn, string serializedBody)
	{
		TypeName = typeName;
		OccurredOn = occurredOn;
		SerializedBody = serializedBody;
	}

	public void MarkAsForwarded()
	{
		IsForwarded = true;
	}
}

The last piece of the publishing part is how and when the subscribers are registered. First, it’s important to note that the DomainEventPublisher is a ‘singleton per request’, meaning we’ll use the same instance for the duration of each request. It’s the same approach used for the Unit of Work. This is the implementation using Unity IoC:

if (HttpContext.Current != null)
{
	container.RegisterType<IDatabaseContext, DatabaseContext>(new PerHttpRequestLifetimeManager());
	container.RegisterType<DomainEventPublisher, DomainEventPublisher>(new PerHttpRequestLifetimeManager());
}
else
{				
	container.RegisterType<IDatabaseContext, DatabaseContext>(new ContainerControlledLifetimeManager());
	container.RegisterType<DomainEventPublisher, DomainEventPublisher>(new ContainerControlledLifetimeManager());
}

As we’ll get a new publisher for each request, we will also have to register the subscribers at the beginning of each request:

protected void Application_BeginRequest()
{
	UnityConfig.RegisterEventsSubscribers();
}

The code above is part of Global.asax and it calls the following method which is part of the IoC configuration:

public static void RegisterEventsSubscribers()
{
	_container.Resolve<DomainEvents>();

	var eventStoringSubscriber = _container.Resolve<IEventStoringSubscriber>();			
	DomainEvents.Publisher.Subscribe(eventStoringSubscriber);
}

The DomainEvents is just a wrapper exposing the publisher via a static property. It’s implemented this way so it can be easily used inside entities without explicitly coupling them with an IDomainEventsPublisher interface:

public class DomainEvents
{
	private static DomainEventPublisher _publisher;

	public static DomainEventPublisher Publisher 
	{ 
		get 
		{ 
			if (_publisher == null)
			{
				throw new Exception("Publisher is not initialized");
			}
			return _publisher; 
		} 
	}

	public DomainEvents(DomainEventPublisher publisher)
	{
		_publisher = publisher;
	}
}

This covers the publishing part. At this point the ProductDeleted event is stored in the StoredEvents SQL table.

Next, the ‘Event Forwarder’ background worker needs to push it to an Azure Service Bus queue. The EventForwarderService class does just that:

public class EventForwarderService : IEventForwarderService
{		
	private readonly IDictionary<string, string> _eventTypeQueueMapping = new Dictionary<string, string>()
	{
		{ typeof(ProductDeleted).ToString(), "ProductDeletedQueue" },
		{ typeof(PlaceholderEvent).ToString(), "PlaceholderQueue" }
	};

	private readonly IUnitOfWork _unitOfWork;
	private readonly IStoredEventRepository _storedEventRepository;
	private readonly IMessagingService _messagingService;

	public EventForwarderService(IUnitOfWork unitOfWork, IStoredEventRepository storedEventRepository, IMessagingService messagingService)
	{
		_unitOfWork = unitOfWork;
		_storedEventRepository = storedEventRepository;
		_messagingService = messagingService;
	}

	public void ForwardEvents()
	{
		using(_unitOfWork)
		{
			var newEvents = _storedEventRepository.GetNewEvents().ToList();
			
			foreach(StoredEvent storedEvent in newEvents)
			{
				var queueName = this.GetAssociatedQueueName(storedEvent.TypeName);
				_messagingService.Send(storedEvent, queueName);

				storedEvent.MarkAsForwarded();
				_storedEventRepository.Update(storedEvent);

				_unitOfWork.Commit();
			}				
		}
	}

	private string GetAssociatedQueueName(string eventType)
	{
		var queueName = string.Empty;

		try
		{
			queueName = _eventTypeQueueMapping[eventType];
		}
		catch(KeyNotFoundException ex)
		{
			throw new ArgumentOutOfRangeException(string.Format("No mapping defined for event: {0}", eventType), ex);
		}

		return queueName;
	}
}

 

Because the EventForwarderService class encapsulates all the ‘forwarding’ logic, it can be easily reused regardless of what type of background service the ‘Event Forwarder’ is. We could effortlessly switch between an Azure WebJob, a Worker Role or a Windows Service. The IMessagingService interface is implemented by the AzureServiceBusQueueMessagingService class:

public class AzureServiceBusQueueMessagingService : IMessagingService
{		
	private readonly NamespaceManager _namespaceManager;
	private readonly string _connectionString;

	private readonly IEventSerializer _eventSerializer;

	public AzureServiceBusQueueMessagingService(IEventSerializer eventSerializer)
	{
		_connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
		_namespaceManager = NamespaceManager.CreateFromConnectionString(_connectionString);			
		_eventSerializer = eventSerializer;
	}

	private void InitQueue(string queueName)
	{
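		if(string.IsNullOrWhiteSpace(queueName))
		{
			// ArgumentException takes (message, paramName); the single-string
			// ArgumentNullException constructor expects a parameter name instead
			throw new ArgumentException("Queue name is empty.", "queueName");
		}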

		//configure queue settings
		var queueDescription = new QueueDescription(queueName);
		queueDescription.RequiresDuplicateDetection = true;
		queueDescription.DuplicateDetectionHistoryTimeWindow = TimeSpan.FromDays(7);
		queueDescription.LockDuration = TimeSpan.FromMinutes(5);
		queueDescription.EnableDeadLetteringOnMessageExpiration = true;

		//create queue if not exists
		if (!_namespaceManager.QueueExists(queueName))
		{
			_namespaceManager.CreateQueue(queueDescription);
		}
	}

	public void Send(StoredEvent storedEvent, string queueName)
	{
		this.InitQueue(queueName);

		var client = QueueClient.CreateFromConnectionString(_connectionString, queueName);			
		var brokeredMessage = this.CreateBrokeredMessage(storedEvent);

		client.Send(brokeredMessage);

		client.Close();
	}

	private BrokeredMessage CreateBrokeredMessage(StoredEvent storedEvent)
	{
		var brokeredMessage = new BrokeredMessage(storedEvent.SerializedBody);
		brokeredMessage.MessageId = storedEvent.Id.ToString();

		return brokeredMessage;
	}

	...
}

 

One important thing to notice in the InitQueue method is that we’re setting the RequiresDuplicateDetection flag to true and also the DuplicateDetectionHistoryTimeWindow. The former enables duplicate detection, while the latter sets how long Azure Service Bus will remember the MessageId of messages it has received. I’m setting it to 7 days here, which is the maximum, but this time window should be carefully considered as it requires extra queue storage space.
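To make the interaction between the MessageId and the history window more concrete, here is a toy in-memory model of duplicate detection. It is not the Service Bus API and only approximates its behaviour; DuplicateDetectingQueue and its members are hypothetical names, and the current time is passed in explicitly to keep the example deterministic.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: approximates duplicate detection by message id within a time window.
public class DuplicateDetectingQueue
{
    private readonly TimeSpan _historyWindow;

    // Message id -> time the id was first accepted.
    private readonly Dictionary<string, DateTime> _seenMessageIds = new Dictionary<string, DateTime>();
    private readonly Queue<string> _messages = new Queue<string>();

    public DuplicateDetectingQueue(TimeSpan historyWindow)
    {
        _historyWindow = historyWindow;
    }

    public int Count { get { return _messages.Count; } }

    // Returns true if the message was enqueued, false if it was dropped as a duplicate.
    public bool Send(string messageId, string body, DateTime now)
    {
        DateTime firstSeen;
        if (_seenMessageIds.TryGetValue(messageId, out firstSeen) && now - firstSeen < _historyWindow)
        {
            return false;
        }

        // Either a new id, or the previous occurrence has aged out of the window.
        _seenMessageIds[messageId] = now;
        _messages.Enqueue(body);
        return true;
    }
}
```

In this model a message re-sent with the same id one day after the original is dropped, while one re-sent after the window has elapsed is accepted again. That is why the window should be at least as long as the longest delay you expect between a forward and its retry.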

The final piece of the puzzle is the ‘Event Consumer’. Just like the ‘Event Forwarder’, this is a background process as well, but the difference is that we can have multiple consumers (one per event type). The ProductDeletedEventConsumer will only receive ProductDeleted events:

public class ProductDeletedEventConsumer : IProductDeletedEventConsumer
{
	private readonly string QUEUE_NAME = "ProductDeletedQueue";
	private readonly IMessagingService _messagingService;
	private readonly IUnitOfWork _unitOfWork;
	private readonly IProductReviewRepository _productReviewRepository;

	public ProductDeletedEventConsumer(IMessagingService messagingService, IUnitOfWork unitOfWork, IProductReviewRepository productReviewRepository)
	{
		_messagingService = messagingService;
		_unitOfWork = unitOfWork;
		_productReviewRepository = productReviewRepository;
	}

	public void ProcessNextEvent()
	{
		_messagingService.ProcessNextEvent<ProductDeleted>(pd => Process(pd), QUEUE_NAME);
	}

	private void Process(ProductDeleted productDeleted)
	{
		using(_unitOfWork)
		{
			var productReviews = _productReviewRepository.GetByProductId(productDeleted.ProductId);
			foreach (var productReview in productReviews)
			{
				productReview.Delete();
				_productReviewRepository.Update(productReview);
			}

			_unitOfWork.Commit();
		}
	}
}

Each time a new ProductDeleted occurs, the ProductDeletedEventConsumer will process it by deleting all the associated reviews. The ProcessNextEvent method of AzureServiceBusQueueMessagingService is shown below:

...continued

public void ProcessNextEvent<TEvent>(Action<TEvent> handle, string queueName) where TEvent : DomainEvent
{
	this.InitQueue(queueName);

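	var client = QueueClient.CreateFromConnectionString(_connectionString, queueName);

	try
	{
		// Wait up to 5 seconds for a message; Receive returns null on timeout
		var brokeredMessage = client.Receive(TimeSpan.FromSeconds(5));

		if (brokeredMessage != null)
		{
			Process<TEvent>(handle, brokeredMessage);
		}
	}
	finally
	{
		client.Close(); // release the client, as the Send method does
	}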
}

private void Process<TEvent>(Action<TEvent> handle, BrokeredMessage brokeredMessage) where TEvent : DomainEvent
{
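	var jsonEvent = brokeredMessage.GetBody<string>();
	var domainEvent = _eventSerializer.Deserialize<TEvent>(jsonEvent);

	// If the handler throws, the message is not completed; it becomes
	// visible again once its lock expires and will be redelivered
	handle(domainEvent);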

	try
	{				
		brokeredMessage.Complete();
	}
	catch (Exception ex)
	{
		//do something else, e.g log
		brokeredMessage.DeadLetter();//move to dead letter queue to inspect later	
	}
}
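Both background processes need some kind of host loop that calls ForwardEvents or ProcessNextEvent repeatedly. The hosting code is not shown in this article, so the following is only a minimal polling sketch under that assumption; PollingWorker and its parameters are hypothetical names, and a real Azure WebJob or Windows Service would wrap something similar in its own start and stop hooks.

```csharp
using System;
using System.Threading;

// Hypothetical host loop: invokes a unit of work repeatedly until cancelled.
public static class PollingWorker
{
    // Returns the number of passes that were attempted.
    public static int Run(Action work, TimeSpan delay, CancellationToken token, Action<Exception> onError = null)
    {
        if (work == null) throw new ArgumentNullException("work");

        var iterations = 0;
        while (!token.IsCancellationRequested)
        {
            try
            {
                work();
            }
            catch (Exception ex)
            {
                // Keep the worker alive; a failed pass is retried on the next iteration.
                if (onError != null) onError(ex);
            }

            iterations++;

            // Sleep between passes, waking up early if cancellation is requested.
            token.WaitHandle.WaitOne(delay);
        }

        return iterations;
    }
}
```

A forwarder host could then call something like PollingWorker.Run(() => forwarder.ForwardEvents(), TimeSpan.FromSeconds(5), cts.Token). Since ForwardEvents and the consumer’s Process method wrap their unit of work in a using block, you would probably want to resolve a fresh service instance for each pass rather than reuse one.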

 

Running it

  1. Go to the Azure Portal and create a new Azure Service Bus namespace. You can create a free account if you don’t have one already.
  2. Download the solution from GitHub: https://github.com/florindpreda/EventualConsistency.DomainEvents.AzureServiceBus
  3. Update the service bus connection string in App.config for EvCoSample.EventForwarderWorker and EvCoSample.ProductDeletedConsumerWorker.
  4. Run the web project EvCoSample.API (this will create and seed a database on LocalDB).
  5. Run both background processes, EvCoSample.EventForwarderWorker and EvCoSample.ProductDeletedConsumerWorker.
  6. Send an HTTP DELETE request to the ProductController.Delete Web API method, passing the ProductId to be deleted.
  7. Check the Products, ProductReviews and StoredEvents SQL tables to see if the ProductDeleted event has been successfully processed.

 

Conclusion

To summarize, we’ve seen that:

  • ‘Eventual Consistency’ is a design approach that can improve scalability and performance of an application by deferring execution of certain operations to a later time.
  • Domain Events are tactical elements in DDD and are facilitators of ‘Eventual Consistency’ acting as triggers and Domain information containers for event consumers.
  • A common pattern to achieve ‘Eventual Consistency’ is to use a messaging system like Azure Service Bus to store serialized Domain Events.
  • When the Model Store and the Messaging System don’t share the same Transaction Scope, the events should be initially saved in the model store and then forwarded to the messaging system. This ensures Domain Events are not lost when there is a fault.
  • To avoid processing the same Domain Event more than once, enable de-duplication on the messaging system if it’s supported and attempt to design idempotent domain events.

2 Comments

  • Reynaldo Zabala

    11th August 2016 at 10:29 pm

    What about clients who won’t use the cloud, what would you recommend? We’ve looked at NServiceBus and a few others in the past. Your thoughts?

    • Florin Preda

      1st July 2017 at 2:15 pm

      From personal experience I would recommend using RabbitMQ. It’s a mature and highly scalable service bus.
