How I’m using the Event Aggregator Pattern in StoryTeller

In my last post I did a braindump on the Event Aggregator pattern, and me being me, I was critical of the Prism approach for what I feel is needless complexity of usage.  Just to even the score and allow you to make fun of my code, here is a brief synopsis of my approach to the Event Aggregator pattern in the StoryTeller codebase.  I’m banging this out way fast, so feel free to ping me in the comments about anything that isn’t clear.  I’m going to blog tomorrow night on a small Event Aggregator strategy for client side JavaScript and if anybody wants it, I’ll also talk about writing diagnostics for the StoryTeller Event Aggregator.

 

My Event Aggregator looks like this:

    public interface IEventAggregator

    {

        // Sending messages

        void SendMessage<T>(T message);

        void SendMessage<T>() where T : new();

 

        // This method sounded cool, but has been somewhat awkward

        // in real usage

        void SendMessage<T>(Action<T> action) where T : class;

 

        // Explicit registration

        void AddListener(object listener);

        void RemoveListener(object listener);

 

        // Filtered registration, experimental

        If<T> If<T>(Func<T, bool> filter);

    }

The key points about my design is that I:

  1. Use generic marker interfaces to call and register the subscribers
  2. There is no “Event/Message” pair like Prism, just little “Message” objects
  3. All wiring is done through an IoC container (StructureMap)
  4. The subscriber registration is done by convention by my IoC container
  5. At this point I depend on explicit calls to remove listeners instead of using WeakReferences in the Event Aggregator.  I have not made up my mind how this is going to end up yet.

 

Publishers

Classes that need to publish messages simply need to get a reference to the IEventAggregator singleton.  In my world order that’s done strictly through Constructor Injection

        public TestEngine(IEventAggregator events, IUserInterfaceTestObserver observer)

Sending a message is as simple as calling the IEventAggregator.SendMessage() method:

        public virtual void Execute(Test test)

        {

            _events.SendMessage(new TestRunEvent(test, TestState.Executing));

 

            _currentTest = test;

            _engine.RunTest(test);

            _currentTest = null;

 

            _events.SendMessage(new TestRunEvent(test, TestState.NotQueued){Completed = true});

        }

 

Thread Marshalling

Internally, I let the EventAggregator class itself handle all thread marshalling between the UI thread and background threads.  Today, publishing to the EventAggregator happens on the same thread as the caller, but the Event Aggregator just assumes that messages come in on a background thread and uses SynchronizationContext to marshal the call back to the UI thread.  I register SynchronizationContext with StructureMap like this inside a StructureMap Registry:

            ForSingletonOf<SynchronizationContext>().TheDefault.Is.ConstructedBy(() =>

            {

                if (SynchronizationContext.Current == null)

                {

                    SynchronizationContext.SetSynchronizationContext(new DispatcherSynchronizationContext());

                }

 

                return SynchronizationContext.Current;

            });

The Event Aggregator itself just opens up a constructor argument for SynchronizationContext like so:

        public EventAggregator(SynchronizationContext context)

        {

            _context = context;

        }

 

Finally, when the EventAggregator class is publishing events to subscribers it uses its SynchronizationContext to marshal the callback:

        public void SendMessage<T>(Action<T> action) where T : class

        {

            sendAction(() => all().Each(x => x.CallOn(action)));

        }

 

        public void SendMessage<T>(T message)

        {

            sendAction(() => all().CallOnEach<IListener<T>>(x => x.Handle(message)));

        }

 

        protected virtual void sendAction(Action action)

        {

            // Uses SynchronizationContext to marshal the call

            // to the UI thread

            _context.Send(state => { action(); }, null);

        }

Unit testing interactions with the Event Aggregator is very simple because you only need to assert calls to SendMessage():

        [Test]

        public void send_cancellation_message_if_the_test_is_queued()

        {

            MockFor<IEventAggregator>().AssertWasCalled(

                x => x.SendMessage(new TestRunEvent(theTest, TestState.NotQueued)));

        }

 

Subscribers

As I said before, all subscribers need to implement this interface for each message type they need to receive:

    public interface IListener<T>

    {

        void Handle(T message);

    }

Which results in methods like this:

    public class FixtureExplorer : IListener<FixtureLibraryLoaded>, IStartable

    {

        public void Handle(FixtureLibraryLoaded message)

        {

            TreeNode node = BuildTree(message.Library);

            _view.ApplyFixtureNode(node);

 

            _container.Inject(message.Library);

        }

    }

Internally, the EventAggregator just stores all of the subscribers in a single List (StoryTeller isn’t big enough that I’m worried about the performance hit from scanning all subscribers).  When an event is published like this to the EventAggregator:

        public void ReloadFixtureLibrary()

        {

            _library = _proxy.BuildFixtureLibrary();

            _events.SendMessage(new FixtureLibraryLoaded(_library));

        }

The EventAggregator receives a “FixtureLibraryLoaded” messsage, then it loops through all of its subscribers and tries to case each object to IHandler<FixtureLibraryLoaded>.  If the object can be cast to that interface, EventAggregator will call the Handle(FixtureLibraryLoaded) method on the subscriber.  That code looks like this below:

        public void SendMessage<T>(T message)

        {

            // all() is just a readonly IEnumerable<object> snapshot

            // of the list of subscribers

 

            // sendAction(Action) is just making the Action run through

            // SynchronizationContext

            sendAction(() => all().CallOnEach<IListener<T>>(x => x.Handle(message)));

        }

The “CallOnEach<T>(Action<T>)” method is a small extension method I’ve started using across projects:

        public static void CallOn<T>(this object target, Action<T> action) where T : class

        {

            var subject = target as T;

            if (subject != null)

            {

                action(subject);

            }

        }

 

        public static void CallOnEach<T>(this IEnumerable enumerable, Action<T> action) where T : class

        {

            foreach (object o in enumerable)

            {

                o.CallOn(action);

            }

        }

 

 

Convention Based Registration

As I said earlier, all subscribers have to implement the IListener<T> interface to “plug” into the Event Aggregator.  I’m very aggressive about using my IoC container (StructureMap of course) to “compose” the system.  Every “service,” Presenter,  or element of the user interface that would implement the IListener<T> interface is constructed/resolved by StructureMap.  The event aggregator itself is a “managed singleton” in the container:

            For<IEventAggregator>().AsSingletons().Use<EventAggregator>();

StructureMap “knows” about the Event Aggregator and creates all the objects in the system that would implement IListener<T>.  Wouldn’t it be convenient if StructureMap could just automatically add the subscribers to the Event Aggregator as it creates the objects?  That turns out to be relatively simple to do with the “TypeInterceptor” mechanism in StructureMap.  I built a simple TypeInterceptor that automatically adds any object created by StructureMap to the Event Aggregator before the new object is returned to the caller.  This EventAggregatorInterceptor class is shown below:

    public class EventAggregatorInterceptor : TypeInterceptor

    {

        public object Process(object target, IContext context)

        {

            context.GetInstance<IEventAggregator>().AddListener(target);

            return target;

        }

 

        public bool MatchesType(Type type)

        {

            return

                type.ImplementsInterfaceTemplate(typeof (IListener<>)) ||

                type.CanBeCastTo(typeof (ICloseable));

        }

    }

 

 

 

 

 

 

 

 

 

 

“TypeInterceptor” is a StructureMap interface, and “ImplementsInterfaceTemplate()” and “CanBeCastTo()” are extension methods on the Type class provided by StructureMap as convenience methods.  Next, I need to add the EventAggregatorInterceptor to StructureMap with this little bit of code in the bootstrapper:

            RegisterInterceptor(new EventAggregatorInterceptor());

 

 

 

 

 

 

 

 

 

 

 

 

Inside StoryTeller is a Presenter class called QueuePresenter that listens for the TestRunEvent:

    public class QueuePresenter : IScreen, IListener<TestRunEvent>

 

When the user interface screen conducting code makes a call to “container.GetInstance<QueuePresenter>(), StructureMap will invoke the EventAggregatorInterceptor.Process() method and register the new instance of QueuePresenter with the Event Aggregator.

The value of the convention based registration approach is less code and fewer errors.  The registration happens automatically, so I don’t get errors from forgetting to register an event.  Classes that are strictly subscribers do not need to be coupled to the Event Aggregator at all.  When I unit test the Queue Presenter class above I can simply walk right up to the Handle(TestRunEvent):

    [TestFixture]

    public class when_a_new_test_is_queued : QueuePresenterContext

    {

        private object theItem;

 

        protected override void setUp()

        {

            MockFor<IExecutionQueue>().Expect(x => x.GetAllQueuedTests()).Return(DataMother.TestArray(3));

 

 

            theItem = new object();

            MockFor<IQueuedItemFactory>().Expect(x => x.Build(theTest)).Return(theItem);

 

            ClassUnderTest.Handle(new TestRunEvent(theTest, TestState.Queued));

        }

 

        [Test]

        public void hide_the_no_tests_label()

        {

            theView.AssertWasCalled(x => x.NoTestsAreQueued = false);

        }

 

        [Test]

        public void the_test_should_be_added_to_the_view()

        {

            theView.AssertWasCalled(x => x.AddTestItem(theItem));

        }

    }

 

 

 

 

 

 

 

 

 

 

 

 

Note that there’s no reference to Event Aggregator in the unit testing code above.

 

 

 

 

 

 

 

 

 

 

 

 

 

What about Filtering?

So far I’ve only got one place where I need to worry about event filtering and I do it inside the subscriber.  My “TestPresenter” class only cares about TestRunEvent messages that relate to a particular Test object.  In this case I’m simply filtering inside TestPresenter.Handle(TestRunEvent) like this:

        public void Handle(TestRunEvent message)

        {

            // Filter the message here and disregard messages not related to *my* Test

            if (message.Test != _test) return;

 

            // Continue on with processing

        }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

When you need event filtering, I’m finding it much easier to use coarse grained events rather than finer grained events.

 

 

 

That’s about it, there really isn’t that much to it.  Fire away with questions.  The entire code for this is in the StoryTeller codebase that’s available without a tigris login.

About Jeremy Miller

Jeremy is the Chief Software Architect at Dovetail Software, the coolest ISV in Austin. Jeremy began his IT career writing "Shadow IT" applications to automate his engineering documentation, then wandered into software development because it looked like more fun. Jeremy is the author of the open source StructureMap tool for Dependency Injection with .Net, StoryTeller for supercharged acceptance testing in .Net, and one of the principal developers behind FubuMVC. Jeremy's thoughts on all things software can be found at The Shade Tree Developer at http://codebetter.com/jeremymiller.
This entry was posted in Presentation Patterns. Bookmark the permalink. Follow any comments here with the RSS feed for this post.
  • Joe

    Sorry, I’m stuck on the chicken vs. the egg concept. What causes classes that are purely listeners to be created before they’re registered as listeners (thus causing them to be registered)?  I totally understand if they’re services that get loaded for some other reason and also happen to be listeners…

  • http://twitter.com/jeremydmiller jeremydmiller

    @Joe,
    @41ec9292a8797c8d73bf9bb7ebb509fd:disqus 
    In the architecture above, *everything* was built/assembled/composed with a StructureMap IoC container.  I used a StructureMap interceptor that automatically added any object that implemented the IListener interfaces to the event aggregator whenever they were created.  No manual coding necessary. 

  • Joe

    For classes that are pure listeners and would only get loaded to handle an event, don’t you still have to push them into the aggregator by hand?  Otherwise when are they registered?

  • Jeff

    Jeremy,
    how is this different from Udi Dahan’s Domain event pattern or the upcoming Rx (System.Reactive)? Can you comment? Just seems confusing

  • http://codebetter.com/members/jmiller/default.aspx Jeremy D. Miller

    @Ryan,

    Give me a week or two and I’ll go take a closer look. I’m going to talk to the MassTransit guys too about whether a Message Bus is appropriate to use as your event aggregator.

  • http://wizardsofsmart.net/ Ryan Riley

    Any thoughts on how the Reactive Framework (IObservable/IObserver) now in Silverlight and to come in .NET 4.0 may change the pattern? Does Rx remove the need for an EventAggregator implementation?

  • http://codebetter.com/members/gblock/default.aspx Glenn Block

    @Jeremy

    It is worse in the unit-testing code, but only by a slight degree, as it means I have to have a mock EventAggregator.

    On the comment, dude, you are misinterpreting. I was not giving the customer feedback to say that OUR implementation was the best. I was actually saying that it was evidence that folks liked the use of that pattern. It was a complement on your promoting of the pattern…

    However in talking about the best, I think the best is relative, it was far better than what customers had used in the past coming out of p&p. I am the first to agree that things can always be improved, that’s the nature of developing software. And fortunately we design Prism in such a way that EventAgg was our implementation, but something we avoided any hard dependencies on.

    Glenn

  • http://codebetter.com/members/jmiller/default.aspx Jeremy D. Miller

    @Glenn,

    The bottom line for me is that the Prism EA requires me to write more code, and it’s worse in the unit testing code. More code for the same tasks == FAIL.

    “I hear lots of stories from customers who are very happy using it.”

    – because the EA in Prism is the only implementation they’ve ever been exposed to. Don’t fool yourself for an instance into believing that Prism is the best it could be just because of positive feedback.

  • http://www.codebetter.com/blogs/glenn.block Glenn Block

    Nice post Jeremy!

    I do like the IListener approach as I mentioned on Ward’s blog, and think it is quite elegant. It is a bit “magical” in that it requires something to do the autowiring, but not a problem if you are using StructureMap ;-)

    On the other hand I think there are benefits to the Prism approach. First, is it’s very explicit. It’s easy in the code to follow the liftime of the events, including putting a breakpoint where the subscription happens. Also the filtering is explicit, I can easily see at the time of subscription what filter I am subscribing with. Second, it is very similar to the pattern of event subscription in the framework. This means it’s easier for folks familar with that pattern to subscribe. Fourth, it does not rely on a magical autowirer to hook it up. It’s a very light service that can easily be passed into the constructor of any object, it does not ever require a container.

    Now as I said, I LIKE IListener . But to be fiar I don’t believe that Prism’s EA leads you to bad/unmaintainable code. Also thanks as you were definitely an inspiration for us to look into the EventAggregator pattern in Prism V1, I hear lots of stories from customers who are very happy using it.

  • Michael

    Do you have to take thread safety into account during your enumeration on the off chance that the subscriber is removed from another thread?

    Also, how does the experimental filtered operation work? Can you show the code?

    Thanks

  • Eyston

    The NDC sessions have a lot with IoC (Udi Dahan, Ayende, yours, probably others?) and it seems to be a way to play to the strengths of C#. The promise of coding a feature being limited to just … coding the feature is pretty enticing.

    I’d be interested in the Javascript examples. I’m trying to stop treating it as 2nd class code :)

  • http://phatboyg.lostechies.com/ Chris Patterson

    @David,

    They are very similar. In fact, I’m currently harvesting the internal message delivery pipeline from MassTransit into Magnum to use as a similar type of event publishing system. The end product is similar, but I’m focusing heavily on the threading aspect since I plan to use it to dispatch events between actors in-process.

    I’ll post about it soon, hopefully before the end of the weekend.

  • http://www.mohundro.com/blog/ David Mohundro

    Random question regarding the EventAggregator pattern and possibly off-topic…

    I’ve been looking at MassTransit, NServiceBus and other messaging-related frameworks and the ideas of the ServiceBus seem very close to the EventAggregator pattern. Is it a stretch to think of them as essentially the same pattern, except that the EventAggregator is just for local messages?