What is Event Sourcing and how does it relate to DDD?

9 minadvancedevent-sourcingCQRSDDDarchitecture

Quick Answer

Event Sourcing persists state as an append-only sequence of domain events rather than just the current state; the current state is rebuilt by replaying events. It pairs well with DDD and CQRS, giving a complete audit trail, temporal queries, and natural event publication, but adds complexity around schema/versioning, replay, and snapshots. Often the write side is event-sourced while read models are projected for queries.

Detailed Answer

Event Sourcing is a pattern where the state of an application is determined by a sequence of events that have occurred, rather than by the current state alone. It's closely related to Domain-Driven Design and provides a powerful way to capture business events and maintain a complete audit trail.

Core Concepts:

  1. Events as Source of Truth: The event store is the primary source of truth
  2. Event Store: Persistent storage for all domain events
  3. Aggregate Reconstruction: Rebuild aggregate state by replaying events
  4. Event Stream: Chronological sequence of events for an aggregate
  5. Snapshots: Periodic snapshots to optimize reconstruction

Basic Event Sourcing Implementation:

// Base event class
public abstract class DomainEvent
{
    public Guid Id { get; }
    public Guid AggregateId { get; }
    public int Version { get; }
    public DateTime OccurredOn { get; }
    public string EventType { get; }
    
    protected DomainEvent(Guid aggregateId, int version)
    {
        Id = Guid.NewGuid();
        AggregateId = aggregateId;
        Version = version;
        OccurredOn = DateTime.UtcNow;
        EventType = GetType().Name;
    }
}

// Order events
public class OrderCreatedEvent : DomainEvent
{
    public CustomerId CustomerId { get; }
    public DateTime CreatedAt { get; }
    
    public OrderCreatedEvent(Guid aggregateId, int version, CustomerId customerId, DateTime createdAt)
        : base(aggregateId, version)
    {
        CustomerId = customerId;
        CreatedAt = createdAt;
    }
}

public class OrderItemAddedEvent : DomainEvent
{
    public ProductId ProductId { get; }
    public Quantity Quantity { get; }
    public Money UnitPrice { get; }
    
    public OrderItemAddedEvent(Guid aggregateId, int version, ProductId productId, Quantity quantity, Money unitPrice)
        : base(aggregateId, version)
    {
        ProductId = productId;
        Quantity = quantity;
        UnitPrice = unitPrice;
    }
}

public class OrderConfirmedEvent : DomainEvent
{
    public Money Total { get; }
    public DateTime ConfirmedAt { get; }
    
    public OrderConfirmedEvent(Guid aggregateId, int version, Money total, DateTime confirmedAt)
        : base(aggregateId, version)
    {
        Total = total;
        ConfirmedAt = confirmedAt;
    }
}

public class OrderCancelledEvent : DomainEvent
{
    public string Reason { get; }
    public DateTime CancelledAt { get; }
    
    public OrderCancelledEvent(Guid aggregateId, int version, string reason, DateTime cancelledAt)
        : base(aggregateId, version)
    {
        Reason = reason;
        CancelledAt = cancelledAt;
    }
}

Event-Sourced Aggregate:

public class Order : AggregateRoot<OrderId>
{
    public OrderId Id { get; private set; }
    public CustomerId CustomerId { get; private set; }
    public OrderStatus Status { get; private set; }
    public Money Total { get; private set; }
    public DateTime CreatedAt { get; private set; }
    
    private readonly List<OrderItem> _items = new();
    public IReadOnlyList<OrderItem> Items => _items.AsReadOnly();
    
    // Constructor for creating new orders
    public Order(OrderId id, CustomerId customerId)
    {
        Id = id;
        CustomerId = customerId;
        Status = OrderStatus.Draft;
        Total = new Money(0, "USD");
        CreatedAt = DateTime.UtcNow;
        
        // Raise event
        RaiseEvent(new OrderCreatedEvent(id.Value, 1, customerId, CreatedAt));
    }
    
    // Constructor for rebuilding from events
    private Order()
    {
        // Used by event sourcing
    }
    
    public void AddItem(ProductId productId, Quantity quantity, Money unitPrice)
    {
        if (Status != OrderStatus.Draft)
            throw new InvalidOperationException("Cannot add items to a confirmed order");
            
        var item = new OrderItem(productId, quantity, unitPrice);
        _items.Add(item);
        RecalculateTotal();
        
        // Raise event
        RaiseEvent(new OrderItemAddedEvent(Id.Value, Version + 1, productId, quantity, unitPrice));
    }
    
    public void Confirm()
    {
        if (Status != OrderStatus.Draft)
            throw new InvalidOperationException("Order is not in draft status");
            
        if (_items.Count == 0)
            throw new InvalidOperationException("Cannot confirm an empty order");
            
        Status = OrderStatus.Confirmed;
        
        // Raise event
        RaiseEvent(new OrderConfirmedEvent(Id.Value, Version + 1, Total, DateTime.UtcNow));
    }
    
    public void Cancel(string reason)
    {
        if (Status == OrderStatus.Shipped)
            throw new InvalidOperationException("Cannot cancel a shipped order");
            
        Status = OrderStatus.Cancelled;
        
        // Raise event
        RaiseEvent(new OrderCancelledEvent(Id.Value, Version + 1, reason, DateTime.UtcNow));
    }
    
    private void RecalculateTotal()
    {
        Total = _items.Aggregate(new Money(0, "USD"), (sum, item) => sum + item.Total);
    }
    
    // Event application methods
    private void Apply(OrderCreatedEvent @event)
    {
        Id = new OrderId(@event.AggregateId);
        CustomerId = @event.CustomerId;
        Status = OrderStatus.Draft;
        Total = new Money(0, "USD");
        CreatedAt = @event.CreatedAt;
    }
    
    private void Apply(OrderItemAddedEvent @event)
    {
        var item = new OrderItem(@event.ProductId, @event.Quantity, @event.UnitPrice);
        _items.Add(item);
        RecalculateTotal();
    }
    
    private void Apply(OrderConfirmedEvent @event)
    {
        Status = OrderStatus.Confirmed;
    }
    
    private void Apply(OrderCancelledEvent @event)
    {
        Status = OrderStatus.Cancelled;
    }
}

Event Store Interface and Implementation:

public interface IEventStore
{
    Task SaveEventsAsync(Guid aggregateId, IEnumerable<DomainEvent> events, int expectedVersion);
    Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId);
    Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId, int fromVersion);
    Task<IEnumerable<DomainEvent>> GetEventsAsync(DateTime from, DateTime to);
}

public class EventStore : IEventStore
{
    private readonly EventStoreDbContext _context;
    private readonly IEventSerializer _eventSerializer;
    
    public async Task SaveEventsAsync(Guid aggregateId, IEnumerable<DomainEvent> events, int expectedVersion)
    {
        var eventEntities = events.Select((domainEvent, index) => new EventEntity
        {
            Id = domainEvent.Id,
            AggregateId = aggregateId,
            EventType = domainEvent.EventType,
            EventData = _eventSerializer.Serialize(domainEvent),
            Version = expectedVersion + index + 1,
            OccurredOn = domainEvent.OccurredOn
        });
        
        _context.Events.AddRange(eventEntities);
        await _context.SaveChangesAsync();
    }
    
    public async Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId)
    {
        var eventEntities = await _context.Events
            .Where(e => e.AggregateId == aggregateId)
            .OrderBy(e => e.Version)
            .ToListAsync();
            
        return eventEntities.Select(DeserializeEvent);
    }
    
    public async Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId, int fromVersion)
    {
        var eventEntities = await _context.Events
            .Where(e => e.AggregateId == aggregateId && e.Version > fromVersion)
            .OrderBy(e => e.Version)
            .ToListAsync();
            
        return eventEntities.Select(DeserializeEvent);
    }
    
    public async Task<IEnumerable<DomainEvent>> GetEventsAsync(DateTime from, DateTime to)
    {
        var eventEntities = await _context.Events
            .Where(e => e.OccurredOn >= from && e.OccurredOn <= to)
            .OrderBy(e => e.OccurredOn)
            .ToListAsync();
            
        return eventEntities.Select(DeserializeEvent);
    }
    
    private DomainEvent DeserializeEvent(EventEntity eventEntity)
    {
        var eventType = Type.GetType(eventEntity.EventType);
        return (DomainEvent)_eventSerializer.Deserialize(eventEntity.EventData, eventType);
    }
}

public class EventEntity
{
    public Guid Id { get; set; }
    public Guid AggregateId { get; set; }
    public string EventType { get; set; }
    public string EventData { get; set; }
    public int Version { get; set; }
    public DateTime OccurredOn { get; set; }
}

Event-Sourced Repository:

public interface IEventSourcedRepository<TAggregate> where TAggregate : AggregateRoot
{
    Task<TAggregate> GetByIdAsync(Guid id);
    Task SaveAsync(TAggregate aggregate);
}

public class EventSourcedOrderRepository : IEventSourcedRepository<Order>
{
    private readonly IEventStore _eventStore;
    private readonly ISnapshotStore _snapshotStore;
    
    public async Task<Order> GetByIdAsync(Guid id)
    {
        // Try to get from snapshot first
        var snapshot = await _snapshotStore.GetSnapshotAsync<Order>(id);
        var fromVersion = 0;
        
        if (snapshot != null)
        {
            fromVersion = snapshot.Version;
        }
        
        // Get events from the snapshot version
        var events = await _eventStore.GetEventsAsync(id, fromVersion);
        
        // Reconstruct aggregate
        var order = new Order();
        
        // Apply snapshot if available
        if (snapshot != null)
        {
            order.RestoreFromSnapshot(snapshot);
        }
        
        // Apply events
        foreach (var @event in events)
        {
            order.ApplyEvent(@event);
        }
        
        return order;
    }
    
    public async Task SaveAsync(Order aggregate)
    {
        var events = aggregate.GetUncommittedEvents();
        var expectedVersion = aggregate.Version - events.Count();
        
        await _eventStore.SaveEventsAsync(aggregate.Id.Value, events, expectedVersion);
        
        // Create snapshot if needed
        if (ShouldCreateSnapshot(aggregate))
        {
            var snapshot = aggregate.CreateSnapshot();
            await _snapshotStore.SaveSnapshotAsync(aggregate.Id.Value, snapshot);
        }
        
        aggregate.MarkEventsAsCommitted();
    }
    
    private bool ShouldCreateSnapshot(Order aggregate)
    {
        // Create snapshot every 100 events
        return aggregate.Version % 100 == 0;
    }
}

Snapshot Implementation:

public interface ISnapshotStore
{
    Task<TSnapshot> GetSnapshotAsync<TSnapshot>(Guid aggregateId) where TSnapshot : class;
    Task SaveSnapshotAsync<TSnapshot>(Guid aggregateId, TSnapshot snapshot) where TSnapshot : class;
}

public class OrderSnapshot
{
    public Guid AggregateId { get; set; }
    public int Version { get; set; }
    public CustomerId CustomerId { get; set; }
    public OrderStatus Status { get; set; }
    public Money Total { get; set; }
    public DateTime CreatedAt { get; set; }
    public List<OrderItem> Items { get; set; }
    public DateTime SnapshotDate { get; set; }
}

public class Order : AggregateRoot<OrderId>
{
    // ... existing code ...
    
    public OrderSnapshot CreateSnapshot()
    {
        return new OrderSnapshot
        {
            AggregateId = Id.Value,
            Version = Version,
            CustomerId = CustomerId,
            Status = Status,
            Total = Total,
            CreatedAt = CreatedAt,
            Items = _items.ToList(),
            SnapshotDate = DateTime.UtcNow
        };
    }
    
    public void RestoreFromSnapshot(OrderSnapshot snapshot)
    {
        Id = new OrderId(snapshot.AggregateId);
        CustomerId = snapshot.CustomerId;
        Status = snapshot.Status;
        Total = snapshot.Total;
        CreatedAt = snapshot.CreatedAt;
        _items.Clear();
        _items.AddRange(snapshot.Items);
        Version = snapshot.Version;
    }
}

Event Sourcing with CQRS:

// Command side - uses event sourcing
public class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
{
    private readonly IEventSourcedRepository<Order> _orderRepository;
    
    public async Task Handle(CreateOrderCommand command, CancellationToken cancellationToken = default)
    {
        var order = new Order(new OrderId(Guid.NewGuid()), command.CustomerId);
        
        foreach (var item in command.Items)
        {
            order.AddItem(
                new ProductId(item.ProductId),
                new Quantity(item.Quantity),
                new Money(item.UnitPrice, "USD")
            );
        }
        
        await _orderRepository.SaveAsync(order);
    }
}

// Query side - uses read models updated by events
public class OrderReadModelUpdater : IDomainEventHandler<OrderCreatedEvent>
{
    private readonly IOrderReadRepository _readRepository;
    
    public async Task Handle(OrderCreatedEvent domainEvent, CancellationToken cancellationToken = default)
    {
        await _readRepository.CreateOrderAsync(new OrderReadModel
        {
            Id = domainEvent.AggregateId,
            CustomerId = domainEvent.CustomerId.Value,
            Status = "Draft",
            Total = 0,
            CreatedAt = domainEvent.CreatedAt
        });
    }
}

public class OrderItemAddedEventHandler : IDomainEventHandler<OrderItemAddedEvent>
{
    private readonly IOrderReadRepository _readRepository;
    
    public async Task Handle(OrderItemAddedEvent domainEvent, CancellationToken cancellationToken = default)
    {
        await _readRepository.AddOrderItemAsync(domainEvent.AggregateId, new OrderItemReadModel
        {
            ProductId = domainEvent.ProductId.Value,
            Quantity = domainEvent.Quantity.Value,
            UnitPrice = domainEvent.UnitPrice.Amount
        });
    }
}

Benefits of Event Sourcing:

  1. Complete Audit Trail: Every change is recorded as an event
  2. Temporal Queries: Can query the state at any point in time
  3. Debugging: Easy to understand what happened and when
  4. Compliance: Meets regulatory requirements for audit trails
  5. Replay Capability: Can replay events to rebuild state
  6. Integration: Events can be used for integration between systems
  7. Analytics: Rich data for business intelligence and analytics

Challenges of Event Sourcing:

  1. Complexity: More complex than traditional CRUD
  2. Event Schema Evolution: Need to handle changes to event structure
  3. Performance: Rebuilding aggregates from events can be slow
  4. Storage: Can require more storage space
  5. Learning Curve: Team needs to understand the pattern

When to Use Event Sourcing:

  • Systems requiring complete audit trails
  • Complex business domains with rich event models
  • Systems where temporal queries are important
  • Applications with compliance requirements
  • Systems that need to replay events for analysis
  • Integration scenarios where events are valuable

Best Practices:

  1. Event Design: Design events to be meaningful and immutable
  2. Snapshot Strategy: Use snapshots to optimize performance
  3. Event Versioning: Plan for event schema evolution
  4. Error Handling: Handle event processing failures gracefully
  5. Testing: Test event sourcing thoroughly with event replay
  6. Documentation: Document event schemas and processing logic

Event Sourcing is a powerful pattern that provides significant benefits for complex domains, especially when combined with DDD and CQRS. It's particularly valuable for systems that need complete audit trails and temporal querying capabilities.

Related Resources