In my expeditions through the treacherous mountains of CQRS and Event Sourcing, I've never been exactly comfortable with the examples I've seen of domain Aggregates. They have always seemed like an annoying blister that irritates me repeatedly while I'm hiking through an implementation.

I finally had enough and decided to really examine what the problem was, and figure out, how could I tackle it once and for all and find my blister treatment.

I am a lucky person in that I can file something away in my brain to run in a background process and leave it alone until it lets me know it's solved. This requires lots of time doing something not related to the thing I've filed away, and lately I've had lots of that time available.


To get everyone up to speed, here's a little terminology:

  • CQRS is Command and Query Responsibility Segregation, the concept of completely independent code deployments for process that affects change in a system from process that requests state information. There is loads of info available online, especially from Greg Young.
  • Event Sourcing is an extension to CQRS that stores the entire state of a system in a record of events, so a new copy of a system can be instantiated by simply replaying the history into an application state. Again loads of info out there.
  • Domain Aggregates are those classes that process Commands into Events that are then stored.

Defining the Problem

I had to identify my actual issues with the examples I was seeing. I could only see examples where the Aggregate class was an abstract that needed extending by a domain specific class. That class would then add domain specific functionality to create a state that Commands would update. This just felt wrong.

After a little background processing, the alarm was going off that I had the problem sorted needed to play with some code to solve the problem.

The problem was that an Aggregate had two specific areas of concern: Domain specific functionality and the processing of events from some form of persistence. Now to me this didn't seem like the responsibility of an Aggregate and it's implementations. The examples I was seeing were using an Aggregate as a bucket of basic functionality for extension rather than as a type to be implemented specifically. It seemed to me a possible breach of the single responsibility principle and/or (depending on how you look at it) the Liskov substitution principle. Either way it was a bad idea.

So where to go from here? As the title says: Observers.

The Observer Pattern

The observer pattern is pretty simple. This really good writeup explains it well, but in an overly simplistic view, it allows classes to register with other classes so messages can be sent between them when one changes state.

For our use case example, here's the lowest level code:

interface Context {
    public function __get(string $name);
}

interface Observer
{
    public function apply(Context $context): void;
}

interface Observable
{
    public function addObserver(Observer $observer): void;
    public function removeObserver(Observer $observer): void;
    public function notifyObservers(Context $context): void;
}

trait ObservableBehaviour
{
    private Map $observers;

    public function initialiseObservable(): void
    {
        $this->observers = new Map;
    }

    public function addObserver(Observer $observer): void
    {
        $this->observers->put(get_class($observer), $observer);
    }

    public function removeObserver(Observer $observer): void
    {
        $this->observers->remove(get_class($observer));
    }

    public function notifyObservers(Context $context): void
    {
        foreach ($this->observers as $observer) {
            $observer->apply($context);
        }
    }
}

Using the above pattern of interfaces and the helper trait, we can split domain specific behaviour from aggregate behaviour quite easily. The magic is in that we make an Aggregate class Observable and an Observer, and the same for our domain specific classes.

Let's go through how this looks in operation. We start with the Aggregate class:

class Aggregate implements Observable, Observer
{
    use ObservableBehaviour;

    private UniqueId $aggregateId;
    private int $baseVersion;
    private Events $events;

    public static function hydrate(
    	UniqueId $aggregateId,
        Observer $observer,
        Events $events
    ): self {
        $aggregate = new static($aggregateId);
        $aggregate->addObserver($observer);

        foreach ($events as $event) {
            $aggregate->baseVersion = $event->version();
            $aggregate->notifyObservers($event->context());
        }

        $aggregate->removeObserver($observer);

        return $aggregate;
    }

    private function __construct(UniqueId $aggregateId)
    {
        $this->initialiseObservable();

        $this->aggregateId = $aggregateId;
        $this->baseVersion = 0;
        $this->events = new Events;
    }

    public function aggregateId(): UniqueId
    {
        return $this->aggregateId;
    }

    public function events(): Events
    {
        return $this->events;
    }

    public function apply(Context $context): void
    {
        $this->events->add(new Event(
            $this->aggregateId(),
            $context,
            $this->nextVersion())
        );
    }

    private function nextVersion(): int
    {
        return $this->baseVersion + count($this->events) + 1;
    }
}

Let's look at that in more detail:

    public static function hydrate(
    	UniqueId $aggregateId,
        Observer $observer,
        Events $events
    ): self {
        $aggregate = new static($aggregateId);
        $aggregate->addObserver($observer);

        foreach ($events as $event) {
            $aggregate->baseVersion = $event->version();
            $aggregate->notifyObservers($event->context());
        }

        $aggregate->removeObserver($observer);

        return $aggregate;
    }

    private function __construct(UniqueId $aggregateId)
    {
        $this->initialiseObservable();

        $this->aggregateId = $aggregateId;
        $this->baseVersion = 0;
        $this->events = new Events;
    }

The Aggregate can be hydrated from an event history using the hydrate method, which as you see accepts an Observer instance which in our case would be our domain class (more on this later). We construct the instance which initialises the Observable behaviour and sets some class properties.

The baseVersion concept is all about event concurrency and is a standard procedure in most Event Sourcing libraries out there.

The key thing to note is that the event context, or the meat of the event, is passed to the Observer for it to update its internal state.

The hydration process registers and then later removes the Observer, as it would only need to function in this specific static call, and it's always good to clean up after yourself.

    public function apply(Context $context): void
    {
        $this->events->add(new Event(
            $this->aggregateId(),
            $context,
            $this->nextVersion()
        ));
    }

The apply method fulfils the Observer part of the requirement, and adds any new events to the Aggregate to be stored later. That new event is recording state change from the domain class that the Aggregate will be observing.

The other methods are helpers and accessors to pull this all together in a wider event persistence platform.

So what would our domain classes look like?

class MyContextObject implements Context
{
    private string $someValue;
    private string $anotherValue;
    
    public function __construct(string $someValue, string $anotherValue)
    {
        $this->someValue = $someValue;
        $this->anotherValue = $anotherValue;
    }
    
    public function __get(string $name)
    {
        try {
            return $this->name;
        } catch (Throwable $e) {
            return;
        }
    }
}

class MyDomainConcept implements Observer, Observable
{
    use ObservableBehaviour;

    private string $someValue;
    private string $anotherValue;

    public function __construct()
    {
        $this->initialiseObservable();
        $this->addObserver($this);
    }

    public function myStateChangeFunctionality(string $someValue, string $anotherValue)
    {
        $context = new MyContextObject($someValue, $anotherValue);

        if ($someValue !== $this->someValue || $anotherValue !== $this->anotherValue) {
            $this->notifyObservers($context);
        }
    }

    public function apply(Context $context): void
    {
        $methodName = 'apply' . (new ReflectionClass($context))->getShortName();

        if (method_exists($this, $methodName)) {
            $this->{$methodName}($context);
        }
    }

    private function applyMyContextObject(MyContextObject $context): void
    {
        $this->someValue = $context->someValue;
        $this->anotherValue = $context->anotherValue;
    }
}

We have two classes here, a Context class and our Aggregate real, which performs the domain specific functionality we require to manage the current state of the Aggregate. Let's break things down again...

    public function __construct()
    {
        $this->initialiseObservable();
        $this->addObserver($this);
    }

Notice how we initialise the Observable behaviour again when it is constructed, and how we register itself as an Observer? This is a magic trick to make our state change functions super simple.

    public function myStateChangeFunctionality(string $someValue, string $anotherValue)
    {
        $context = new MyContextObject($someValue, $anotherValue);

        if ($someValue !== $this->someValue || $anotherValue !== $this->anotherValue) {
            $this->notifyObservers($context);
        }
    }

The myStateChangeFunctionality method only has to do some basic boolean tests on values, and then notifyObservers (from our helper trait) if all is good. The magic from before of making the Observer an Observer of itself? That allows us to utilise the apply method the same way the Aggregate class would to update the local state.

    public function apply(Context $context): void
    {
        $methodName = 'apply' . (new ReflectionClass($context))->getShortName();

        if (method_exists($this, $methodName)) {
            $this->{$methodName}($context);
        }
    }

    private function applyMyContextObject(MyContextObject $context): void
    {
        $this->someValue = $context->someValue;
        $this->anotherValue = $context->anotherValue;
    }

Our apply method is then able to do what it wants to update the internal state. A method I like to adopt is to have specific action methods for each type of Context to make the code clean and easy to reason about, so I use the little trick of Reflection to create a method call. By not tying the expensive ReflectionClass instance to a variable, the memory should be dumped pretty efficiently in PHP, but the same concepts apply in other OOP languages too.


The really cool part here is that it is now super easy to break down our 'Domain Aggregates' into smaller classes of specific functionality. Say you have a domain concept that has a lot of state; break it up into smaller chunks on a per use case basis. There's nothing stopping you from having multiple small Observers for state, all using the same Aggregate. I'll leave it up to you to figure out how to hydrate multiple Observers from one Aggregate (cough cough Decorators).


So what do you think of this? We have very specific classes, one which is totally reusable as a component in a larger CQRS and Event Sourcing pattern, and then specific classes per application use case to do the heavy lifting. Each side of the dance is completely testable without the other (as long as test doubles are used to spy on the process, that may come up in a later blog post) and the whole thing doesn't sting like a blister.

I hope this helps you to consider ways to refactor your code and break down the responsibilities. If you have any ideas or want to discuss the patterns (or if I missed something), leave a comment below or hit me up on Twitter and let's have a conversation.

Happy trekking!