Reactive Extensions for .NET – Event-based Async Operations

Over the many posts I’ve written on the Reactive Extensions for both JavaScript and .NET, I’ve covered the basics as well as some of the deeper material.  This time, I’d like to return to the basics I cover when I give a talk on Rx in person, and explain some of the problems we face today with reactive programming in general, both asynchronous and event-based.

Why Do We Need It?

Let’s walk through an example of the event-based asynchronous pattern, which became more prevalent when .NET 2.0 came around, and especially with the introduction of WCF in the .NET 3.0 timeframe.  It was an alternative to the other approaches to asynchronous programming, which included the Begin/End pattern and WaitHandles.  One distinct advantage was its ability to report progress, incremental results or state changes, which the others could not provide.  But it comes with many downsides as well, which we’ll go into along with where the Reactive Extensions can help.

Typically, we’d have some form of asynchronous method which has no return value and takes any number of parameters plus a user token.  The token comes back to us as the user state of the completion event, which lets us correlate the two and make sure we’re looking at the right result.

public void DoSomethingAsync(object token) 
{
    // Kick off something async
}

We’d also have some form of EventArgs which carries our result, an exception should one occur, and optionally a cancelled flag so that we can be notified when the operation was cancelled.  Finally, we’d have an event whose handlers are called when our asynchronous method completes.

public class DoSomethingCompletedEventArgs : EventArgs 
{
    public string Result { get; set; }
    public bool Cancelled { get; set; }
    public Exception Error { get; set; }
}

public event EventHandler<DoSomethingCompletedEventArgs>
    DoSomethingCompleted;
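
To see how these pieces fit together, here is a minimal, self-contained sketch of a class that follows the pattern.  The Worker class, the UserState property on the event args and the use of the thread pool are assumptions made for illustration; they are not taken from any particular library.

```csharp
using System;
using System.Threading;

// Hypothetical event args: result, error, cancel flag, plus the caller's token.
public class DoSomethingCompletedEventArgs : EventArgs
{
    public string Result { get; set; }
    public bool Cancelled { get; set; }
    public Exception Error { get; set; }
    public object UserState { get; set; } // assumption: added so callers can correlate
}

// Hypothetical class exposing one event-based asynchronous operation.
public class Worker
{
    public event EventHandler<DoSomethingCompletedEventArgs> DoSomethingCompleted;

    public void DoSomethingAsync(object token)
    {
        // Run the work on a thread-pool thread and raise the event when done.
        ThreadPool.QueueUserWorkItem(_ =>
        {
            var args = new DoSomethingCompletedEventArgs { UserState = token };
            try
            {
                args.Result = "done"; // stand-in for the real work
            }
            catch (Exception ex)
            {
                args.Error = ex;
            }

            var handler = DoSomethingCompleted;
            if (handler != null) handler(this, args);
        });
    }
}

public static class EapDemo
{
    public static void Main()
    {
        var worker = new Worker();
        var token = new object();
        var finished = new ManualResetEvent(false);

        worker.DoSomethingCompleted += (sender, args) =>
        {
            if (!ReferenceEquals(args.UserState, token)) return; // not our call
            Console.WriteLine(args.Result);
            finished.Set();
        };

        worker.DoSomethingAsync(token);
        finished.WaitOne(); // block until the completion event fires
    }
}
```

Even in this small form, the caller has to do the token check and the signalling by hand, which is the kind of ceremony the rest of this post is about.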

To give you an idea of some of the problems we face with this pattern, let’s go over an example using the System.Net.WebClient class and downloading a string asynchronously via the DownloadStringAsync method.  Let’s enumerate some of the challenges in the code below.

var uri = new Uri("http://search.twitter.com/search.json?q=4sq.com");
var client = new WebClient();

client.DownloadStringCompleted += (sender, args) =>
{
    // TODO: Get the only one we want?
    // TODO: Check for cancel?
    // TODO: Check for exception?
    // TODO: Check for result?
    
    // TODO: What about more composition?
};

client.DownloadStringAsync(uri);

// TODO: How to unsubscribe? We hold no reference to the lambda.
// client.DownloadStringCompleted -= ???

The first challenge is how to make sure we get the value we want.  After all, if multiple calls are made on this WebClient instance, we’ll need to correlate the user state with our token.  Next, we need to check for cancellation and do something when that happens.  After checking for cancellation, we also want to check for exceptions and handle them appropriately.  And finally we get our result, but now what do we do with it?  Better yet, now that we have the result, how can we compose it with another asynchronous call?  Finally, when using lambdas to subscribe to events, we can’t unsubscribe an inline lambda because we hold no reference to it, so that could cause an issue as well.  Making the changes needed to handle these cases properly, we end up with code that looks like the following:

var uri = new Uri("http://search.twitter.com/search.json?q=4sq.com");
var client = new WebClient();
var token = new object();

DownloadStringCompletedEventHandler h = (sender, args) =>
{
    // Check if it's ours
    if (token != args.UserState) return;

    if (args.Cancelled)
    {
        // Do something to say we stopped
    }
    else if (args.Error != null)
    {
        // Do something with that error
    }
    else
    {
        // Well, now what?
    }
};

client.DownloadStringCompleted += h;
client.DownloadStringAsync(uri, token);

// When we're ready to clean up
client.DownloadStringCompleted -= h;

We solved most of the problems, but with a lot of boilerplate.  Within all this boilerplate, we’re losing sight of our core goal, which is to get our result and do something with it, especially when the next step happens to be another asynchronous call.  For example, what if we want to download a string from somewhere, transform it into JSON, modify the results and then upload them somewhere else?  Should we use the Reactive Extensions for .NET to solve it, our code might look like the following.

var client = new WebClient();
var searchUri = new Uri("http://search.twitter.com/search.json?q=4sq.com");
var uploadUri = new Uri("http://localhost:8080/uploads");

IObservable<Unit> query =
    from result in client.DownloadStringAsObservable(searchUri, new object())
    let transformed = TransformResult(result)
    from upload in client.UploadStringAsObservable(
        uploadUri,
        "POST",
        transformed,
        new object())
    select upload;
    
var subscription = query.Subscribe(
    x => {}, // Nothing to do
    exn => 
    {
        // Do something with the exception
    }, 
    () => 
    {
        // Indicate we're finished
    });

What we’re able to do is take operations that would have required quite a bit of boilerplate and compose them through LINQ via the Reactive Extensions for .NET.  But how do we get there?

How Do We Get There?

What we need to do at this point is transform our asynchronous call from above into an IObservable<T> instance.  The FromEvent method, which we’ve covered earlier, won’t cut it: ultimately we want to combine the asynchronous call with the event handler that is invoked upon completion, which FromEvent cannot provide.  Instead, we’ll focus on the Observable.Create and Observable.CreateWithDisposable methods, which let us create IObservable<T> instances from a subscribe implementation.  With CreateWithDisposable, we are given an IObserver<T> instance and return a concrete IDisposable instance; with Create, we are given the same IObserver<T> instance but return an Action instead of the IDisposable.  Let’s look at those signatures below:

public static IObservable<TSource> CreateWithDisposable<TSource>(
    Func<IObserver<TSource>, IDisposable> subscribe
)

public static IObservable<TSource> Create<TSource>(
    Func<IObserver<TSource>, Action> subscribe
)
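
To make the idea concrete, the following is a rough sketch of what a Create-style factory does, written against only the IObservable<T> and IObserver<T> interfaces from the base class library.  MiniObservable, ActionDisposable and PrintObserver are hypothetical names for this illustration, and this is not how Rx itself is implemented; the real library is more thorough about how it treats the observer.

```csharp
using System;
using System.Threading;

public static class MiniObservable
{
    // Mirrors the shape of Create: the subscribe function returns a cleanup Action.
    public static IObservable<T> Create<T>(Func<IObserver<T>, Action> subscribe)
    {
        return new AnonymousObservable<T>(subscribe);
    }

    private sealed class AnonymousObservable<T> : IObservable<T>
    {
        private readonly Func<IObserver<T>, Action> _subscribe;

        public AnonymousObservable(Func<IObserver<T>, Action> subscribe)
        {
            _subscribe = subscribe;
        }

        // Each Subscribe call runs the user's function and wraps its cleanup.
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return new ActionDisposable(_subscribe(observer));
        }
    }

    private sealed class ActionDisposable : IDisposable
    {
        private Action _cleanup;

        public ActionDisposable(Action cleanup) { _cleanup = cleanup; }

        // Runs the cleanup at most once, even if Dispose is called repeatedly.
        public void Dispose()
        {
            Action cleanup = Interlocked.Exchange(ref _cleanup, null);
            if (cleanup != null) cleanup();
        }
    }
}

// Hypothetical observer that just prints each notification.
public sealed class PrintObserver<T> : IObserver<T>
{
    public void OnNext(T value) { Console.WriteLine("OnNext: " + value); }
    public void OnError(Exception error) { Console.WriteLine("OnError: " + error.Message); }
    public void OnCompleted() { Console.WriteLine("OnCompleted"); }
}

public static class CreateDemo
{
    public static void Main()
    {
        IObservable<int> source = MiniObservable.Create<int>(observer =>
        {
            observer.OnNext(42);
            observer.OnCompleted();
            return () => Console.WriteLine("cleaned up");
        });

        IDisposable subscription = source.Subscribe(new PrintObserver<int>());
        subscription.Dispose(); // runs the cleanup Action
        subscription.Dispose(); // second call does nothing
    }
}
```

The point to take away is that nothing happens until Subscribe is called, and that the Action we return is our one hook for tearing things down, which is exactly where we’ll detach our event handler.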

Using the Observable.Create method, let’s create an extension method called DownloadStringAsObservable to handle the asynchronous request.  Inside Observable.Create, we’ll create our DownloadStringCompletedEventHandler, where we check for our user token, handle cancellation by calling observer.OnCompleted, handle exceptions via observer.OnError, and finally handle our result by calling observer.OnNext followed by observer.OnCompleted to indicate we are finished.  We then attach this handler to the DownloadStringCompleted event.  We also want to catch any exception that DownloadStringAsync itself might throw, so we put that call in a try/catch block and send the Exception to observer.OnError.  Finally, being the good citizens we are, we clean up after ourselves by removing our handler from the DownloadStringCompleted event when the subscription is disposed.

public static IObservable<string> DownloadStringAsObservable(
    this WebClient client, 
    Uri address,
    object userToken)
{
    return Observable.Create<string>(observer =>
    {
        DownloadStringCompletedEventHandler handler = (sender, args) =>
        {
            if (args.UserState != userToken) return;
    
            if (args.Cancelled)
                observer.OnCompleted();
            else if(args.Error != null)
                observer.OnError(args.Error);
            else
            {
                observer.OnNext(args.Result);
                observer.OnCompleted();
            }
        };
    
        client.DownloadStringCompleted += handler;
        
        try
        {
            client.DownloadStringAsync(address, userToken);
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
        
        return () => client.DownloadStringCompleted -= handler;
    });
}

The same logic applies to uploading a string as well, by wrapping the UploadStringAsync method as we do below.

public static IObservable<Unit> UploadStringAsObservable(
    this WebClient client, 
    Uri address, 
    string method, 
    string data,
    object userToken)
{
    return Observable.Create<Unit>(observer =>
    {
        UploadStringCompletedEventHandler handler = (sender, args) =>
        {
            if (args.UserState != userToken) return;
    
            if (args.Cancelled)
                observer.OnCompleted();
            else if (args.Error != null)
                observer.OnError(args.Error);
            else
            {
                observer.OnNext(new Unit());
                observer.OnCompleted();
            }
        };
    
        client.UploadStringCompleted += handler;
        
        try
        {
            // UploadStringAsync expects (address, method, data, userToken)
            client.UploadStringAsync(address, method, data, userToken);
        }
        catch(Exception ex)
        {
            observer.OnError(ex);
        }
        
        return () => client.UploadStringCompleted -= handler;
    });          
}
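
Since both wrappers route the completion event to the observer in exactly the same way, that logic could be pulled into a shared helper.  The sketch below is one way to do it, assuming only that the event args derive from AsyncCompletedEventArgs (as the WebClient completion args do); CompletionRouting, Route and RecordingObserver are hypothetical names, not part of Rx or the framework.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;

public static class CompletionRouting
{
    // Forwards a completion event to the observer.
    // Returns false when the event belongs to a different call (token mismatch).
    public static bool Route<T>(
        IObserver<T> observer,
        AsyncCompletedEventArgs args,
        object userToken,
        Func<T> getResult)
    {
        if (!ReferenceEquals(args.UserState, userToken)) return false;

        if (args.Cancelled)
        {
            observer.OnCompleted();
        }
        else if (args.Error != null)
        {
            observer.OnError(args.Error);
        }
        else
        {
            // The result is read lazily, only on the success path.
            observer.OnNext(getResult());
            observer.OnCompleted();
        }

        return true;
    }
}

// Hypothetical observer that records notifications so we can inspect them.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

public static class RoutingDemo
{
    public static void Main()
    {
        var token = new object();
        var observer = new RecordingObserver<string>();

        // Simulate a successful completion carrying our token.
        var success = new AsyncCompletedEventArgs(null, false, token);
        CompletionRouting.Route(observer, success, token, () => "payload");

        Console.WriteLine(string.Join(", ", observer.Log));
    }
}
```

With a helper like this, the handler body in each wrapper would shrink to a single call, passing () => args.Result for the download and () => new Unit() for the upload.  The result is passed as a function rather than a value so that it is only touched when there was no error or cancellation.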

By using these tokens as we do above, we could just as easily wrap the DownloadProgressChanged event, filtering for our token there too.

public static IObservable<DownloadProgressChangedEventArgs> 
    DownloadProgressChangedAsObservable(
        this WebClient client,
        object userToken)
{
    return Observable.FromEvent<DownloadProgressChangedEventArgs>(
            client,
            "DownloadProgressChanged")
        .Where(x => x.EventArgs.UserState == userToken)
        .Select(x => x.EventArgs);
}

What we have done here is take a less than optimal event-based asynchronous programming model and move it onto the Reactive Extensions for .NET, so that cancellation checking and error handling are done uniformly and the operations can be composed.

Conclusion

Asynchronous programming is something .NET developers are running into more and more, especially with Silverlight, Windows Phone and so forth.  Unfortunately, dealing with asynchronous programming has been and continues to be hard.  By using the Reactive Extensions for .NET, we have ways to build bridges from the old asynchronous programming models to a more composable form where we can uniformly handle exceptions, cancellation and so forth.

So with that, download it, and give the team feedback!

This entry was posted in C#, Event-based Programming, Reactive Framework.
  • Prajapati KV

    Great! Thanks for this very informative yet simple article.

  • Chris Martin

    Once the “ah-hah” moment hits you, IObserv(able/er) is such a great addition to the framework.

    Thanks for all these articles.