Introduction to the Reactive Extensions for JavaScript – Error Handling Part I

We’ve recently covered the conditional and looping operators in the Reactive Extensions for JavaScript, but I want to step back for a minute and cover exception handling.  This post shows several ways to compensate for errors as they happen.  It largely follows Bart De Smet’s post on the same topic, but instead of covering the Interactive Extensions, we’ll stick primarily to JavaScript.

Error Handling

When writing safe programs, we have to be mindful of exception handling.  Typically for imperative code we have try/catch/finally blocks which allow us to try a certain behavior, compensate for exceptions and do any cleanup that is necessary.

try {
    // do something
}
catch(exception) {
    // handle error
}
finally {
    // clean up
}

When it comes to asynchronous programming, the job becomes a bit harder.  How do we handle exceptions reliably without littering each and every callback with error handling?

doAsyncAction1(data, function(result1) {
    // Handle exception here?
    
    doAsyncAction2(result1, function(result2) {
        // What about here?
        
        // What about more callbacks?
    });
});

Alternatively, some might have separate callback handlers for exceptions which further muddies the waters:

doAsyncAction1(data, 
    function(result1) {
    
        doAsyncAction2(result1, 
            function(result2) {
    
            },
            function(error2) {
                // handle this error
            });
    },
    function(error1) {
        // Handle it way down here
    });

With the Reactive Extensions for JavaScript, we have some mechanisms at our disposal for error handling and compensation.  Say, for example, we have a safeDivide function which throws an exception on a divide by zero instead of producing Infinity.

function safeDivide(dividend, divisor) {
    var result = dividend / divisor;

    if (!isFinite(result)) {
        throw "Cannot divide by zero";
    }

    return result;
}

How can we use the Reactive Extensions to compensate for any such exceptions that happen?  Using our knowledge from previous posts, we’ll handle not only the OnNext part of our Observer, but the OnError part as well.  Let’s walk through a simple example of using our safeDivide.

var array1 = [3, 2, 1, 0];

Rx.Observable.FromArray(array1)
    .Select(
        function (next) {
            return safeDivide(100, next);
        })
    .Subscribe(
        // OnNext
        function (next) {
            $("<p/>").html(next).appendTo("#results");
        },
        
        // OnError
        function (err) {
            $("<p/>").html("Error: " + err).appendTo("#results");
        });

From this, we’ll get the following results in our web page:

33.333333333333336
50
100
Error: Cannot divide by zero

When the Select method above hits the condition where the exception is thrown, our observable sequence calls the OnError handler on our observer and then immediately terminates.  In this post, we’ll go over some of the compensation techniques that go beyond the standard OnError handler we have above.
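To see why nothing follows the error, here is a minimal sketch of that termination behavior.  It does not use RxJS at all: fromArray and select are hypothetical stand-ins I wrote for FromArray and Select, and an "observable" is simply a function that pushes values into an observer.  Treat it as an illustration of the idea under those assumptions, not as how the library is implemented.

```javascript
// A minimal sketch, NOT the RxJS implementation. fromArray and select are
// hypothetical stand-ins for FromArray and Select.
function safeDivide(dividend, divisor) {
    var result = dividend / divisor;
    if (!isFinite(result)) {
        throw "Cannot divide by zero";
    }
    return result;
}

// An "observable" here is just a function that pushes values into an observer.
function fromArray(items) {
    return function (observer) {
        items.forEach(function (item) { observer.onNext(item); });
        observer.onCompleted();
    };
}

function select(source, selector) {
    return function (observer) {
        var done = false; // set once a terminal notification has been sent
        source({
            onNext: function (value) {
                if (done) { return; }
                var mapped;
                try {
                    mapped = selector(value);
                } catch (err) {
                    // The first failure ends the sequence for this observer.
                    done = true;
                    observer.onError(err);
                    return;
                }
                observer.onNext(mapped);
            },
            onError: function (err) {
                if (!done) { done = true; observer.onError(err); }
            },
            onCompleted: function () {
                if (!done) { done = true; observer.onCompleted(); }
            }
        });
    };
}

var divideLog = [];
select(fromArray([4, 0, 5]), function (n) { return safeDivide(100, n); })({
    onNext: function (value) { divideLog.push(value); },
    onError: function (err) { divideLog.push("Error: " + err); },
    onCompleted: function () { divideLog.push("completed"); }
});

console.log(divideLog); // [ 25, 'Error: Cannot divide by zero' ]
```

The value 5 is never divided and no completion is reported, because in this sketch the error is the last thing the observer hears.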

Before we do so, let’s cover one more operator that will help us later on in the post: the Throw function, which lets an observable sequence raise an exception.  The Throw operator creates an observable sequence that, when subscribed to, calls OnError with the given exception.

// exception : The exception to throw
// returns : Observable

Rx.Observable.Throw = function(
    exception);

We can see this in action by simply passing in a message such as the following:

Rx.Observable.Throw("oh noes!").Subscribe(
    function() { },
    function(msg) { alert(msg); }); // Shows oh noes!

Having seen that, let’s move onto our first compensation…

Catching the Exception and Moving On

The first operator we’ll cover is the Catch operator.  This operator allows us to continue an observable sequence that is terminated by an exception with the next observable sequence.  We have two overloads of Catch, defined below: one is an instance method on the observable sequence, and the other is a static method which takes an array of observable sequences and an optional scheduler.

// obs : varargs array of observable sequences
// returns : Observable

Rx.Observable.prototype.Catch = function(
    obs1, obs2, obs3, ...);

// items : Observable[]
// scheduler : an optional scheduler
// returns : Observable

Rx.Observable.Catch = function(
    items,
    scheduler);

The first version is much like an imperative try/catch block, which allows us to compensate and compose our workflows.  Now, let’s say you had an observable sequence that can cause OnError to be fired, as the following code will:

var couldThrow = Rx.Observable.FromArray([1, 2])
    .Concat(Rx.Observable.Throw("oh noes!"));

What we can do to compensate is use the Catch function to continue on with the next sequence, so that we end up with the numbers 1 through 5 printed on the screen, where the last three values come from the compensation sequence.

couldThrow.Catch(Rx.Observable.FromArray([3, 4, 5]))
    .Subscribe(
        function (next) {
            $("<p/>").html(next).appendTo("#results");
        });

We could just as well chain sequences together with their compensations.  Below, I have an example of chaining together some values, exceptions and compensation values which will once again yield the sequence of 1 through 5.

var couldThrow = Rx.Observable.FromArray([1, 2])
    .Concat(Rx.Observable.Throw("oh noes!"))
    .Catch(Rx.Observable.FromArray([3, 4]))
    .Concat(Rx.Observable.Throw("not again!"))
    .Catch(Rx.Observable.Return(5));

couldThrow.Subscribe(
    function (next) {
        $("<p/>").html(next).appendTo("#results");
    });
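If you’re curious how such a chain can hang together, here is a minimal sketch of the idea behind Catch.  Again, none of this is RxJS code: fromArray, throwError, concat and catchWith are hypothetical helpers of my own, with an observable modeled as a plain function that takes an observer.  The only point is that an error from the source is swallowed and the fallback sequence is subscribed to in its place.

```javascript
// A minimal sketch, NOT the RxJS implementation. All helper names are hypothetical.
function fromArray(items) {
    return function (observer) {
        items.forEach(function (item) { observer.onNext(item); });
        observer.onCompleted();
    };
}

function throwError(error) {
    return function (observer) { observer.onError(error); };
}

// Play the second sequence once the first one completes.
function concat(first, second) {
    return function (observer) {
        first({
            onNext: observer.onNext,
            onError: observer.onError,
            onCompleted: function () { second(observer); }
        });
    };
}

// Swallow an error from the source and continue with the fallback.
function catchWith(source, fallback) {
    return function (observer) {
        source({
            onNext: observer.onNext,
            onError: function () { fallback(observer); },
            onCompleted: observer.onCompleted
        });
    };
}

var chained = catchWith(
    concat(
        catchWith(
            concat(fromArray([1, 2]), throwError("oh noes!")),
            fromArray([3, 4])),
        throwError("not again!")),
    fromArray([5]));

var caught = [];
chained({
    onNext: function (value) { caught.push(value); },
    onError: function (err) { caught.push("Error: " + err); },
    onCompleted: function () { caught.push("completed"); }
});

console.log(caught); // [ 1, 2, 3, 4, 5, 'completed' ]
```

Neither error reaches the final observer, since each one is replaced by the sequence that follows it.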

The second version of the Catch operator takes an array of observable sequences and moves on to the next one each time an error occurs, until a sequence finishes without an error.  Let’s take a look at an example of using this below.  What will be printed on the screen?

var exn = Rx.Observable.Throw("yup yup!");
var couldThrow = Rx.Observable.Catch([
    Rx.Observable.FromArray([1,2]).Concat(exn),
    Rx.Observable.FromArray([3,4]).Concat(exn),
    Rx.Observable.Return(5),
    Rx.Observable.Return(6)]);

couldThrow.Subscribe(
    function (next) {
        $("<p/>").html(next).appendTo("#results");
    });

If you guessed, once again, the numbers 1 through 5, you would be correct.  Why?  The answer is simply that once 5 was yielded to us, there was no compensation needed, so 6 was not yielded.  Had the sequence which yielded 5 also thrown an exception, then 6 would have been yielded as well.
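The same reasoning can be written down as a small sketch.  As before, this is not the library’s code: catchAll is a hypothetical helper of mine that ignores the optional scheduler entirely, and observables are plain functions.  It subscribes to the sequences in order and only advances when the current one fails.

```javascript
// A minimal sketch, NOT the RxJS implementation. All helper names are hypothetical.
function fromArray(items) {
    return function (observer) {
        items.forEach(function (item) { observer.onNext(item); });
        observer.onCompleted();
    };
}

function throwError(error) {
    return function (observer) { observer.onError(error); };
}

function concat(first, second) {
    return function (observer) {
        first({
            onNext: observer.onNext,
            onError: observer.onError,
            onCompleted: function () { second(observer); }
        });
    };
}

// Try each sequence in turn; stop at the first one that completes normally.
function catchAll(sources) {
    return function (observer) {
        function subscribeAt(index) {
            if (sources.length === 0) {
                observer.onCompleted();
                return;
            }
            sources[index]({
                onNext: observer.onNext,
                onError: function (err) {
                    if (index + 1 < sources.length) {
                        subscribeAt(index + 1);   // compensate with the next sequence
                    } else {
                        observer.onError(err);    // nothing left to fall back on
                    }
                },
                onCompleted: observer.onCompleted
            });
        }
        subscribeAt(0);
    };
}

var exn = throwError("yup yup!");
var fallbackLog = [];
catchAll([
    concat(fromArray([1, 2]), exn),
    concat(fromArray([3, 4]), exn),
    fromArray([5]),
    fromArray([6])
])({
    onNext: function (value) { fallbackLog.push(value); },
    onError: function (err) { fallbackLog.push("Error: " + err); },
    onCompleted: function () { fallbackLog.push("completed"); }
});

console.log(fallbackLog); // [ 1, 2, 3, 4, 5, 'completed' ]
```

The sequence holding 6 is never subscribed to, because the one holding 5 completed without an error.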

And Finally…

Now that we’ve covered the Catch part of the equation, how about cleaning up resources at the end of our workflow?  How can the Reactive Extensions help us here?  Through the use of the Finally operator, we’re able to do just that.  The signature that follows shows that this takes a function with no parameters and has no return value. 

// finallyAction : () -> ()
// returns : Observable

Rx.Observable.prototype.Finally = function(
    finallyAction);

Let’s look at a regular workflow where we start with an array of 1 and 2.  Then we’ll add a Finally function to indicate that we are complete.  This will print out 1, 2 and then Finished!

Rx.Observable.FromArray([1, 2])
    .Finally(
        function() {
            $("<p/>").html("Finished!").appendTo("#results");
        })
    .Subscribe(
        function (next) {
            $("<p/>").html(next).appendTo("#results");
        });

How about we mix this up a little bit and show a slightly more advanced scenario?  Let’s walk through the code in pseudocode before we begin.  The idea is that we yield the first two numbers and then have an exception thrown.  We will then print out “Finished!”, followed by the outer catch yielding the next three values.

try {
    try {
        yield 1;
        yield 2;
        throw "y'arr!";
    } 
    finally {
        print "Finished!";
    }
}
catch(exception) {
    yield 3;
    yield 4;
    yield 5;
}

The overall result should yield us: 1, 2, Finished, 3, 4, 5.  Let’s look at how that might be done using the Reactive Extensions for JavaScript.

Rx.Observable.FromArray([1, 2])
    .Concat(Rx.Observable.Throw("y'arr!"))
    .Finally(
        function() {
            $("<p/>").html("Finished!").appendTo("#results");
        })
    .Catch(Rx.Observable.FromArray([3,4,5]))
    .Subscribe(
        function (next) {
            $("<p/>").html(next).appendTo("#results");
        });
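To connect the pseudocode to the operator chain, here is a minimal sketch that reproduces the same ordering without RxJS.  The helpers are hypothetical and of my own making.  One assumption deserves to be called out loudly: my finallyDo runs the cleanup action just before it forwards the terminal notification, which is what makes it line up with the try/finally pseudocode.  The library ties Finally to the lifetime of the subscription, and I’m not reproducing that mechanism here.

```javascript
// A minimal sketch, NOT the RxJS implementation. All helper names are hypothetical.
function fromArray(items) {
    return function (observer) {
        items.forEach(function (item) { observer.onNext(item); });
        observer.onCompleted();
    };
}

function throwError(error) {
    return function (observer) { observer.onError(error); };
}

function concat(first, second) {
    return function (observer) {
        first({
            onNext: observer.onNext,
            onError: observer.onError,
            onCompleted: function () { second(observer); }
        });
    };
}

function catchWith(source, fallback) {
    return function (observer) {
        source({
            onNext: observer.onNext,
            onError: function () { fallback(observer); },
            onCompleted: observer.onCompleted
        });
    };
}

// ASSUMPTION: cleanup runs right before the terminal notification is forwarded.
function finallyDo(source, action) {
    return function (observer) {
        source({
            onNext: observer.onNext,
            onError: function (err) { action(); observer.onError(err); },
            onCompleted: function () { action(); observer.onCompleted(); }
        });
    };
}

var finallyLog = [];
var workflow = catchWith(
    finallyDo(
        concat(fromArray([1, 2]), throwError("y'arr!")),
        function () { finallyLog.push("Finished!"); }),
    fromArray([3, 4, 5]));

workflow({
    onNext: function (value) { finallyLog.push(value); },
    onError: function (err) { finallyLog.push("Error: " + err); },
    onCompleted: function () { }
});

console.log(finallyLog); // [ 1, 2, 'Finished!', 3, 4, 5 ]
```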

There’s more to cover here, including further cleanup and compensation operators such as Using, OnErrorResumeNext and Retry, which we’ll get to in the next post.

Conclusion

Dealing with asynchronous programming has been in the forefront of many minds in the JavaScript community.  At JSConf, there were several examples of frameworks trying to get around the idea of callbacks and instead lean more towards composition.  By utilizing the Reactive Extensions for JavaScript, we’re able to compose asynchronous and event-based operations together and transform the results in interesting ways.

When we start creating more advanced workflows through the Reactive Extensions, we also need ways of handling errors.  We have the ability to handle errors and compensate as they happen through a rich set of operators including Catch, Finally, Using, OnErrorResumeNext, Retry and more.

So with that, download it, and give the team feedback!

Posted in Event-based Programming, JavaScript, JSConf, Reactive Framework

Reactive Extensions for JavaScript (and .NET) Hands on Labs Now Available

One of the biggest desires of the community around the Reactive Extensions for JavaScript (and .NET as well) has been a guide on how to get started (well, that and complete documentation).  With that, the Reactive Extensions team and I have created hands on labs for both JavaScript and .NET to get you started incrementally with the Reactive Extensions.  Both labs largely follow each other in terms of format and what areas are covered.  For the JavaScript lab, a little bit of jQuery knowledge around selectors and AJAX support is helpful.

The JavaScript lab has the following sections:

  • Exercise 1: Getting Started
  • Exercise 2: Creating Observable Sequences
  • Exercise 3: Importing DOM Events into Rx
  • Exercise 4: A First Look at the Standard Query Operators
  • Exercise 5: More Operators to Tame the User Input
  • Exercise 6: Bridging the Callback Method Pattern in Rx
  • Exercise 7: SelectMany – the Zen of Composition
  • Exercise 8: Testability

Hope you find these useful and give us feedback on what we did right and wrong and what you’d like to see from us in the future!

Posted in Event-based Programming, JavaScript, JSConf, Reactive Framework

Introduction to the Reactive Extensions for JavaScript – Looping Operators

After spending the past couple of posts talking about how we integrate and why, let’s get back to the basic operators of the Reactive Extensions for JavaScript.  After covering the conditional operators of If and Case, let’s go deep into looping constructs.  The Reactive Extensions gives us two options for dealing with looping constructs, a For and a While loop which do pretty much what you think they do.  Let’s get started by covering the For loop.

Also, before we get started on the usual links, do check out Jeffrey Van Gogh’s performance (with me in the background) at JSConf on the B Track talking about the Reactive Extensions for JavaScript.

Looping with Observable.For

The first looping construct we’re going to look at is the For loop.  It takes an array, runs each value through the result selector and then concatenates the observable sequences obtained.  You can think of this as a standard for loop, but it concatenates the results together into a single observable sequence.  Below is the signature for this method:

// source : Array<Source>
// selector : Source -> Observable<Result>

// returns : Observable<Result>

Rx.Observable.For(
    source,
    selector);

We can show how this works through a quick example of the For in action by starting out with a simple array and then prepending a string to each of its values.  Note that the selector function takes in the current value and must return an observable sequence, hence my use of the Rx.Observable.Return function.

var obj = [1,2,3];

Rx.Observable.For(
    obj,
    function(item) {
        return Rx.Observable.Return("foo" + item);
    })
    .Subscribe(
        function(result) {
            $("<p/>").html(result).appendTo("#content");
        });

In this case, we’ll get the following result on our screen:

foo1
foo2
foo3
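To make the "concatenates the results" part concrete, here is a minimal sketch of that behavior.  It is not the RxJS implementation: forEachConcat and returnValue are hypothetical stand-ins of mine for For and Return, and an observable is modeled as a plain function that takes an observer.  Each item’s sequence is only started once the previous one has completed.

```javascript
// A minimal sketch, NOT the RxJS implementation. All helper names are hypothetical.
function returnValue(value) {
    return function (observer) {
        observer.onNext(value);
        observer.onCompleted();
    };
}

// Run the selector for each item and play the resulting sequences back to back.
function forEachConcat(items, selector) {
    return function (observer) {
        function run(index) {
            if (index >= items.length) {
                observer.onCompleted();
                return;
            }
            selector(items[index])({
                onNext: observer.onNext,
                onError: observer.onError,
                onCompleted: function () { run(index + 1); }
            });
        }
        run(0);
    };
}

var forLog = [];
forEachConcat([1, 2, 3], function (item) {
    return returnValue("foo" + item);
})({
    onNext: function (result) { forLog.push(result); },
    onError: function (err) { forLog.push("Error: " + err); },
    onCompleted: function () { forLog.push("completed"); }
});

console.log(forLog); // [ 'foo1', 'foo2', 'foo3', 'completed' ]
```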

You may notice here that the overall semantics of this method are similar to calling FromArray and then SelectMany on a given array.  Now that we have an idea of how we can use this, let’s look at a more detailed example.  We’re going to query Twitter for the usual term, iterate through each result and, if there is a geo record, call the Bing Geolocation REST service to get the best guess at the actual address.  As you can see below, we’re going to reuse a little bit of our knowledge from the previous post and use Rx.Observable.If to check whether we have a geo record; if we do, we query the service, otherwise we return an empty observable.  And once again with our If function, because JavaScript is an eager language, we need to defer the AJAX call, which is a hot observable, until the condition calls for it.

searchTwitter("4sq.com").SelectMany(
    function(data) {
    
        return Rx.Observable.For(
            data.results,
            function(result) {
            
                return Rx.Observable.If(
                    function() { return result.geo != null; },
                    Rx.Observable.Defer(
                        function() { 
                            var lat = result.geo.coordinates[0];
                            var lon = result.geo.coordinates[1];
                            return geolocate(lat, lon);
                        }),                    
                    Rx.Observable.Empty(Rx.Scheduler.Timeout));
            });
    })
    .Subscribe(
        function(location) {
            var address = location.resourceSets[0].resources[0].name;
            $("<p/>").html(address).appendTo("#content");
        });

And just for full disclosure, here are the searchTwitter and geolocate functions defined:

function searchTwitter(term) {
    return $.ajaxAsObservable(
        { url : "http://search.twitter.com/search.json",
          dataType : "jsonp",
          data : { q : term, rpp : 100 } })
        .Select(function(d) { return d.data; });
}

function geolocate(lat, lon) {
    var key = "get your own key";
    var latlng = lat+","+lon;

    return $.ajaxAsObservable(
        { url : "http://dev.virtualearth.net/REST/v1/Locations/"+latlng,
          dataType : "jsonp",
          jsonp : "jsonp",
          data : { key : key, o : "json" } })
        .Select(function(d) { return d.data; });
}

Now that we’ve thoroughly looked at For loops, let’s move on to the While loop.

Looping with Observable.While

The next looping statement we’re going to talk about is the While loop.  This allows us to repeat our given source until the condition no longer holds (returns false).  Let’s take a look at the method signature for this:

// condition : () -> bool
// source : Observable<Source>

// returns : Observable<Source>

Rx.Observable.While(
    condition,
    source);

Let’s walk through a simple example of using this where we repeat a given source until our condition no longer holds.  In this instance we’ll check if our x variable is less than 5, and if so, keep repeating our observable sequence of 1.  The result will be five 1s written on the screen, each alongside the current x value.

var x = 0;

Rx.Observable.While(
    function() { return x < 5; },
    Rx.Observable.Return(1))
    .Subscribe(
        function(value) {
            x++;
            $("<p/>").html(value+"-"+x).appendTo("#content");
        });

And our result should look like the following:

1-1
1-2
1-3
1-4
1-5
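Here is a minimal sketch of why the loop stops after five values.  As with the other sketches, whileDo and returnValue are hypothetical helpers of my own rather than RxJS code.  The condition is checked before each pass over the source, and the source is subscribed to again every time the previous pass completes.

```javascript
// A minimal sketch, NOT the RxJS implementation. All helper names are hypothetical.
function returnValue(value) {
    return function (observer) {
        observer.onNext(value);
        observer.onCompleted();
    };
}

// Re-subscribe to the source for as long as the condition holds.
function whileDo(condition, source) {
    return function (observer) {
        function loop() {
            if (!condition()) {
                observer.onCompleted();
                return;
            }
            source({
                onNext: observer.onNext,
                onError: observer.onError,
                onCompleted: loop   // the next pass starts when this one completes
            });
        }
        loop();
    };
}

var x = 0;
var whileLog = [];
whileDo(function () { return x < 5; }, returnValue(1))({
    onNext: function (value) {
        x++;
        whileLog.push(value + "-" + x);
    },
    onError: function (err) { whileLog.push("Error: " + err); },
    onCompleted: function () { whileLog.push("completed"); }
});

console.log(whileLog); // [ '1-1', '1-2', '1-3', '1-4', '1-5', 'completed' ]
```

Because everything in this sketch is synchronous, each pass nests one call deeper, so it is only suitable for short loops like this one.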

And there you have it, two ways of dealing with looping, either with For loops or with While loops.  But, before we wrap this up, I’ll get back to those .NET folks.

Back to .NET

Once again, I didn’t forget about the .NET world in this post.  We can use pretty much the same syntax as we did for our JavaScript version in .NET.  Let’s look first at the For method signature, which should look identical to the JavaScript version if you squint enough.  This takes in an IEnumerable<TSource> as the source and a Func<TSource, IObservable<TResult>> as the result selector, and returns to us an IObservable<TResult>.

public static IObservable<TResult> For<TSource, TResult>(
    IEnumerable<TSource> source,
    Func<TSource, IObservable<TResult>> resultSelector
)

And once again like above, we can do some simple iteration over a simple range, and then square each value as it comes through.

IEnumerable<int> range = Enumerable.Range(1, 10);

Observable.For(
    range, 
    x => Observable.Return(x * x))
    .Subscribe(x => Console.WriteLine(x));

This yields to us the following result:

1
4
9
16
25
36
49
64
81
100

Like the For method above, the While method follows its JavaScript counterpart exactly in terms of syntax once you squint again.  This method takes a Func<bool> condition delegate and a source to repeat while the condition holds, and returns an IObservable<TSource> to us as the result.

public static IObservable<TSource> While<TSource>(
    Func<bool> condition,
    IObservable<TSource> source
)

And we can run through the same While example as above transcribed from JavaScript to C#.

int timesCalled = 0;

Observable.While(
    () => timesCalled < 5,
    Observable.Return(1))
    .Subscribe(x =>
    {
        timesCalled++;
        Console.WriteLine(x + "-" + timesCalled);
    });

With these two posts I’ve covered some really interesting imperative constructs that can integrate nicely with the Reactive Extensions.  Covering these is leading me towards my next post, which is to cover why these were added in the first place…

Conclusion

Dealing with asynchronous programming has been in the forefront of many minds in the JavaScript community.  At JSConf, there were several examples of frameworks trying to get around the idea of callbacks and instead lean more towards composition.  By utilizing the Reactive Extensions for JavaScript, we’re able to compose asynchronous and event-based operations together and transform the results in interesting ways.

When we start creating more advanced workflows through the Reactive Extensions, we need powerful ways of expressing such concepts as looping logic beyond SkipWhile, TakeWhile and so forth.  With the addition of the For and While operators, we have even more ways to describe our workflows.

So with that, download it, and give the team feedback!

Posted in Event-based Programming, JavaScript, JSConf, Reactive Framework

Introduction to the Reactive Extensions for JavaScript – Conditionals

After spending the past couple of posts talking about how we integrate and why, let’s get back to the basic operators of the Reactive Extensions for JavaScript.  This time, we’ll cover conditional logic that we can do with RxJS (which also applies to Rx.NET).  With traditional LINQ, we have the Where operator which allows us to filter values, but it doesn’t allow us to easily choose between one operation and another.  Instead, the Reactive Extensions team has included two operators, If and Case, which allow us some flexibility on how we want to execute conditional logic.

Conditional Branching with Observable.If

The first conditional operator we’ll cover is the Observable.If function.  This function acts as a normal if statement would in that it allows us to specify a conditional function and then have a branch for the “then” logic and another branch for the “else” logic.  The signature of the function is below:

// condition  : () -> bool
// thenSource : Observable
// elseSource : Observable

// returns : Observable

Rx.Observable.If(
    condition,
    thenSource,
    elseSource);

For example, I could determine whether I want to use the cached value of an observable or start a fresh request to get some data based upon some setting.

var cachedObservable = new Rx.Subject();

...

Rx.Observable.If(
    function() { return shouldCache; },
    cachedObservable,
    Rx.Observable.Defer(
        function() {
            return $.getJSONAsObservable("someurl");
        })
    );

You’ll notice that I deferred the getJSONAsObservable and there’s a very good reason for that.  Such AJAX calls as this one are hot observables, which means that they fire immediately instead of waiting for the Subscribe call to be made.  We’ll get into the whole hot versus cold observables in another post.

So, you might be thinking, what’s the difference then between this and if I had written it the old fashioned way using an if statement?

var cachedObservable = new Rx.Subject();

...

if (shouldCache) {
    return cachedObservable;
}
else {
    return Rx.Observable.Defer(
        function() {
            return $.getJSONAsObservable("someurl");
        });
}

Well, the difference between the two is that the Rx.Observable.If function evaluates the condition on subscription, whereas the if statement evaluates it eagerly, once, at the moment the surrounding code runs.  With the if statement, a later change to shouldCache has no effect on the observable we already returned.  The If function is nice, but what if we have more than just a simple binary case?
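A minimal sketch makes the timing difference visible.  This is not RxJS code: ifThenElse and returnValue are hypothetical stand-ins of mine for If and Return, and the "cached" and "fresh" strings simply mark which branch was taken.  The flag is flipped after both observables are built but before either is subscribed to.

```javascript
// A minimal sketch, NOT the RxJS implementation. All helper names are hypothetical.
function returnValue(value) {
    return function (observer) {
        observer.onNext(value);
        observer.onCompleted();
    };
}

// The condition is not looked at until somebody subscribes.
function ifThenElse(condition, thenSource, elseSource) {
    return function (observer) {
        var chosen = condition() ? thenSource : elseSource;
        chosen(observer);
    };
}

var shouldCache = false;

// Lazy: the branch is picked at subscription time.
var lazy = ifThenElse(
    function () { return shouldCache; },
    returnValue("cached"),
    returnValue("fresh"));

// Eager: the branch is picked right here, while shouldCache is still false.
var eager = shouldCache ? returnValue("cached") : returnValue("fresh");

shouldCache = true; // the setting changes before anyone subscribes

var branchLog = [];
function collect(label) {
    return {
        onNext: function (value) { branchLog.push(label + ":" + value); },
        onError: function () { },
        onCompleted: function () { }
    };
}

lazy(collect("lazy"));
eager(collect("eager"));

console.log(branchLog); // [ 'lazy:cached', 'eager:fresh' ]
```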

Conditional Branching with Observable.Case

Another mechanism we have for branching logic is the ability to handle a switch statement.  What we want is the ability to write a traditional switch statement, but have it execute lazily, just as the If function above does.  Take the following as an example where, based upon n, we return the appropriate Observable value.

switch(n)
{
    case 1:
        return Rx.Observable.Return(1);
    case 2:
        return Rx.Observable.Return(2);
    default:
        return Rx.Observable.Return(3);
}        

In order to make this happen, we need to use the Rx.Observable.Case function which allows us to specify a condition, such as checking for the value of n, a hash object which contains key/value pairs with the key and their Observable value, and finally an optional default case.

// selector : () -> Key
// sources  : Dictionary<Key, Observable>
// defaultSource : Observable

// returns : Observable

Rx.Observable.Case(
    selector,
    sources,
    defaultSource);

In the case of our above switch statement, we’ll first want to create the hash of our sources.  Basically, the property name is the key and its value is the value of the key/value pair.

var sources = {
    1 : Rx.Observable.Return(1),
    2 : Rx.Observable.Return(2)
};

Now that we have the sources, let’s finish off the implementation.  First, for our selector, we’ll need to provide a function which takes no parameters and returns our desired key.  Next, we’ll provide the sources from above, and finally we’ll provide a default case if no others match.

var cased = Rx.Observable.Case(
    function() { return n; },
    sources,
    Rx.Observable.Return(3));

So, in the case of n being 1, the first source will be selected, and so on for those along the line.  If n does not match either 1 or 2, the defaultSource will be selected.  Let’s run through a quick example where we check if the detected language is a language we support, either French or Spanish, and if it is, then we translate.  Otherwise, we will go ahead and throw an exception via the Rx.Observable.Throw function as part of the workflow to say that the given language isn’t supported.

// The variables could be something like:
// var detected = "fr";
// var text = "Nous allons traduire un texte";

var sources = {
    es : Rx.Observable.Defer(
            function() { return translateFromSpanish(text); }),
    fr : Rx.Observable.Defer(
            function() { return translateFromFrench(text); })
};

Rx.Observable.Case(
        function() { return detected; },
        sources,
        Rx.Observable.Throw("Not supported!"))
    .Subscribe(
        function(data) {
            alert(data);
        },
        function(err) {
            alert("An error occurred: " + err);
        });
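Here is a minimal sketch of the lookup that Case performs, so you can see both the matching branch and the default branch in action.  It is not RxJS code: caseOf, returnValue and throwError are hypothetical helpers of my own, and the two "translations" are canned strings standing in for real translation calls.  The selector runs again on every subscription, so changing the detected language changes which source gets picked.

```javascript
// A minimal sketch, NOT the RxJS implementation. All helper names are hypothetical.
function returnValue(value) {
    return function (observer) {
        observer.onNext(value);
        observer.onCompleted();
    };
}

function throwError(error) {
    return function (observer) { observer.onError(error); };
}

// Pick a source by key at subscription time, falling back to the default.
function caseOf(selector, sources, defaultSource) {
    return function (observer) {
        var key = selector();
        var found = Object.prototype.hasOwnProperty.call(sources, key);
        (found ? sources[key] : defaultSource)(observer);
    };
}

// Canned values standing in for real translation requests.
var translations = {
    es : returnValue("translated from Spanish"),
    fr : returnValue("translated from French")
};

var detected = "fr";
var translated = caseOf(
    function () { return detected; },
    translations,
    throwError("Not supported!"));

var caseLog = [];
var caseObserver = {
    onNext: function (data) { caseLog.push(data); },
    onError: function (err) { caseLog.push("An error occurred: " + err); },
    onCompleted: function () { }
};

translated(caseObserver);   // detected is "fr"
detected = "de";
translated(caseObserver);   // no entry for "de", so the default is used

console.log(caseLog);
// [ 'translated from French', 'An error occurred: Not supported!' ]
```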

And there you have it, you now have two ways of dealing with lazy conditional logic within your observable workflow.

Back to .NET

Even though my examples here are primarily in JavaScript, we have the exact same functionality in the Reactive Extensions for .NET and Silverlight.  For example, the If function is implemented as well with the signature as follows:

public static IObservable<TResult> If<TResult>(
    Func<bool> condition,
    IObservable<TResult> thenSource,
    IObservable<TResult> elseSource
)

And a simple example of using this, such as an even/odd scenario:

Observable.If<string>(
    () => n % 2 == 0,
    Observable.Return("even"),
    Observable.Return("odd"));

Just as well, we can use the Case statement, which has the following signature:

public static IObservable<TResult> Case<TValue, TResult>(
    Func<TValue> selector,
    IDictionary<TValue, IObservable<TResult>> sources,
    IObservable<TResult> defaultSource
)

And then we could walk through a simple example of creating our cases and then call the case with our variable n which could be either our 1, 2 or anything else.

var sources = new Dictionary<int, IObservable<int>>
{
    { 1, Observable.Return(1) },
    { 2, Observable.Return(2) }
};

var cased = Observable.Case<int, int>(
    () => n,
    sources,
    Observable.Return(3));

As with most examples I give on this blog, most of the core operators work nicely when going back and forth between RxJS and Rx.NET.

Conclusion

Dealing with asynchronous programming has been in the forefront of many minds in the JavaScript community.  At JSConf, there were several examples of frameworks trying to get around the idea of callbacks and instead lean more towards composition.  By utilizing the Reactive Extensions for JavaScript, we’re able to compose asynchronous and event-based operations together and transform the results in interesting ways.

When we start creating more advanced workflows through the Reactive Extensions, we need powerful ways of expressing such concepts as conditional logic outside of the Where function we’re already accustomed to.  With the If and Case functions, we have two powerful pieces at our disposal to make those more complicated pieces a reality.

So with that, download it, and give the team feedback!

Posted in Event-based Programming, Functional Programming, JavaScript, JSConf, Reactive Framework

Introduction to the Reactive Extensions for JavaScript – MooTools Integration

In the previous post, I covered a bit about how we’ve integrated the Dojo Toolkit into the Reactive Extensions for JavaScript (RxJS) where we can leverage Dojo’s eventing and AJAX programming model.  Following onto that post, I wanted to move onto a different framework and show how we integrated it into RxJS.  This time, it’s MooTools up to the plate.

Now the question comes: if the given framework works as well as it does, why would we care to integrate it into RxJS?  What does it buy us?  What if we want to combine two mouse moves together, one offset from the other so that we get a delta from the previous?  Or how about throttling the values coming from our events, either taking the first value only or adding a delay between each value?  Between all the callbacks, setTimeouts and the global state that you need to track, it becomes very hard to manage.  That’s the reason why we’ve created this layer where we can go seamlessly from MooTools and other JavaScript frameworks to RxJS.  We can either hop out of the world of our framework and into RxJS, or integrate it underneath the covers to manage the composable nature of your framework.  Both are great options…

Diving Into MooTools Events

Once again, the first part of our journey is to cover the eventing model of MooTools.  A number of MooTools classes give us the ability to listen to events.  We’ll start with the Element class method addEvent, which attaches an event listener to a given DOM element.

someElement.addEvent(
    type, 
    fn);

Each parameter is described below:

  • type (string) – the name of the event to monitor such as click, mousemove, etc.
  • fn (function) – the callback function to execute when the event is fired.

The fn callback as noted above gives us an Event which has any number of properties including page and client position, keys used, the target and related targets as well as some methods to stop propagation.

Conversely to our addEvent, we can remove our handler from the Element by using the removeEvent method.

someElement.removeEvent(
    type, 
    fn);

Each parameter is described below:

  • type (string) – the name of the event to remove
  • fn (function) – the callback function to remove

We can then listen to events in MooTools such as a keyup event on a given input element.

$("someInput").addEvent("keyup", 
    function(event) {
        // event.key holds the name of the key that was pressed
        $("whatkey").set("html", event.key);
    });

We can wrap these events by using the Rx.Observable.Create function which hands us an observer to use and then we return a disposable which allows us to remove our listener.  Inside the Create function, we’ll create a handler which takes the event object from MooTools and passes it to the observer.OnNext.  After we’ve defined the handler, we call addEvent on our MooTools Element with the event type and we’ll pass in the handler as the method to invoke.  Finally, we return a function which is invoked when we call Dispose on our subscription.  Below is a wrapped version called Rx.Observable.FromMooToolsEvent.

Rx.Observable.FromMooToolsEvent = function(obj, eventType) {
    return Rx.Observable.Create(function(observer) {
    
        // Create handler for OnNext
        var handler = function(eventObject) {
            observer.OnNext(eventObject);
        };
        
        // Subscribe to event
        obj.addEvent(eventType, handler);
        
        // Return function used to remove event handler
        return function() {
            obj.removeEvent(eventType, handler);
        };
    });
};
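To show the subscribe and dispose life cycle on its own, here is a minimal sketch that runs without MooTools or RxJS.  Everything in it is an assumption made for illustration: createFakeElement is a hypothetical stand-in for a MooTools Element that only offers addEvent, removeEvent and a fireEvent method for triggering handlers by hand, and fromEvent mirrors the wrapper above with a plain function in place of Rx.Observable.Create.

```javascript
// A minimal sketch with hypothetical stand-ins; no MooTools or RxJS involved.
function createFakeElement() {
    var handlers = {};
    return {
        addEvent: function (type, fn) {
            (handlers[type] = handlers[type] || []).push(fn);
        },
        removeEvent: function (type, fn) {
            handlers[type] = (handlers[type] || []).filter(function (h) {
                return h !== fn;
            });
        },
        // Trigger every handler currently registered for the event type.
        fireEvent: function (type, eventObject) {
            (handlers[type] || []).slice().forEach(function (h) {
                h(eventObject);
            });
        }
    };
}

// Same shape as the wrapper above: attach on subscribe, detach on dispose.
function fromEvent(obj, eventType) {
    return function (observer) {
        var handler = function (eventObject) {
            observer.onNext(eventObject);
        };
        obj.addEvent(eventType, handler);
        return function dispose() {
            obj.removeEvent(eventType, handler);
        };
    };
}

var fakeInput = createFakeElement();
var seenKeys = [];

var dispose = fromEvent(fakeInput, "keyup")({
    onNext: function (event) { seenKeys.push(event.key); }
});

fakeInput.fireEvent("keyup", { key: "a" });
fakeInput.fireEvent("keyup", { key: "b" });
dispose();                                   // removes the handler
fakeInput.fireEvent("keyup", { key: "c" });  // nobody is listening any more

console.log(seenKeys); // [ 'a', 'b' ]
```

Keeping a reference to the exact handler function is what makes the removal work, which is why the handler is created inside the subscribe function rather than inline in the addEvent call.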

With FromMooToolsEvent in place, we can attach to an event on a MooTools element much like the following:

var element = $("someElement");

Rx.Observable.FromMooToolsEvent(element, "mousemove")
    .Where(
        function(event) {
            return event.client.x === event.client.y;
        })
    .Subscribe(
        function(event) {
            $("mousePosX").set("html", event.client.x);
        });

That’s all well and good, but I think it’s better to live and breathe within MooTools itself.  In order to do so, we need to extend a few objects that expose adding and removing event handlers, such as Element, Elements, Window, Document as well as any other class which inherits from the Events class.  To do that, we’ll use the Class method implement, to which we pass properties that alter the base class, so that we can modify existing classes.  For example, I can extend the Element and all other classes mentioned above by doing the following:

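var fromMooToolsEvent = function(type) {
    // Delegate to the wrapper defined earlier; "this" is the object
    // the method is called on
    return Rx.Observable.FromMooToolsEvent(this, type);
};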

Window.implement({
    addEventAsObservable : fromMooToolsEvent
});

Document.implement({
    addEventAsObservable : fromMooToolsEvent
});

Element.implement({
    addEventAsObservable : fromMooToolsEvent
});

Elements.implement({
    addEventAsObservable : fromMooToolsEvent
});    

Events.implement({
    addEventAsObservable : fromMooToolsEvent
});  

By doing this, we can rewrite the above example, which hooks a mouse move listener to report the x position relative to the viewport when both the x and y values are the same:

$("someElement").addEventAsObservable("mousemove")
    .Where(
        function(event) {
            return event.client.x === event.client.y;
        })
    .Subscribe(
        function(event) {
            $("mousePosX").set("html", event.client.x);
        });
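
The reason one shared function can serve all of these classes is that this is bound at call time to whichever instance the method is invoked on.  Here is a minimal sketch of that idea which runs on its own; the implement helper, wrapEvent, Widget and Panel are made-up names standing in for the MooTools class machinery and the wrapper above.

```javascript
// Hypothetical helper that mimics the effect of MooTools' implement:
// copy each property onto the constructor's prototype.
function implement(ctor, properties) {
    Object.keys(properties).forEach(function(key) {
        ctor.prototype[key] = properties[key];
    });
}

// Hypothetical wrapper standing in for Rx.Observable.FromMooToolsEvent.
// It just records which object and event type it was asked to wrap.
function wrapEvent(obj, type) {
    return { source: obj, type: type };
}

// One shared function; "this" is whatever object the method is called on.
var addEventAsObservable = function(type) {
    return wrapEvent(this, type);
};

function Widget(name) { this.name = name; }
function Panel(name) { this.name = name; }

implement(Widget, { addEventAsObservable: addEventAsObservable });
implement(Panel, { addEventAsObservable: addEventAsObservable });

var widget = new Widget("w");
var panel = new Panel("p");

var fromWidget = widget.addEventAsObservable("click");
var fromPanel = panel.addEventAsObservable("keyup");
```

Each call wraps the instance it was made on, so the shared function never needs to know which class it was mixed into.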

Now that we have the basics down here, what about the AJAX APIs?

Diving into the Request

Another area where the Reactive Extensions for JavaScript can integrate well is with AJAX APIs.  MooTools exposes three such classes in the Core library: Request, Request.HTML and Request.JSON.  Each does what you would expect, with Request handling general requests such as text, whereas the HTML and JSON classes handle their respective data types.  But the one I’m going to focus on is in the MooTools More library and is called Request.JSONP, which allows me to make cross-domain JSON calls.

new Request.JSONP(options);

The documentation for options is as follows:

  • url – (url) the url to get the JSON data
  • callbackKey – (string) the key in the url that the server uses to wrap the JSON results. So, for example, if you used callbackKey: ‘callback’ then the server is expecting something like http://…./?q=search+term&callback=myFunction; defaults to "callback". This must be defined correctly.
  • data – (object) additional key/value data to append to the url
  • retries – (integer; defaults to zero) if this value is a positive integer, the JSONP request will abort after the duration specified in the timeout option and fire again until the number of retries has been exhausted.
  • timeout – (integer; defaults to zero) the duration to wait before aborting a request or retrying.
  • injectScript – (mixed; defaults to document head) where to inject the script elements used for the calls
  • log – (boolean) if true, sends logging messages to Log (to console if it’s defined). Defaults to false.
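
To make the url, data and callbackKey options concrete, here is a rough sketch of how a JSONP URL could be assembled from them.  The buildJsonpUrl helper and the example.com address are assumptions for illustration only; this is not how MooTools itself is implemented.

```javascript
// Hypothetical helper, not part of MooTools: builds the URL a JSONP request
// would load, given options shaped like the ones documented above.
function buildJsonpUrl(options, callbackName) {
    var pairs = [];
    var data = options.data || {};

    // Each key/value in data becomes a query string parameter.
    Object.keys(data).forEach(function(key) {
        pairs.push(encodeURIComponent(key) + "=" + encodeURIComponent(data[key]));
    });

    // callbackKey names the query parameter; callbackName is the global
    // function the server is asked to wrap the JSON in.
    var callbackKey = options.callbackKey || "callback";
    pairs.push(encodeURIComponent(callbackKey) + "=" + encodeURIComponent(callbackName));

    var separator = options.url.indexOf("?") === -1 ? "?" : "&";
    return options.url + separator + pairs.join("&");
}

var url = buildJsonpUrl(
    {
        url: "http://example.com/search.json",
        data: { rpp: "100", q: "4sq.com" },
        callbackKey: "callback"
    },
    "myFunction");
```

The server is then expected to respond with a script that calls myFunction with the JSON payload, which is why the callbackKey must match what the server looks for.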

For example, should we want to search Twitter once again, we can pass in our URL, the data for the query string, the callback key and a function to invoke on completion.  We can then create the Request.JSONP instance with our options and send our request.

var options = {
    url: 'http://search.twitter.com/search.json',
    data: { rpp: "100", q: "4sq.com" },
    callbackKey: "callback",
    onComplete: 
        function(data) {
            $("results").set("html", data.results[0].text);
        }
};
var req = new Request.JSONP(options);
req.send();

We could also cancel our request should we need to by using the cancel function.  Once again, the issues of composability and error handling arise.  How could we integrate with key up events and throttle our request?  Sooner or later, when we start composing things together via callbacks, our code could walk right off the screen.  Instead, RxJS gives us the flexibility to compose events and AJAX calls together.  To make this happen, we can wrap the API once again like we did with Dojo and jQuery, but this time we’ll take a different approach.

As before, we’ll create an AsyncSubject, which is used for asynchronous communication that happens once and then caches the value.  Next, we need to handle both the success and error conditions by adding the functions onSuccess, onFailure and onException to the options object.  The onSuccess function receives the data, passes it to the subject’s OnNext and then marks the subject as complete with the OnCompleted call.  The onFailure and onException functions wrap what they are given in an error object and pass it to the subject’s OnError so that we can see what went wrong and act accordingly.  What’s different from the earlier approaches is that we’re supporting cancellation.  If we call Dispose on our subscription before the request is complete, we want to be able to cancel the request.  To make this happen, we’ll create a RefCountDisposable wrapping a disposable that calls cancel on the request.  Finally, we’ll return a CompositeDisposable which combines the disposable from the AsyncSubject subscription with the one for cancellation, so that disposing our subscription takes care of both.

observable.MooToolsJSONPRequest = function(options) {

    var subject = new root.AsyncSubject();
    var request = null;
    
    try {
        // JSONP hands back parsed data, which is the single cached value
        options.onSuccess = function(data) {
            subject.OnNext(data);
            subject.OnCompleted();
        };
    
        options.onFailure = function(xhr) {
            subject.OnError({ kind: "failure", xhr: xhr });
        };
    
        options.onException = function(headerName, value) {
            subject.OnError({ kind: "exception", headerName: headerName, value: value });
        };
        
        request = new Request.JSONP(options);
        request.send();
    }
    catch(err) {
        subject.OnError(err);
    }
    
    var refCount = new root.RefCountDisposable(root.Disposable.Create(function() {
        if(request) {
            request.cancel();
        }
    }));
    
    return observable.CreateWithDisposable(function(subscriber) {
        return new root.CompositeDisposable(subject.Subscribe(subscriber), refCount.GetDisposable());
    });    
}    
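
The caching behavior of the AsyncSubject is what lets us send the request before anyone has subscribed.  The sketch below is a deliberately tiny stand-in: TinyAsyncSubject is a made-up name, and it leaves out error handling and unsubscription.  It only illustrates the idea that the last value is held until completion and then replayed to late subscribers.

```javascript
// Hypothetical, simplified stand-in for an AsyncSubject. Not RxJS code.
function TinyAsyncSubject() {
    this.observers = [];
    this.hasValue = false;
    this.value = undefined;
    this.completed = false;
}

TinyAsyncSubject.prototype.OnNext = function(value) {
    if (this.completed) { return; }
    // Only remember the value; nothing is delivered until completion.
    this.hasValue = true;
    this.value = value;
};

TinyAsyncSubject.prototype.OnCompleted = function() {
    if (this.completed) { return; }
    this.completed = true;
    var self = this;
    this.observers.forEach(function(observer) {
        self.deliver(observer);
    });
    this.observers = [];
};

TinyAsyncSubject.prototype.deliver = function(observer) {
    if (this.hasValue) { observer.OnNext(this.value); }
    observer.OnCompleted();
};

TinyAsyncSubject.prototype.Subscribe = function(observer) {
    if (this.completed) {
        // Late subscriber: replay the cached value immediately.
        this.deliver(observer);
    } else {
        this.observers.push(observer);
    }
};

var log = [];
function makeObserver(name) {
    return {
        OnNext: function(v) { log.push(name + ":" + v); },
        OnCompleted: function() { log.push(name + ":done"); }
    };
}

var subject = new TinyAsyncSubject();
subject.Subscribe(makeObserver("early"));
subject.OnNext("first");
subject.OnNext("last");
subject.OnCompleted();
subject.Subscribe(makeObserver("late"));
```

Both the early and the late observer end up seeing only the final value followed by completion, which matches the one-shot nature of an AJAX response.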

We can also extend the Request.JSONP class (with thanks to Sebastian Markbåge for the assistance) by using the implement function once again.  We’ll create a toObservable function to implement this same functionality.  One difference is that the options are private to the class.  So, we can either use the setOptions function to set the options or, since the Request.JSONP class inherits from the Events class, call addEvents directly to add handlers for the success, failure and exception cases.

Request.JSONP.implement({
   
    toObservable: function () {
        var subject = new root.AsyncSubject();
        var request = this;
        try {
            
            request.addEvents({
    
                success: function(data) {
                    subject.OnNext(data);
                    subject.OnCompleted();
                },
    
                failure: function(xhr) {
                    subject.OnError({ kind: "failure", xhr: xhr });
                },
    
                exception: function(headerName, value) {
                    subject.OnError({ kind: "exception", headerName: headerName, value: value });
                }
    
            });
            
            request.send();
        }
        catch (err) {
            subject.OnError(err);
        }
    
        var refCount = new root.RefCountDisposable(root.Disposable.Create(function () {
            request.cancel();
        }));
    
        return observable.CreateWithDisposable(function (subscriber) {
            return new root.CompositeDisposable(subject.Subscribe(subscriber), refCount.GetDisposable());
        });
    }        
}); 
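
The idea of bundling the unsubscribe step and the cancel step into one disposable can be sketched on its own.  This is a simplified illustration that leaves out reference counting entirely; createDisposable, compositeDisposable and the fake request object are hypothetical stand-ins, not the RxJS or MooTools types.

```javascript
// Hypothetical stand-ins; none of these names come from RxJS or MooTools.
function createDisposable(action) {
    var disposed = false;
    return {
        Dispose: function() {
            if (!disposed) {       // run the action at most once
                disposed = true;
                action();
            }
        }
    };
}

// Groups several disposables so that one Dispose call disposes them all.
function compositeDisposable() {
    var items = Array.prototype.slice.call(arguments);
    return createDisposable(function() {
        items.forEach(function(item) { item.Dispose(); });
    });
}

// Fake request that only records how often cancel was called.
var request = {
    cancelled: 0,
    cancel: function() { this.cancelled += 1; }
};

// Fake subject subscription: subscribing adds an observer, disposing
// removes it again.
var observers = [];
function subscribe(observer) {
    observers.push(observer);
    return createDisposable(function() {
        observers.splice(observers.indexOf(observer), 1);
    });
}

var subscription = compositeDisposable(
    subscribe({ OnNext: function() {} }),
    createDisposable(function() { request.cancel(); }));

subscription.Dispose();   // unsubscribes and cancels
subscription.Dispose();   // second call is a no-op
```

A single Dispose call on the returned subscription is all the caller needs, regardless of how many cleanup steps sit behind it.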

Once we’ve implemented this, we’re able to redo the example where we query Twitter for those who are posting to FourSquare and have geolocation turned on, get the maximum user ID and then print their text.

window.addEvent("domready", function() {
    var options = {
        url: "http://search.twitter.com/search.json",
        data: { rpp : "100", q : "4sq.com" },
        callbackKey: "callback"
    };

    new Request.JSONP(options)
        .toObservable()
        .SelectMany(
            function(data) {
                return Rx.Observable.FromArray(data.results);
            })
        .Where(
            function(result) {
                return result.geo != null;
            })
        .Max(
            function(result) {
                return result.from_user_id;
            })
        .Subscribe(
            function(result) {
                alert(result.text);
            },
            function(error) {
                alert("An error!" + error.description);
            });
});
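
To make the shape of this query concrete, here is the same flatten, filter and maximum logic over a plain array.  The response object is made-up sample data whose field names (results, geo, from_user_id, text) are taken from the example above; it is not real Twitter output.  The sketch keeps the whole result with the largest ID so that its text can be read afterwards, and makes no claim about what the RxJS Max operator itself yields.

```javascript
// Made-up sample data shaped like the response used above.
var data = {
    results: [
        { from_user_id: 12, geo: null, text: "no location" },
        { from_user_id: 7,  geo: { type: "Point" }, text: "checked in downtown" },
        { from_user_id: 42, geo: { type: "Point" }, text: "checked in uptown" },
        { from_user_id: 99, geo: null, text: "also no location" }
    ]
};

// SelectMany step: flatten each response into its individual results.
var results = [data].reduce(function(all, response) {
    return all.concat(response.results);
}, []);

// Where step: keep only results that carry geolocation.
var withGeo = results.filter(function(result) {
    return result.geo != null;
});

// Max step: keep the result with the largest from_user_id.
var top = withGeo.reduce(function(best, result) {
    return best === null || result.from_user_id > best.from_user_id ? result : best;
}, null);
```

The result with ID 99 is dropped by the filter because it has no geolocation, so the maximum is taken only over the two results that do.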

You can find the complete MooTools integration here, and it will be available in the next release of RxJS.

Instead of this example where we go from MooTools to RxJS, we could also extend a number of classes to create an API totally in MooTools with RxJS underneath the covers.  With this, we could implement behavior similar to that of Towel.js, an extension to MooTools for reactive programming.

Conclusion

Dealing with asynchronous programming has been at the forefront of many minds in the JavaScript community.  At JSConf, there were several examples of frameworks trying to get around the idea of callbacks and instead lean more towards composition.  By utilizing the Reactive Extensions for JavaScript, we’re able to compose asynchronous and event-based operations together and transform the results in interesting ways.

One important aspect of the Reactive Extensions for JavaScript is how well we play with other frameworks.  By providing out-of-the-box extension points, we allow you to keep using your framework of choice while the Reactive Extensions take care of the event-based and asynchronous behavior.

So with that, download it, and give the team feedback!

Posted in Event-based Programming, Functional Programming, JavaScript, JSConf, Reactive Framework