Introduction to the Reactive Framework Part V

In the previous post of the Introduction to the Reactive Framework series, we covered how to create new IObservable<T> instances, either from scratch or from existing sequences.  What this allowed us to do was turn an operation which was previously interactive, such as iterating over a collection, to a reactive, where I could have multiple listeners reacting to the same collection concurrently.  This has nice implications for allowing us to scale our application in interesting ways.  This time, let’s take another angle on making asynchronous programming easier.

Let’s get caught up to where we are today:

From Asynchronous To Observables

In the first post in the series, we talked about some of the problems we face with asynchronous programming today.  For example, for any given operation, how do we handle exceptions, or handle cancellation, or even manage the lifetime of resources?  Let’s take a look at the story of asynchronous programming today.  For example, let’s take the fairly straight forward approach of sending a tweet in Twitter, but with the added pressure of non-blocking calls for all IO operations.  Now we took an example which should have been straight forward, and now it looks something like this anti-pattern:

var request = WebRequest.Create("http://twitter.com/statuses/update.xml");
request.Credentials = new NetworkCredential("foo", "bar");
request.Method = "POST";

request.BeginGetRequestStream(rsAr =>
{
    var requestStream = request.EndGetRequestStream(rsAr);
    var tweet = HttpUtility.HtmlEncode("status=Hello from an async method #bacon");
    var bytes = Encoding.UTF8.GetBytes(tweet);
    requestStream.BeginWrite(bytes, 0, bytes.Length, beginWriteAr =>
    {
        requestStream.EndWrite(beginWriteAr);
        request.BeginGetResponse(responseAr =>
        {
            var response = request.EndGetResponse(responseAr);
            var responseStream = response.GetResponseStream();

            var readBuffer = new byte[response.ContentLength];
            responseStream.BeginRead(readBuffer, 0, readBuffer.Length, readAr =>
            {
                var read = responseStream.EndRead(readAr);
                var readText = Encoding.UTF8.GetString(readBuffer);
                ResultsText.Text = readText;
            }, null);
        }, null);
    }, null);
}, null);

Not only is this code an absolute mess, but it has a number of bad practices, including not handling cleanup of our resources, as well as not handling exceptions, cancellation checking, or even checking if our buffer was completely read.  Adding those constraints to the above code would make it a bit more unreadable.  The situation we run into is that programming in a non-blocking fashion has caused all sorts of heartache where we become so wrapped up in the non-blocking code that we lose sight easily of the domain logic we should have been interested in, in the first place.

I’ve covered in the past that with F# async workflows, solving this is a rather easy task and that kind of rich functionality has been unavailable to C# developers directly.  But, with the Reactive Extensions for .NET, we have the ability to also take advantage of composable asynchronous actions as well.  So, where do we get started?

Let’s take a simple example of creating a function and then invoking it asynchronously.

Func<int, int, int> add = (x, y) => x + y;

add.BeginInvoke(3, 4, ar =>
{
    var result = add.EndInvoke(ar);
    Console.WriteLine(result);
}, null);

Many asynchronous methods in .NET are written in this way that their signature has a BeginXXX and EndXXX where the XXX is the method name that is being executed asynchronously.  The Begin takes the arguments to execute the method, an AsyncCallback which is an action that takes an IAsyncResult and returns nothing, and finally object state.  The End method takes the IAsyncResult which is passed in from the AsyncCallback in order to retrieve the value of the asynchronous call.

void BeginXXX(args, AsyncCallback, state);

Value EndXXX(IAsyncResult);

Luckily, the Reactive Extensions give us a couple of ways to make this happen.  First, let’s take an existing method such as the System.IO.Stream.WriteAsync.  In this example, we’ll create an extension method which allows us to turn this asynchronous method into a IObservable<T>.

public static IObservable<Unit> WriteAsync(
    this Stream stream, 
    byte[] buffer, 
    int offset, 
    int count)
{
    return Observable.FromAsyncPattern<byte[], int, int>(
        stream.BeginWrite,
        stream.EndWrite)(buffer, offset, count);
}

Because the Write method returns void, we have issues in C# as we cannot have a generic type as being void.  To get around this, much like F#, the Reactive Extensions introduced the Unit type to express that we have no value.  Quite frankly this is the way I think C# should have been to allow for this and not need these unnecessary hacks.  Anyways, the Observable class has a method called FromAsyncPattern which takes in both the BeginXXX and EndXXX and returns a Func delegate that takes the arguments required to execute it.  In this case, the Observable.FromAsyncPattern returns a Func which takes the buffer, offset and count and returns an IObservable<Unit>. 

There is also another way in that we can take our existing add function from above and using an extension method called ToAsync, can also work the IObservable magic.   

Func<int, int, int> add = (x, y) => x + y;
Func<int, int, IObservable<int>> obvervableAdd = add.ToAsync();

IObservable<int> result = from added in obvervableAdd(3, 4)
                          select added;
IDisposable sub = result
    .SubscribeOnDispatcher()
    .Subscribe(added => Results.Text = added.ToString());

In this instance, we took our existing add method and then turned it into an asynchronous method by using the ToAsync extension method.  That returns a Func delegate which takes our two arguments and returns an IObservable<int> which contains our asynchronous result.  We can then take our new observableAdd and execute it inside of a LINQ statement, then subscribe to it on the Dispatcher thread in order to rid ourselves of synchronization context issues in WPF.

Going back to our original example here, let’s see how we might follow some of the ideas presented here into creating a way to send a tweet completely in a LINQ statement. 

var tweetObservable =
    from request in 
        CreateWebRequest(
            "http://twitter.com/statuses/update.xml", 
            userName, 
            password, 
            "POST")
    let tweet = HttpUtility.HtmlEncode("status= Hi! #bacon")
    let buffer = Encoding.UTF8.GetBytes(tweet)
    from _ in request.SetContentLength(buffer.Length)
    from requestStream in request.GetRequestStreamAsync()
    from __ in requestStream.WriteAsync(buffer, 0, buffer.Length)
    from response in request.GetResponseAsync()
    let responseStream = response.GetResponseStream()
    let reader = new StreamReader(responseStream)
    select reader.ReadToEnd();
    
var tweetsObservable = tweetObservable
    .Throttle(TimeSpan.FromMinutes(3))
    .Repeat(25);

We can create extension methods to the WebRequest for GetRequestStream and GetResponse to enable observable behavior via the Begin/End methods that the WebRequest exposes.  As well, we can extend the System.IO.Stream as well to both write and read streams asynchronously.  We have other more trivial methods such as setting the content length and creating a web request as an observable as well, and in order to do that was covered in the previous post using the Observable.Create method.  After we create this IObservable value, we can then throttle it to every three minutes and to repeat 25 times.  One concern we might have is around the resource usage and ensuring the cleanup, and we’ll cover that next time.

Conclusion

As I’ve stated before, the LiveLabs Reactive Framework gives us the ability to harness reactive programming and treat events as the first class citizens they should have been using LINQ expressions and other standard LINQ combinators.  Still, there is much to cover in this series with the standard LINQ combinators, the monadic heritage of this solution as well as managing resources, and turning Tasks Observables.

This entry was posted in C#, Reactive Framework. Bookmark the permalink. Follow any comments here with the RSS feed for this post.
  • http://judahgabriel.blogspot.com Judah Gabriel Himango

    Any chance of this working for Silverlight 4? I get a compiler error on the WriteAsync extension method saying the arguments are invalid.

  • http://codebetter.com/members/Matthew.Podwysocki/default.aspx Matthew.Podwysocki

    @markus,

    Agreed, that repeating yourself is not a fun operation especially given that the compiler can figure these things out for you. Refactoring is simple when you let the language/tooling do the work.

    Matt

  • http://codebetter.com/members/Matthew.Podwysocki/default.aspx Matthew.Podwysocki

    @Steve,

    I couldn’t disagree with you more about the usage of var. I like being productive and have spent time in other languages with a lot less ceremony (F#, Haskell, Erlang, Ruby, Python, JavaScript). I’ve found huge gains in levels with naming things correctly, in objects and members as well as instantiations do much more for code readability than duplicating yourself by telling what type your object may be.

    Very few places do I use the explicit type in my declarations and I let the compiler do the work for me. I’ll cover what/where that is in a future post.

    Matt

  • http://codebetter.com/members/Matthew.Podwysocki/default.aspx Matthew.Podwysocki

    @Peter,

    I’ll note that in the future when doing posts given introduction level posts.

  • http://conic.se markus

    I don’t agree with the above regarding the usage of ‘var’. Writing types verbosely can make code like this unreadable and takes the elegance right out of it.

  • Manish

    Thanks for doing this series. I just stumbled upon this framework while attending SoCal CodeCamp today. I am also trying to digest your Functional c# stuff. Cool way of doing things. I need to Unlearn a lot of stuff and move forward with More Functional and Declarative stuff

  • http://twitter.com/bjorg Steve Bjorg

    Ditto on the “var” keyword. While type inference is an awesome new feature. The “var” keyword itself is doing more harm than good for code maintenance.

  • Peter G

    Matthew,
    Sorry but can you stop using the var keyword in your code samples?!
    When learning a new topic it really helps to see all the types used.

    Thanks!