Introduction to the Reactive Framework Part II

In my previous post, I talked a little bit about the parts of the Reactive Framework that are coming to the .NET 4 BCL, as well as their implementation in F# as part of F# First Class Events.  This time, let’s come back to the Reactive Framework itself, from which IObservable<T> and IObserver<T> originated.  As you may remember, you can play with the bits of the Reactive Framework from the Silverlight 3 Toolkit.

In the first post in the series, I gave a basic introduction to the problems we face with asynchronous and reactive programming.  We covered some of the evolution of asynchronous programming in the .NET Framework and where the Reactive Framework fits.  This time, I’ll pick up where we left off and talk about one of the important ideas behind the Reactive Framework: the duality of Enumerable and Observable.

The Duality of Enumerable & Observable

During Erik Meijer’s appearances on Channel 9, an important and recurring theme was the mathematical duality between Enumerable and Observable.  What does that mean exactly, though?

Recall from the first post in the series that we talked about the pull (interactive) versus the push (reactive) model.  The pull model, represented by the iterator pattern of IEnumerable<T>/IEnumerator<T>, states that we must explicitly call a method in order to get each item from our abstracted collection.  The push model, on the other hand, represented by the observable pattern of IObservable<T>/IObserver<T>, states that we register our interest through a subscription, and items are subsequently handed to us from some abstracted collection.

First, let’s look at the iterator pattern, a common interactive programming pattern used throughout .NET.  It consists of two interfaces, IEnumerable<T> and IEnumerator<T>.  Listed below are the highlights of these two interfaces, cut down to their essential members.

public interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator();
} 

public interface IEnumerator<T> : IDisposable
{
    T Current { get; }
    bool MoveNext();
}

The IEnumerable<T> interface exposes a single method, which returns an IEnumerator<T> to iterate over a sequence.  The IEnumerator<T> lets us advance to the next item with MoveNext, which reports whether there was another item, and read that item through Current.
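To make the pull side concrete, here’s roughly what a foreach loop boils down to in terms of these two interfaces.  This is a simplified sketch (the compiler’s real expansion has a few more details), and PullDemo and PullAll are just illustrative names:

```csharp
using System;
using System.Collections.Generic;

public static class PullDemo
{
    // Roughly what the compiler generates for:
    //   foreach (var item in source) { result.Add(item); }
    // The consumer drives the loop: it pulls each item by calling MoveNext, then Current.
    public static List<T> PullAll<T>(IEnumerable<T> source)
    {
        var result = new List<T>();
        using (IEnumerator<T> e = source.GetEnumerator())
        {
            while (e.MoveNext())       // false means "no more items"
            {
                result.Add(e.Current); // read the item MoveNext just advanced to
            }
        }                              // Dispose releases the enumerator's resources
        return result;
    }
}
```

Note that the consumer is in charge the whole time: nothing happens unless it calls MoveNext.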

To think about asynchronous programming, we must consider a basic Gang of Four pattern, the Observer pattern.  This pattern describes an object, called the Observable (the Subject in the Gang of Four book), that maintains a list of its dependent Observers and notifies them automatically of any state changes.  In Java, this should be rather familiar territory with the java.util.Observer interface and the java.util.Observable class.  The Reactive Framework is no different: an implementation of IObservable<T> maintains a list of dependent IObserver<T> instances and notifies them automatically of any state changes.  Let’s look at the signatures below:

public interface IObserver<T>
{
    void OnCompleted();
    void OnError(Exception exception);
    void OnNext(T value);
} 

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
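As a sketch of the pattern itself, here’s a minimal observable built on the IObservable<T>/IObserver<T> interfaces that live in the System namespace as of .NET 4.  SimpleSubject<T> and RecordingObserver<T> are hypothetical names for illustration; this version isn’t thread-safe and isn’t how the Reactive Framework implements its own types:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, minimal, single-threaded observable: it keeps a list of
// observers and notifies each of them when something happens.
public sealed class SimpleSubject<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        // The returned handle detaches exactly this observer when disposed.
        return new Unsubscriber(_observers, observer);
    }

    // Push a value to every currently subscribed observer.
    public void Publish(T value)
    {
        // Iterate over a copy so an observer may unsubscribe during notification.
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }

    // Tell every observer that no more values are coming.
    public void Complete()
    {
        foreach (var observer in _observers.ToArray())
            observer.OnCompleted();
        _observers.Clear();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() => _observers.Remove(_observer);
    }
}

// Hypothetical observer that just records what it was told.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception exception) => Error = exception;
    public void OnCompleted() => Completed = true;
}
```

Detaching is done by disposing the handle that Subscribe returned rather than through a separate method; the rest of this post explains why the interface ended up that way.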

What you may notice, and what you may have seen in some of Erik Meijer’s Channel 9 videos, is that the signatures of these interfaces reveal a duality between Enumerable and Observable.  To explain what I mean, let’s first look at the flow of the Enumerator:

[Diagram: the flow of the Enumerator]

The GetEnumerator method takes no arguments and returns an IEnumerator<T>, whose MoveNext and Current members also take no arguments and together hand back the next item in the sequence.  Now, let’s look at the inverse of it.

[Diagram: the inverse flow, from the Observable to its Observers]

The dual of the Enumerable flow is quite simple.  On the left-hand side is our observer, which exposes an action that takes a T argument and returns void.  The right-hand side is the notification that is sent when something happens.  This is all managed by the Observable, which maintains a list of all observers so that it can notify them.

To make this a little more concrete, let’s mash these two sets of interfaces together in order to dualize them.  First, let’s start with the IEnumerator<T> interface.

public interface IEnumerator<T> : IDisposable
{
    T Current { get; } // throws exception
    bool MoveNext();
}

What we’ll notice is that we have two members: the Current property with a get accessor, and the MoveNext method, which takes nothing and returns a boolean indicating whether it advanced to another item.  You’ll also notice that the Current property could throw an exception.  In the Java world, we’d declare that explicitly with a throws clause, but as C# has no checked exceptions, this comment will do.  Now, let’s dualize this interface.
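Here’s a small sketch of the three outcomes a consumer has to handle when pulling: a value, the end of the sequence, or an exception.  The Numbers iterator and the ThrowingIteratorDemo class are hypothetical.  With a compiler-generated iterator like this one, the exception actually surfaces from MoveNext rather than Current; either way, it reaches the consumer through the pull and is handled with an ordinary try/catch:

```csharp
using System;
using System.Collections.Generic;

public static class ThrowingIteratorDemo
{
    // Hypothetical iterator that yields two values and then fails.
    public static IEnumerable<int> Numbers()
    {
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("source failed");
    }

    // Records each outcome of pulling: a value (MoveNext returned true),
    // completion (MoveNext returned false), or an exception from the pull.
    public static string Describe(IEnumerable<int> source)
    {
        var log = new List<string>();
        try
        {
            using (var e = source.GetEnumerator())
            {
                while (e.MoveNext())
                    log.Add("value " + e.Current);
                log.Add("done");
            }
        }
        catch (InvalidOperationException ex)
        {
            log.Add("error " + ex.Message);
        }
        return string.Join(", ", log);
    }
}
```

For example, Describe(Numbers()) produces "value 1, value 2, error source failed", while Describe(new[] { 7 }) produces "value 7, done".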

public interface IDualEnumerator<T>
{
    // Pseudo-syntax, not valid C#: takes either a T value or an Exception
    void SetCurrent(T value | Exception ex);
    void MoveNext(bool canMove);
}

At first glance, all we have to do is flip our Current property so that it has a setter instead of a getter.  Because Current either returned a value or threw an exception, the setter takes either a value or an exception, at least for now.  Secondly, MoveNext now takes a boolean to indicate whether there are more values, rather than returning one.  Let’s refactor this a little further and break SetCurrent into two distinct operations: one for yielding the next value and one for throwing an exception.

public interface IDualEnumerator<T>
{
    void Yield(T value);
    void Throw(Exception ex);
    void MoveNext(bool canMove);
}

Now that we have that under control, let’s move on to our MoveNext method.  Notice that we’d keep passing in true until we’re finished pushing values, so the only call that carries any information is the final false.  Instead of calling MoveNext repeatedly, let’s call a single method once, when we’re finished.

public interface IDualEnumerator<T>
{
    void Yield(T value);
    void Throw(Exception ex);
    void Break();
}

What we end up with is pretty much the signature of our IObserver<T>.  By renaming Yield to OnNext, Throw to OnError and Break to OnCompleted, we’re left with our original IObserver<T> interface.

public interface IObserver<T>
{   
    void OnNext(T value);
    void OnError(Exception exception);
    void OnCompleted();
} 
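To see the renamed interface at work, here’s a sketch of the push-side counterpart of a foreach loop: instead of the consumer pulling each item, a producer walks a sequence and calls the observer, with Yield, Throw and Break mapping to OnNext, OnError and OnCompleted.  PushDemo, PushAll, FailsAfterOne and LogObserver<T> are hypothetical names, and the code uses the BCL’s IObserver<T> (.NET 4 and later):

```csharp
using System;
using System.Collections.Generic;

public static class PushDemo
{
    // The producer drives the loop and calls the observer:
    //   yield return x  ->  observer.OnNext(x)
    //   throw ex        ->  observer.OnError(ex)
    //   yield break     ->  observer.OnCompleted()
    public static void PushAll<T>(IEnumerable<T> source, IObserver<T> observer)
    {
        using (var e = source.GetEnumerator())
        {
            while (true)
            {
                T item;
                try
                {
                    // Only failures of the source itself become OnError.
                    if (!e.MoveNext())
                        break;
                    item = e.Current;
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);   // "throw"
                    return;
                }
                observer.OnNext(item);      // "yield return"
            }
        }
        observer.OnCompleted();             // "yield break"
    }

    // Hypothetical source that yields one value and then fails.
    public static IEnumerable<int> FailsAfterOne()
    {
        yield return 1;
        throw new InvalidOperationException("boom");
    }
}

// Hypothetical observer that logs every notification it receives.
public sealed class LogObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(T value) => Log.Add("OnNext(" + value + ")");
    public void OnError(Exception error) => Log.Add("OnError(" + error.Message + ")");
    public void OnCompleted() => Log.Add("OnCompleted()");
}
```

Only failures from the source are routed to OnError here; an exception thrown by the observer’s own OnNext propagates to the caller instead of being reported back to that same observer.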

What about the dual of IEnumerable<T>?  Let’s first look at what that interface is.

public interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator();
} 

This interface has a single method called GetEnumerator, which returns an IEnumerator<T>.  How might we dualize this interface?  We already know that the dual of IEnumerator<T> is IObserver<T>, so instead of handing an enumerator out, the dual takes an observer in.  We need a way to attach interest in a given dualized Enumerable, so we’ll create a method called Attach.  Likewise, once we’ve attached interest, we also need a way to detach it for a given observer.  That might look like the following.

public interface IDualEnumerable<T>
{
    void Attach(IObserver<T> observer);
    void Detach(IObserver<T> observer);
}

If we look at Java’s Observer interface and Observable class, with its addObserver and deleteObserver methods, they are pretty close in signature.  Ultimately, though, this approach has a problem with composition.  Each layer in a composition needs to remember which observer it attached upstream on behalf of which caller, so that it can undo any of those attachments later.  Not doing so could lead to space leaks and is generally a bad idea.  How could we solve this, then?  We need a way to track our attachment once we call Attach, through some form of return value.  At this point, the tracking is nothing more than the ability to detach the given observer.  Notice that our IEnumerator<T> interface inherits IDisposable, which gets one thinking.  Could we instead change our Attach method so that it returns an IDisposable, giving us the ability to track our subscription and clean up after we’re done?  That’s exactly the approach IObservable<T> takes in the final version of the interface.

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
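To illustrate the composition problem with Attach/Detach, here’s a sketch of a filtering layer (roughly a Where) built on the IDualEnumerable<T> shape from above, which the block redeclares so it compiles on its own.  DualSource<T>, DualWhere<T> and ListObserver<T> are hypothetical types for illustration.  Because Detach is only handed the caller’s observer, the layer must keep a map from each caller’s observer to the wrapper it attached upstream:

```csharp
using System;
using System.Collections.Generic;

// The Attach/Detach shape derived above (hypothetical; not a BCL type).
public interface IDualEnumerable<T>
{
    void Attach(IObserver<T> observer);
    void Detach(IObserver<T> observer);
}

// A trivial source: keeps its observers in a list and pushes values to them.
public sealed class DualSource<T> : IDualEnumerable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    public int ObserverCount => _observers.Count;
    public void Attach(IObserver<T> observer) => _observers.Add(observer);
    public void Detach(IObserver<T> observer) => _observers.Remove(observer);
    public void Publish(T value)
    {
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }
}

// A filter composed on top of a source. Attach wraps the caller's observer
// before attaching upstream, but Detach only receives the caller's observer,
// so this layer must remember the wrapper it created for each attachment.
public sealed class DualWhere<T> : IDualEnumerable<T>
{
    private readonly IDualEnumerable<T> _source;
    private readonly Func<T, bool> _predicate;
    private readonly Dictionary<IObserver<T>, IObserver<T>> _wrappers =
        new Dictionary<IObserver<T>, IObserver<T>>();

    public DualWhere(IDualEnumerable<T> source, Func<T, bool> predicate)
    {
        _source = source;
        _predicate = predicate;
    }

    public void Attach(IObserver<T> observer)
    {
        var wrapper = new Filter(observer, _predicate);
        _wrappers[observer] = wrapper;   // bookkeeping needed only for Detach
        _source.Attach(wrapper);
    }

    public void Detach(IObserver<T> observer)
    {
        if (_wrappers.TryGetValue(observer, out var wrapper))
        {
            _wrappers.Remove(observer);
            _source.Detach(wrapper);
        }
    }

    private sealed class Filter : IObserver<T>
    {
        private readonly IObserver<T> _inner;
        private readonly Func<T, bool> _predicate;
        public Filter(IObserver<T> inner, Func<T, bool> predicate)
        {
            _inner = inner;
            _predicate = predicate;
        }
        public void OnNext(T value) { if (_predicate(value)) _inner.OnNext(value); }
        public void OnError(Exception error) => _inner.OnError(error);
        public void OnCompleted() => _inner.OnCompleted();
    }
}

// Hypothetical observer that collects the values it receives.
public sealed class ListObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Every layer in a composed chain needs its own copy of this bookkeeping, and a forgotten entry keeps the wrapper (and whatever it references) alive.  With Subscribe returning an IDisposable, a layer can instead hand back the disposable it got from upstream.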

Now that we’ve shown the duality, why does it matter?  Because of this duality, the laws that apply to LINQ to Objects can now apply to LINQ to Observables (or Events).  In fact, we can see that in the Observable class, which has many of our standard LINQ combinators, such as:

  • Aggregate
  • First/FirstOrDefault
  • GroupBy
  • Join
  • Last/LastOrDefault
  • Select
  • SelectMany
  • Single/SingleOrDefault
  • Skip/SkipWhile
  • Take/TakeWhile
  • Where
  • Zip
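Since C# query expressions are resolved by pattern (the compiler just looks for suitable Where and Select methods), a sketch of two such operators over IObservable<T> is enough to write a LINQ query against a push-based source.  ObservableSketch, AnonymousObservable<T>, AnonymousObserver<T> and NoopDisposable are hypothetical helpers written against the BCL interfaces, not the Reactive Framework’s Observable class.  Note how each operator simply returns the IDisposable it gets from upstream, with no observer-to-wrapper map:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helpers, not the Reactive Framework's Observable class.
public static class ObservableSketch
{
    // Forward only the values that satisfy the predicate. The upstream
    // subscription is returned as-is, so disposing it detaches the chain.
    public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate) =>
        new AnonymousObservable<T>(observer =>
            source.Subscribe(new AnonymousObserver<T>(
                value => { if (predicate(value)) observer.OnNext(value); },
                observer.OnError,
                observer.OnCompleted)));

    // Transform each value before forwarding it.
    public static IObservable<TResult> Select<T, TResult>(this IObservable<T> source, Func<T, TResult> selector) =>
        new AnonymousObservable<TResult>(observer =>
            source.Subscribe(new AnonymousObserver<T>(
                value => observer.OnNext(selector(value)),
                observer.OnError,
                observer.OnCompleted)));

    // A cold source that pushes the given values to each new subscriber.
    public static IObservable<T> FromValues<T>(params T[] values) =>
        new AnonymousObservable<T>(observer =>
        {
            foreach (var v in values) observer.OnNext(v);
            observer.OnCompleted();
            return NoopDisposable.Instance;
        });
}

public sealed class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;
    public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

public sealed class AnonymousObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

public sealed class NoopDisposable : IDisposable
{
    public static readonly NoopDisposable Instance = new NoopDisposable();
    public void Dispose() { }
}
```

With these in scope, a query such as `from x in ObservableSketch.FromValues(1, 2, 3, 4) where x % 2 == 0 select x * 10` compiles to the same Where/Select chain it would over IEnumerable<T>, and pushes 20 and then 40 to whoever subscribes.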

Because of this duality, we can do such simple things as listen to the ProgressChanged event until the RunWorkerCompleted event fires.

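using System;
using System.ComponentModel;

// ProgressChangedEvent(), RunWorkerCompletedEvent() and Until are extension methods
// from the Reactive Framework bits used for this post; they are not part of the BCL.
var worker = new BackgroundWorker { WorkerReportsProgress = true };

// Progress percentages, reported only until the worker completes
var percentage = from progress in worker.ProgressChangedEvent()
                           .Until(worker.RunWorkerCompletedEvent())
                 select progress.EventArgs.ProgressPercentage;

// Print each percentage as it arrives; dispose the subscription to stop listening
var subscription = percentage.Subscribe(
    p => Console.WriteLine("Percentage Complete: {0}", p));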
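The ProgressChangedEvent() and RunWorkerCompletedEvent() extension methods above aren’t part of the BCL, and neither is Until.  To show the idea without those bits, here’s a sketch of wrapping an event as an IObservable<T> and of an Until-style operator that forwards values until another observable fires.  FakeWorker, EventSketch and the Delegate* helper types are hypothetical; FakeWorker stands in for BackgroundWorker so that the events fire synchronously:

```csharp
using System;
using System.ComponentModel;

// Hypothetical stand-in for BackgroundWorker whose events fire synchronously.
public sealed class FakeWorker
{
    public event EventHandler<ProgressChangedEventArgs> ProgressChanged;
    public event EventHandler<EventArgs> Completed;

    public void Report(int percent) =>
        ProgressChanged?.Invoke(this, new ProgressChangedEventArgs(percent, null));
    public void Complete() => Completed?.Invoke(this, EventArgs.Empty);
}

public static class EventSketch
{
    // Wrap an event as IObservable<TArgs>: Subscribe adds a handler, and
    // disposing the returned IDisposable removes that same handler.
    public static IObservable<TArgs> FromEvent<TArgs>(
        Action<EventHandler<TArgs>> addHandler,
        Action<EventHandler<TArgs>> removeHandler) where TArgs : EventArgs =>
        new DelegateObservable<TArgs>(observer =>
        {
            EventHandler<TArgs> handler = (sender, args) => observer.OnNext(args);
            addHandler(handler);
            return new DelegateDisposable(() => removeHandler(handler));
        });

    // Forward values from source until 'other' produces its first value,
    // then complete and drop both subscriptions. Not thread-safe.
    public static IObservable<T> Until<T, TOther>(this IObservable<T> source, IObservable<TOther> other) =>
        new DelegateObservable<T>(observer =>
        {
            IDisposable sourceSub = null, otherSub = null;
            var done = false;

            void Stop()
            {
                if (done) return;
                done = true;
                sourceSub?.Dispose();
                otherSub?.Dispose();
                observer.OnCompleted();
            }

            otherSub = other.Subscribe(new DelegateObserver<TOther>(_ => Stop(), observer.OnError, () => { }));
            if (!done)
                sourceSub = source.Subscribe(new DelegateObserver<T>(
                    value => { if (!done) observer.OnNext(value); }, observer.OnError, Stop));

            return new DelegateDisposable(() =>
            {
                done = true;
                sourceSub?.Dispose();
                otherSub?.Dispose();
            });
        });
}

public sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;
    public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext; _onError = onError; _onCompleted = onCompleted;
    }
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

public sealed class DelegateDisposable : IDisposable
{
    private readonly Action _dispose;
    public DelegateDisposable(Action dispose) => _dispose = dispose;
    public void Dispose() => _dispose();
}
```

With these pieces, `EventSketch.FromEvent<ProgressChangedEventArgs>(h => worker.ProgressChanged += h, h => worker.ProgressChanged -= h)` plays the role of worker.ProgressChangedEvent(), and calling .Until on it with a similarly wrapped Completed event stops the flow of percentages once the worker completes, minus the query syntax.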


That’s only scratching the surface of the power of the Reactive Framework.  We’ll cover the combinators in depth in the next part of the series.  We could also go into the fact that the Enumerable solution is really just the lazy list monad and the Observable solution is really just the continuation monad, but that’s for another time as well.

Conclusion

As I’ve stated before, the LiveLabs Reactive Framework gives us the ability to harness reactive programming and treat events as the first-class citizens they should have been, using LINQ expressions and other standard LINQ combinators.  Still, there is much to cover in this series: the standard LINQ combinators, the monadic heritage of this solution and more.

This entry was posted in C#, Event-based Programming, Reactive Framework.
  • http://www.ironglue.com Vlastimil Adamovsky

    Matt, you are a star. If I were a boss, I would like you to work for me. If you were a boss, I would like to work for you.
    All your articles are really eye-openers and I learned a lot. I know reactive programming has been around for quite a while, and I am glad it is coming to join the mainstream….

  • http://www.manfbraun.de mabra

    Hi !

    Much thanks for this article!
    Just to understand the details, I tried to make a mini-solution with the given code, but I am failing here:

    var progress = wc.DownloadProgressChangedEvent()….
    VS:System.Net.WebClient’ does not contain a definition for ‘DownloadProgressChangedEvent’ ..

    Any idea, what I am missing??

    Thanks anyway!

    br–mabra

  • http://codebetter.com/members/Matthew.Podwysocki/default.aspx Matthew.Podwysocki

    @Martinho,

    Agreed with your analysis of Dax’s issue. Good stuff there.

    Matt

  • http://codebetter.com/members/Matthew.Podwysocki/default.aspx Matthew.Podwysocki

    @alwin

    And it’s not unintentional that those look the way they do. This is to help break down the walls and get us thinking about asynchronous programming in a much easier manner.

    Matt

  • http://codebetter.com/members/Matthew.Podwysocki/default.aspx Matthew.Podwysocki

    @RobertZ

    The bits I’m using aren’t quite public yet, but if you use the System.Reactive.dll from the Silverlight 3 Toolkit, you should be able to get the same results.

    Matt

  • alwin

    public interface IDualEnumerator<T>
    {
        void Yield(T value);
        void Throw(Exception ex);
        void Break();
    }

    That looks a lot like using the yield keyword.
    yield return x;
    throw exception;
    yield break;

    When I compare the observable pattern with ‘pushing’ values using yield in C#, I can certainly understand the observable pattern.

  • RobertZ

    Thanks Matt. Could you please post a .csproj with this example?

  • http://devnonsense.blogspot.com Martinho Fernandes

    Both can have many-to-many relationships. You can certainly have more than one IEnumerator enumerating an IEnumerable. Please correct me if this is not what you meant by many-to-one and many-to-many.

    Your ILoadable idea can be implemented as an IObserver that “loads” the things. IObservable+IObserver can be seen as a generalization of that idea.

    I believe the OnError method could be removed, because it can be simply replaced by an implementation of IObservable, and then only observers interested in errors would need to subscribe to the exception part. But the current approach is ok, too. If you’re not interested in errors, you don’t do anything.

    Why would anyone be interested in errors? With the pull model, the iterator is not interested in errors from the app, but the app is interested in errors from the iterator! If something goes wrong when enumerating, the exception surfaces and you can try-catch-finally-it and do whatever. With the push model, things work asynchronously. So, if an error occurs and you need to do something about it (clean up and stuff), you cannot use try-blocks. The OnError() method steps in to save your skin :).

    To a certain extent, IList is already a quasi-dual of itself. The Get(out int, out T) method you allude to is similar to what you get when you iterate the list (think of the Select() method that takes a Func). An approximate dual of Add(T) is Remove(list.Count). So, there wouldn’t be much you could do with IListDual that you can’t do with IList already… You would have a different API (and a strange one at that).

    I don’t care much about how precise the duality is (and neither does the team; notice the small deviation made to accommodate Detach). As long as it is precise enough to be useful and powerful. I can see many uses to this, some crazier than others (right now, I can’t stop thinking about a counterpart to IQueryable and Linq2Sql…)

  • Dax

    I’m wondering if given the excitement about the dual nature, we might be losing part of the intention of IObservable.

    First of all, I’m thinking there might be a bit of a mismatch in the duality anyway. Primarily, IEnumerator and IEnumerable have a many-to-one relationship, whereas IObserver and IObservable have a many-to-many relationship, right? The deal is that IEnumerator has to have intimate knowledge about the structure of its IEnumerable that the app doesn’t necessarily have; they’re tightly bound. Whereas IObserver and IObservable are completely unaware of each others’ internals.

    I think the real dual of IEnumerable would be something more like ILoadable, where you call ILoadable.GetLoader(), which then allows you to load values into an “array” or whatever, using Loader.MoveNext(bool) and Loader.SetValue(T). I certainly think the intention of IObservable is more valuable than any ILoadable concept (though ILoadable might be an interesting way to explore SQL inserts and updates), so I wonder if the focus on duality is perhaps the wrong way to go about it.

    For example, I’m not sure I get the point of the OnError method. First of all, isn’t there a duality mismatch here anyway—given that IObserver.OnNext(T) could also throw an exception, then to preserve absolute duality going the opposite direction, shouldn’t we require that OnError also be a part of IEnumerator? But beyond that, it just doesn’t seem to fit. Why would the Observable need to tell all Observers that there was some exception downstream? Just like an iterator doesn’t need to know that there’s an exception occurring in the app that uses it. So I think the focus on preserving duality really mucks up the intention of IObservable in this case.

    Also, I think if duality was that important to the concept of IObservable and IEnumerable, then the dual of (for example) IList would be even more powerful. But wouldn’t the dual of (for example) IList.Set(int, T) be IListDual.Get(out int, out T)? Is that useful in some fashion? I don’t see anything, though I’d love to hear from anyone with an idea how it could be powerful.