Event Bus Refinements and Aggregate Principles
I made a significant mistake while working on the in-memory event bus: I added queues that weren't needed, since there are no distinct channels per subscriber. Subscribers are called one by one with each event, so a simple list of subscribers is sufficient and more efficient.
Initially, I had a mutex guarding the entire array of subscribers. This caused timeouts: when an event was emitted (e.g., by a saga), the emit locked the subscriber array while the event was being dispatched. If the saga then emitted more events, those nested emits tried to acquire the same lock while it was still held, leading to a deadlock. Meanwhile, events piled up in the channel's buffer.
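The failure mode described above can be reproduced in a few lines. This is only a minimal sketch in Python, not the actual bus: `LockedBus`, `saga`, and the event names are hypothetical, and the `timeout` on the lock exists purely so the example reports the problem instead of hanging the way a plain mutex would.

```python
import threading


class LockedBus:
    """Hypothetical bus that holds a non-reentrant lock while dispatching."""

    def __init__(self):
        self._lock = threading.Lock()  # non-reentrant, like a plain mutex
        self._subscribers = []
        self.blocked = []  # events whose emit could not take the lock

    def subscribe(self, handler):
        with self._lock:
            self._subscribers.append(handler)

    def emit(self, event):
        # Assumption for the demo: give up after 0.1s instead of blocking forever.
        if not self._lock.acquire(timeout=0.1):
            self.blocked.append(event)
            return
        try:
            # The lock is still held while every subscriber runs.
            for handler in self._subscribers:
                handler(event)
        finally:
            self._lock.release()


bus = LockedBus()


def saga(event):
    # A saga reacting to one event by emitting another (nested emit).
    if event == "OrderPlaced":
        bus.emit("PaymentRequested")


bus.subscribe(saga)
bus.emit("OrderPlaced")
print(bus.blocked)  # ['PaymentRequested']
```

The nested emit never gets the lock because the outer emit still holds it. Without the timeout, that call would wait forever.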
The actual solution was to either remove the mutex entirely (as it wasn't truly needed) or, if dynamic subscription/unsubscription was required, use a read-write lock. A read-write lock allows multiple parallel reads but only one write at a time. Since emitting an event only reads the subscriber array, nested emits no longer block each other, which solves the problem. I've implemented a read-write lock for now, as it was easier than completely removing the existing mechanism, even though dynamic subscription isn't strictly necessary.
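As a rough illustration of the read-write lock approach, here is a minimal sketch in Python. Python's standard library has no read-write lock, so `ReadWriteLock` below is a small hand-written one built on `threading.Condition`. It, `EventBus`, and `saga` are hypothetical names, not the real implementation. It gives no priority to writers, which keeps nested reads from the same thread safe in this simple form.

```python
import threading
from contextlib import contextmanager


class ReadWriteLock:
    """Minimal reader-writer lock: many readers or one writer."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writing = False

    @contextmanager
    def read(self):
        with self._cond:
            while self._writing:
                self._cond.wait()
            self._readers += 1
        try:
            yield
        finally:
            with self._cond:
                self._readers -= 1
                if self._readers == 0:
                    self._cond.notify_all()

    @contextmanager
    def write(self):
        with self._cond:
            while self._writing or self._readers > 0:
                self._cond.wait()
            self._writing = True
        try:
            yield
        finally:
            with self._cond:
                self._writing = False
                self._cond.notify_all()


class EventBus:
    def __init__(self):
        self._lock = ReadWriteLock()
        self._subscribers = []

    def subscribe(self, handler):
        with self._lock.write():  # exclusive: mutates the list
            self._subscribers.append(handler)

    def emit(self, event):
        with self._lock.read():  # shared: nested emits can also read
            for handler in self._subscribers:
                handler(event)


bus = EventBus()
handled = []


def saga(event):
    handled.append(event)
    if event == "OrderPlaced":
        bus.emit("PaymentRequested")  # nested emit takes another read lock


bus.subscribe(saga)
bus.emit("OrderPlaced")
print(handled)  # ['OrderPlaced', 'PaymentRequested']
```

One caveat of this sketch: calling `subscribe` from inside a handler would still deadlock, because the write lock waits for the read lock held by the surrounding `emit`.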
This solution has resolved the immediate problem. In the future, I might remove the subscriber array entirely and use a channel with multiple subscribers reacting in parallel to events, potentially each in its own task.
One crucial realization from this experience is that an aggregate should never depend on a projection. Aggregates should not read state from a projection to hydrate or create new commands, as that state might be outdated, leading to inconsistencies. Ideally, the command sent to the aggregate should contain all necessary information and rely only on the aggregate's own state, without internal links to projections.
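To make the principle concrete, here is a small sketch in Python of an aggregate that decides using only the command and its own event-derived state. The `Account` aggregate, the `Withdraw` command, and the events are invented for illustration and are not taken from the actual codebase. The handler is a plain synchronous function, so it has no way to query a projection.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Deposited:
    amount: int


@dataclass(frozen=True)
class Withdrawn:
    amount: int


@dataclass(frozen=True)
class Withdraw:
    # The command carries everything the decision needs.
    amount: int


class Account:
    """Hypothetical aggregate whose state comes only from its own events."""

    def __init__(self):
        self.balance = 0

    def apply(self, event):
        # Fold one past event into the aggregate's state.
        if isinstance(event, Deposited):
            self.balance += event.amount
        elif isinstance(event, Withdrawn):
            self.balance -= event.amount

    def handle(self, command):
        # Decide from the command and own state; no projection lookup, no IO.
        if command.amount > self.balance:
            raise ValueError("insufficient funds")
        return [Withdrawn(command.amount)]


account = Account()
for past_event in [Deposited(100), Withdrawn(30)]:
    account.apply(past_event)

new_events = account.handle(Withdraw(50))
print(new_events)  # [Withdrawn(amount=50)]
```

Because the balance is rebuilt from the aggregate's own events, the decision cannot be affected by a projection that lags behind.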
I'm considering making command handlers synchronous to enforce this. A synchronous handler would effectively lock the application during any IO unless threads or other parallel processing mechanisms are used. Currently, aggregate command handlers are asynchronous because I initially wanted to perform IO in them to call projections, which I now believe is a bad idea. I may even add a semaphore system in epoch in the future so this behavior is enforced.