What is Event Sourcing and how does it relate to DDD?
Quick Answer
Event Sourcing persists state as an append-only sequence of domain events rather than just the current state; the current state is rebuilt by replaying events. It pairs well with DDD and CQRS, giving a complete audit trail, temporal queries, and natural event publication, but adds complexity around schema/versioning, replay, and snapshots. Often the write side is event-sourced while read models are projected for queries.
Detailed Answer
Event Sourcing is a pattern in which an application's state is derived from the sequence of events that have occurred, rather than stored only as the current state. It is closely related to Domain-Driven Design: the stored events are the domain events raised by aggregates, so the event store captures business facts in the domain's own language and maintains a complete audit trail.
Core Concepts:
- Events as Source of Truth: The event store is the primary source of truth
- Event Store: Persistent storage for all domain events
- Aggregate Reconstruction: Rebuild aggregate state by replaying events
- Event Stream: Chronological sequence of events for an aggregate
- Snapshots: Periodically saved aggregate state, so reconstruction only replays the events recorded after the snapshot
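The replay idea behind these concepts can be sketched in a few lines. This is a minimal illustration rather than the implementation used in the rest of this answer; `CartEvent`, `ItemAdded`, `ItemRemoved`, and `CartReplay` are hypothetical names chosen for brevity.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified events used only for this illustration.
public abstract record CartEvent;
public sealed record ItemAdded(string Sku, int Quantity) : CartEvent;
public sealed record ItemRemoved(string Sku) : CartEvent;

public static class CartReplay
{
    // Current state is a left fold over the event stream, starting from an empty state.
    public static IReadOnlyDictionary<string, int> Replay(IEnumerable<CartEvent> stream)
    {
        var quantities = new Dictionary<string, int>();
        foreach (var e in stream)
        {
            switch (e)
            {
                case ItemAdded added:
                    quantities.TryGetValue(added.Sku, out var current); // current is 0 if the SKU is absent
                    quantities[added.Sku] = current + added.Quantity;
                    break;
                case ItemRemoved removed:
                    quantities.Remove(removed.Sku);
                    break;
            }
        }
        return quantities;
    }
}
```

Nothing here stores the quantities themselves; they exist only as the result of replaying the stream, which is the sense in which the events are the source of truth.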
Basic Event Sourcing Implementation:
// Base event class
public abstract class DomainEvent
{
public Guid Id { get; }
public Guid AggregateId { get; }
public int Version { get; }
public DateTime OccurredOn { get; }
public string EventType { get; }
protected DomainEvent(Guid aggregateId, int version)
{
Id = Guid.NewGuid();
AggregateId = aggregateId;
Version = version;
OccurredOn = DateTime.UtcNow;
EventType = GetType().Name;
}
}
// Order events
public class OrderCreatedEvent : DomainEvent
{
public CustomerId CustomerId { get; }
public DateTime CreatedAt { get; }
public OrderCreatedEvent(Guid aggregateId, int version, CustomerId customerId, DateTime createdAt)
: base(aggregateId, version)
{
CustomerId = customerId;
CreatedAt = createdAt;
}
}
public class OrderItemAddedEvent : DomainEvent
{
public ProductId ProductId { get; }
public Quantity Quantity { get; }
public Money UnitPrice { get; }
public OrderItemAddedEvent(Guid aggregateId, int version, ProductId productId, Quantity quantity, Money unitPrice)
: base(aggregateId, version)
{
ProductId = productId;
Quantity = quantity;
UnitPrice = unitPrice;
}
}
public class OrderConfirmedEvent : DomainEvent
{
public Money Total { get; }
public DateTime ConfirmedAt { get; }
public OrderConfirmedEvent(Guid aggregateId, int version, Money total, DateTime confirmedAt)
: base(aggregateId, version)
{
Total = total;
ConfirmedAt = confirmedAt;
}
}
public class OrderCancelledEvent : DomainEvent
{
public string Reason { get; }
public DateTime CancelledAt { get; }
public OrderCancelledEvent(Guid aggregateId, int version, string reason, DateTime cancelledAt)
: base(aggregateId, version)
{
Reason = reason;
CancelledAt = cancelledAt;
}
}
Event-Sourced Aggregate:
public class Order : AggregateRoot<OrderId>
{
public OrderId Id { get; private set; }
public CustomerId CustomerId { get; private set; }
public OrderStatus Status { get; private set; }
public Money Total { get; private set; }
public DateTime CreatedAt { get; private set; }
private readonly List<OrderItem> _items = new();
public IReadOnlyList<OrderItem> Items => _items.AsReadOnly();
// Constructor for creating new orders
public Order(OrderId id, CustomerId customerId)
{
Id = id;
CustomerId = customerId;
Status = OrderStatus.Draft;
Total = new Money(0, "USD");
CreatedAt = DateTime.UtcNow;
// Raise event
RaiseEvent(new OrderCreatedEvent(id.Value, 1, customerId, CreatedAt));
}
// Constructor for rebuilding from events
private Order()
{
// Used by event sourcing
}
public void AddItem(ProductId productId, Quantity quantity, Money unitPrice)
{
if (Status != OrderStatus.Draft)
throw new InvalidOperationException("Items can only be added to a draft order");
var item = new OrderItem(productId, quantity, unitPrice);
_items.Add(item);
RecalculateTotal();
// Raise event
RaiseEvent(new OrderItemAddedEvent(Id.Value, Version + 1, productId, quantity, unitPrice));
}
public void Confirm()
{
if (Status != OrderStatus.Draft)
throw new InvalidOperationException("Order is not in draft status");
if (_items.Count == 0)
throw new InvalidOperationException("Cannot confirm an empty order");
Status = OrderStatus.Confirmed;
// Raise event
RaiseEvent(new OrderConfirmedEvent(Id.Value, Version + 1, Total, DateTime.UtcNow));
}
public void Cancel(string reason)
{
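if (Status == OrderStatus.Shipped)
throw new InvalidOperationException("Cannot cancel a shipped order");
if (Status == OrderStatus.Cancelled)
throw new InvalidOperationException("Order is already cancelled");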
Status = OrderStatus.Cancelled;
// Raise event
RaiseEvent(new OrderCancelledEvent(Id.Value, Version + 1, reason, DateTime.UtcNow));
}
private void RecalculateTotal()
{
Total = _items.Aggregate(new Money(0, "USD"), (sum, item) => sum + item.Total);
}
// Event application methods
private void Apply(OrderCreatedEvent @event)
{
Id = new OrderId(@event.AggregateId);
CustomerId = @event.CustomerId;
Status = OrderStatus.Draft;
Total = new Money(0, "USD");
CreatedAt = @event.CreatedAt;
}
private void Apply(OrderItemAddedEvent @event)
{
var item = new OrderItem(@event.ProductId, @event.Quantity, @event.UnitPrice);
_items.Add(item);
RecalculateTotal();
}
private void Apply(OrderConfirmedEvent @event)
{
Status = OrderStatus.Confirmed;
}
private void Apply(OrderCancelledEvent @event)
{
Status = OrderStatus.Cancelled;
}
}
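The aggregate above relies on an `AggregateRoot<TId>` base class that is not shown. The following is one possible shape for it, inferred only from how it is used in this answer (`RaiseEvent`, `ApplyEvent`, `Version`, `GetUncommittedEvents`, `MarkEventsAsCommitted`); treat it as an assumption, not the original base class. `Counter` and `CounterIncremented` are hypothetical and exist only to exercise the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Minimal stand-in for the DomainEvent base class shown earlier.
public abstract class DomainEvent
{
    public Guid AggregateId { get; }
    public int Version { get; }

    protected DomainEvent(Guid aggregateId, int version)
    {
        AggregateId = aggregateId;
        Version = version;
    }
}

// Assumed shape of the base class; TId only mirrors the AggregateRoot<OrderId> usage above.
public abstract class AggregateRoot<TId>
{
    private readonly List<DomainEvent> _uncommittedEvents = new();

    // Version of the last event raised or replayed; 0 for a new aggregate.
    public int Version { get; protected set; }

    public IReadOnlyList<DomainEvent> GetUncommittedEvents() => _uncommittedEvents.ToArray();

    public void MarkEventsAsCommitted() => _uncommittedEvents.Clear();

    // Called by command methods that have already changed state themselves:
    // it only records the event and advances the version.
    protected void RaiseEvent(DomainEvent @event)
    {
        _uncommittedEvents.Add(@event);
        Version = @event.Version;
    }

    // Called during replay: dispatches to the matching Apply(TEvent) method via reflection.
    public void ApplyEvent(DomainEvent @event)
    {
        var apply = GetType().GetMethod(
            "Apply",
            BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
            null,
            new[] { @event.GetType() },
            null);
        if (apply == null)
            throw new InvalidOperationException($"No Apply method for {@event.GetType().Name}.");
        apply.Invoke(this, new object[] { @event });
        Version = @event.Version;
    }
}

// Hypothetical event and aggregate used only to exercise the base class.
public sealed class CounterIncremented : DomainEvent
{
    public CounterIncremented(Guid aggregateId, int version) : base(aggregateId, version) { }
}

public sealed class Counter : AggregateRoot<Guid>
{
    public Guid Id { get; } = Guid.NewGuid();
    public int Value { get; private set; }

    public void Increment()
    {
        Value++;
        RaiseEvent(new CounterIncremented(Id, Version + 1));
    }

    private void Apply(CounterIncremented @event) => Value++;
}
```

With this shape, state changes are written twice: once in the command method and once in the `Apply` method. A common alternative is to have `RaiseEvent` call the `Apply` method itself, so that command methods only validate and raise events; the `Order` class above would then need its direct state changes removed.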
Event Store Interface and Implementation:
public interface IEventStore
{
Task SaveEventsAsync(Guid aggregateId, IEnumerable<DomainEvent> events, int expectedVersion);
Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId);
Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId, int fromVersion);
Task<IEnumerable<DomainEvent>> GetEventsAsync(DateTime from, DateTime to);
}
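public class EventStore : IEventStore
{
private readonly EventStoreDbContext _context;
private readonly IEventSerializer _eventSerializer;
public EventStore(EventStoreDbContext context, IEventSerializer eventSerializer)
{
_context = context;
_eventSerializer = eventSerializer;
}
public async Task SaveEventsAsync(Guid aggregateId, IEnumerable<DomainEvent> events, int expectedVersion)
{
// Optimistic concurrency check: the stream must still be at the version the caller loaded.
// A unique index on (AggregateId, Version) is still needed to close the race between this check and the insert.
var currentVersion = await _context.Events
.Where(e => e.AggregateId == aggregateId)
.MaxAsync(e => (int?)e.Version) ?? 0;
if (currentVersion != expectedVersion)
throw new InvalidOperationException(
$"Concurrency conflict on aggregate {aggregateId}: expected version {expectedVersion}, found {currentVersion}.");
var eventEntities = events.Select((domainEvent, index) => new EventEntity
{
Id = domainEvent.Id,
AggregateId = aggregateId,
EventType = domainEvent.EventType,
EventData = _eventSerializer.Serialize(domainEvent),
Version = expectedVersion + index + 1,
OccurredOn = domainEvent.OccurredOn
});
_context.Events.AddRange(eventEntities);
await _context.SaveChangesAsync();
}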
public async Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId)
{
var eventEntities = await _context.Events
.Where(e => e.AggregateId == aggregateId)
.OrderBy(e => e.Version)
.ToListAsync();
return eventEntities.Select(DeserializeEvent);
}
public async Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId, int fromVersion)
{
var eventEntities = await _context.Events
.Where(e => e.AggregateId == aggregateId && e.Version > fromVersion)
.OrderBy(e => e.Version)
.ToListAsync();
return eventEntities.Select(DeserializeEvent);
}
public async Task<IEnumerable<DomainEvent>> GetEventsAsync(DateTime from, DateTime to)
{
var eventEntities = await _context.Events
.Where(e => e.OccurredOn >= from && e.OccurredOn <= to)
.OrderBy(e => e.OccurredOn)
.ToListAsync();
return eventEntities.Select(DeserializeEvent);
}
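// EventType holds the short type name, which Type.GetType cannot resolve for namespaced types, so use a lookup
// (assumes all event classes live in the same assembly as DomainEvent and have unique names).
private static readonly Dictionary<string, Type> EventTypes = typeof(DomainEvent).Assembly.GetTypes()
.Where(t => !t.IsAbstract && typeof(DomainEvent).IsAssignableFrom(t))
.ToDictionary(t => t.Name);
private DomainEvent DeserializeEvent(EventEntity eventEntity)
{
if (!EventTypes.TryGetValue(eventEntity.EventType, out var eventType))
throw new InvalidOperationException($"Unknown event type '{eventEntity.EventType}'.");
return (DomainEvent)_eventSerializer.Deserialize(eventEntity.EventData, eventType);
}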
}
public class EventEntity
{
public Guid Id { get; set; }
public Guid AggregateId { get; set; }
public string EventType { get; set; }
public string EventData { get; set; }
public int Version { get; set; }
public DateTime OccurredOn { get; set; }
}
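The role of the `expectedVersion` argument is easier to see without a database. The following in-memory sketch is an illustration under assumptions: `InMemoryEventStore`, `StoredEvent`, and `ConcurrencyException` are hypothetical names and not part of the store above. It shows an append being rejected when another writer has already extended the stream.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical storage record; event payloads are kept as opaque strings here.
public sealed record StoredEvent(Guid AggregateId, int Version, string EventType, string EventData);

public sealed class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

public sealed class InMemoryEventStore
{
    private readonly Dictionary<Guid, List<StoredEvent>> _streams = new();
    private readonly object _gate = new();

    public void Append(Guid aggregateId, int expectedVersion, IEnumerable<(string Type, string Data)> events)
    {
        lock (_gate)
        {
            // Versions are assigned 1..N without gaps, so the stream length is the current version.
            var currentVersion = _streams.TryGetValue(aggregateId, out var stream) ? stream.Count : 0;
            if (currentVersion != expectedVersion)
                throw new ConcurrencyException(
                    $"Expected version {expectedVersion} but stream is at {currentVersion}.");

            if (stream == null)
            {
                stream = new List<StoredEvent>();
                _streams[aggregateId] = stream;
            }

            var version = expectedVersion;
            foreach (var (type, data) in events)
                stream.Add(new StoredEvent(aggregateId, ++version, type, data));
        }
    }

    // Returns the events with a version greater than afterVersion, in stream order.
    public IReadOnlyList<StoredEvent> Read(Guid aggregateId, int afterVersion = 0)
    {
        lock (_gate)
        {
            return _streams.TryGetValue(aggregateId, out var stream)
                ? stream.Where(e => e.Version > afterVersion).ToList()
                : new List<StoredEvent>();
        }
    }
}
```

A caller that loaded the stream at version 2 passes 2 as `expectedVersion`; if a concurrent writer appended in the meantime, the call fails and the caller can reload and retry the command.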
Event-Sourced Repository:
public interface IEventSourcedRepository<TAggregate> where TAggregate : AggregateRoot
{
Task<TAggregate> GetByIdAsync(Guid id);
Task SaveAsync(TAggregate aggregate);
}
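public class EventSourcedOrderRepository : IEventSourcedRepository<Order>
{
private readonly IEventStore _eventStore;
private readonly ISnapshotStore _snapshotStore;
public EventSourcedOrderRepository(IEventStore eventStore, ISnapshotStore snapshotStore)
{
_eventStore = eventStore;
_snapshotStore = snapshotStore;
}
public async Task<Order> GetByIdAsync(Guid id)
{
// Load the latest snapshot, if any; it determines the version to replay from
var snapshot = await _snapshotStore.GetSnapshotAsync<OrderSnapshot>(id);
var fromVersion = snapshot?.Version ?? 0;
// Load only the events recorded after the snapshot version
var events = (await _eventStore.GetEventsAsync(id, fromVersion)).ToList();
if (snapshot == null && events.Count == 0)
return null; // No snapshot and no events: the aggregate does not exist
// The parameterless constructor is private, so create the instance via reflection
var order = (Order)Activator.CreateInstance(typeof(Order), nonPublic: true);
// Apply snapshot if available
if (snapshot != null)
{
order.RestoreFromSnapshot(snapshot);
}
// Apply the remaining events in version order
foreach (var @event in events)
{
order.ApplyEvent(@event);
}
return order;
}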
public async Task SaveAsync(Order aggregate)
{
var events = aggregate.GetUncommittedEvents();
var expectedVersion = aggregate.Version - events.Count();
await _eventStore.SaveEventsAsync(aggregate.Id.Value, events, expectedVersion);
// Create snapshot if needed
if (ShouldCreateSnapshot(aggregate))
{
var snapshot = aggregate.CreateSnapshot();
await _snapshotStore.SaveSnapshotAsync(aggregate.Id.Value, snapshot);
}
aggregate.MarkEventsAsCommitted();
}
private bool ShouldCreateSnapshot(Order aggregate)
{
// Create snapshot every 100 events
return aggregate.Version % 100 == 0;
}
}
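One caveat with the `Version % 100 == 0` check above: when several events are saved in one call, the version can jump from, say, 98 to 101 and never land exactly on a multiple of 100. A possible alternative, sketched here with the hypothetical names `SnapshotPolicy` and `ShouldSnapshot`, is to test whether the save crossed an interval boundary.

```csharp
using System;

public static class SnapshotPolicy
{
    // True when the save moved the aggregate past a multiple of `interval`,
    // for example from version 98 to 101 with the default interval of 100.
    public static bool ShouldSnapshot(int versionBeforeSave, int versionAfterSave, int interval = 100)
    {
        if (interval <= 0) throw new ArgumentOutOfRangeException(nameof(interval));
        return versionAfterSave / interval > versionBeforeSave / interval;
    }
}
```

In `SaveAsync`, the `expectedVersion` value computed before saving could serve as `versionBeforeSave` and `aggregate.Version` as `versionAfterSave`.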
Snapshot Implementation:
public interface ISnapshotStore
{
Task<TSnapshot> GetSnapshotAsync<TSnapshot>(Guid aggregateId) where TSnapshot : class;
Task SaveSnapshotAsync<TSnapshot>(Guid aggregateId, TSnapshot snapshot) where TSnapshot : class;
}
public class OrderSnapshot
{
public Guid AggregateId { get; set; }
public int Version { get; set; }
public CustomerId CustomerId { get; set; }
public OrderStatus Status { get; set; }
public Money Total { get; set; }
public DateTime CreatedAt { get; set; }
public List<OrderItem> Items { get; set; }
public DateTime SnapshotDate { get; set; }
}
public class Order : AggregateRoot<OrderId>
{
// ... existing code ...
public OrderSnapshot CreateSnapshot()
{
return new OrderSnapshot
{
AggregateId = Id.Value,
Version = Version,
CustomerId = CustomerId,
Status = Status,
Total = Total,
CreatedAt = CreatedAt,
Items = _items.ToList(),
SnapshotDate = DateTime.UtcNow
};
}
public void RestoreFromSnapshot(OrderSnapshot snapshot)
{
Id = new OrderId(snapshot.AggregateId);
CustomerId = snapshot.CustomerId;
Status = snapshot.Status;
Total = snapshot.Total;
CreatedAt = snapshot.CreatedAt;
_items.Clear();
_items.AddRange(snapshot.Items);
Version = snapshot.Version;
}
}
Event Sourcing with CQRS:
// Command side - uses event sourcing
public class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
{
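private readonly IEventSourcedRepository<Order> _orderRepository;
public CreateOrderCommandHandler(IEventSourcedRepository<Order> orderRepository)
{
_orderRepository = orderRepository;
}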
public async Task Handle(CreateOrderCommand command, CancellationToken cancellationToken = default)
{
var order = new Order(new OrderId(Guid.NewGuid()), command.CustomerId);
foreach (var item in command.Items)
{
order.AddItem(
new ProductId(item.ProductId),
new Quantity(item.Quantity),
new Money(item.UnitPrice, "USD")
);
}
await _orderRepository.SaveAsync(order);
}
}
// Query side - uses read models updated by events
public class OrderReadModelUpdater : IDomainEventHandler<OrderCreatedEvent>
{
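private readonly IOrderReadRepository _readRepository;
public OrderReadModelUpdater(IOrderReadRepository readRepository)
{
_readRepository = readRepository;
}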
public async Task Handle(OrderCreatedEvent domainEvent, CancellationToken cancellationToken = default)
{
await _readRepository.CreateOrderAsync(new OrderReadModel
{
Id = domainEvent.AggregateId,
CustomerId = domainEvent.CustomerId.Value,
Status = "Draft",
Total = 0,
CreatedAt = domainEvent.CreatedAt
});
}
}
public class OrderItemAddedEventHandler : IDomainEventHandler<OrderItemAddedEvent>
{
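private readonly IOrderReadRepository _readRepository;
public OrderItemAddedEventHandler(IOrderReadRepository readRepository)
{
_readRepository = readRepository;
}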
public async Task Handle(OrderItemAddedEvent domainEvent, CancellationToken cancellationToken = default)
{
await _readRepository.AddOrderItemAsync(domainEvent.AggregateId, new OrderItemReadModel
{
ProductId = domainEvent.ProductId.Value,
Quantity = domainEvent.Quantity.Value,
UnitPrice = domainEvent.UnitPrice.Amount
});
}
}
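Read-model handlers like the ones above may receive the same event more than once, for example after a retry. A minimal in-memory sketch of a projection that tolerates redelivery is shown below. It assumes each event carries its stream version; `OrderSummary`, `OrderSummaryProjection`, and the simplified event records are hypothetical and not part of the handlers above.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical, flattened versions of the events shown earlier.
public sealed record OrderCreated(Guid OrderId, int Version, Guid CustomerId);
public sealed record OrderItemAdded(Guid OrderId, int Version, int Quantity, decimal UnitPrice);

public sealed class OrderSummary
{
    public Guid Id { get; init; }
    public Guid CustomerId { get; init; }
    public string Status { get; set; } = "Draft";
    public decimal Total { get; set; }
    public int LastVersion { get; set; } // Version of the last event applied to this row
}

public sealed class OrderSummaryProjection
{
    private readonly Dictionary<Guid, OrderSummary> _rows = new();

    public OrderSummary? Find(Guid id) => _rows.TryGetValue(id, out var row) ? row : null;

    public void Handle(OrderCreated e)
    {
        if (_rows.ContainsKey(e.OrderId)) return; // Duplicate delivery: row already exists
        _rows[e.OrderId] = new OrderSummary { Id = e.OrderId, CustomerId = e.CustomerId, LastVersion = e.Version };
    }

    public void Handle(OrderItemAdded e)
    {
        // Skip events for unknown orders and events that were already applied.
        if (!_rows.TryGetValue(e.OrderId, out var row) || e.Version <= row.LastVersion) return;
        row.Total += e.Quantity * e.UnitPrice;
        row.LastVersion = e.Version;
    }
}
```

Because the projection is derived entirely from events, it can be discarded and rebuilt by replaying the event store from the beginning.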
Benefits of Event Sourcing:
- Complete Audit Trail: Every change is recorded as an event
- Temporal Queries: The state at any past point in time can be reconstructed by replaying events up to that moment
- Debugging: The event history shows what happened, in what order, and when
- Compliance: Helps meet regulatory requirements for audit trails
- Replay Capability: Can replay events to rebuild state
- Integration: Events can be used for integration between systems
- Analytics: Rich data for business intelligence and analytics
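As an illustration of the temporal-query benefit, the sketch below answers "what was the status at time T?" by replaying only the events that occurred up to T. `StatusChanged` and `TemporalQuery` are hypothetical names, and the status is reduced to a string to keep the example short.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event: each one records when it occurred and the status it led to.
public sealed record StatusChanged(DateTime OccurredOn, string NewStatus);

public static class TemporalQuery
{
    // Replays events in chronological order and stops at the first one after `asOf`.
    // Returns null if the aggregate did not exist yet at that time.
    public static string? StatusAsOf(IEnumerable<StatusChanged> stream, DateTime asOf)
    {
        string? status = null;
        foreach (var e in stream.OrderBy(e => e.OccurredOn))
        {
            if (e.OccurredOn > asOf) break;
            status = e.NewStatus;
        }
        return status;
    }
}
```

The same approach applies to a full aggregate: load the stream, stop applying events at the chosen timestamp or version, and inspect the resulting state.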
Challenges of Event Sourcing:
- Complexity: More complex than traditional CRUD
- Event Schema Evolution: Need to handle changes to event structure
- Performance: Rebuilding aggregates with long event streams can be slow without snapshots
- Storage: Keeping the full event history usually requires more storage than keeping only the current state
- Learning Curve: Team needs to understand the pattern
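One common way to handle event schema evolution is upcasting: old events stay in the store unchanged and are converted to the current shape when they are read. The sketch below assumes a hypothetical change in which a `Currency` field was added to an item-added event; the type names and the `"USD"` default are illustrative assumptions.

```csharp
using System;

// Hypothetical version 1 of the event, as originally stored.
public sealed record OrderItemAddedV1(Guid ProductId, int Quantity, decimal UnitPrice);

// Hypothetical version 2, which adds an explicit currency.
public sealed record OrderItemAddedV2(Guid ProductId, int Quantity, decimal UnitPrice, string Currency);

public static class EventUpcaster
{
    // Converts older event versions to the current one; current events pass through unchanged.
    public static object Upcast(object storedEvent) => storedEvent switch
    {
        // Assumption for this sketch: events written before the change were all priced in USD.
        OrderItemAddedV1 v1 => new OrderItemAddedV2(v1.ProductId, v1.Quantity, v1.UnitPrice, "USD"),
        _ => storedEvent
    };
}
```

With an upcaster applied right after deserialization, aggregates and projections only need `Apply` logic for the latest version of each event.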
When to Use Event Sourcing:
- Systems requiring complete audit trails
- Complex business domains with rich event models
- Systems where temporal queries are important
- Applications with compliance requirements
- Systems that need to replay events for analysis
- Integration scenarios where events are valuable
Best Practices:
- Event Design: Design events to be meaningful and immutable
- Snapshot Strategy: Use snapshots to optimize performance
- Event Versioning: Plan for event schema evolution
- Error Handling: Handle event processing failures gracefully
- Testing: Test event sourcing thoroughly with event replay
- Documentation: Document event schemas and processing logic
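The testing practice above is often expressed as "given past events, when a command runs, then expect new events". The following sketch shows that shape with a deliberately reduced aggregate; `OrderUnderTest` and the event records are hypothetical stand-ins for the `Order` class and events shown earlier, and no particular test framework is assumed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified events for illustration only.
public abstract record OrderEvent;
public sealed record OrderCreated : OrderEvent;
public sealed record ItemAdded(string Sku) : OrderEvent;
public sealed record OrderConfirmed : OrderEvent;

public sealed class OrderUnderTest
{
    private bool _isDraft;
    private int _itemCount;

    // "Given": rebuild state by replaying past events.
    public static OrderUnderTest FromHistory(IEnumerable<OrderEvent> history)
    {
        var order = new OrderUnderTest();
        foreach (var e in history) order.Apply(e);
        return order;
    }

    // "When": run a command and return the events it produced.
    public IReadOnlyList<OrderEvent> Confirm()
    {
        if (!_isDraft) throw new InvalidOperationException("Order is not in draft status");
        if (_itemCount == 0) throw new InvalidOperationException("Cannot confirm an empty order");
        var confirmed = new OrderConfirmed();
        Apply(confirmed);
        return new OrderEvent[] { confirmed };
    }

    private void Apply(OrderEvent e)
    {
        switch (e)
        {
            case OrderCreated _: _isDraft = true; break;
            case ItemAdded _: _itemCount++; break;
            case OrderConfirmed _: _isDraft = false; break;
        }
    }
}
```

A test then builds the aggregate from a history such as `OrderCreated` followed by `ItemAdded`, calls `Confirm()`, and asserts that exactly one `OrderConfirmed` event comes back ("then"). Rejected commands are tested the same way by asserting on the exception.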
Event Sourcing is a powerful pattern that provides significant benefits for complex domains, especially when combined with DDD and CQRS. It's particularly valuable for systems that need complete audit trails and temporal querying capabilities.