
Introduction to the Reactive Extensions for JavaScript – Buffering

We’ve come a long way in the series on the Reactive Extensions for JavaScript.  After spending some time with Ruby and with extension points in other libraries, let’s step back to some of the basic operators again.  This time, let’s talk about how we can buffer our input, which is to say we can put our observable values into a buffer based upon either time or by count.  This means that instead of flooding our system with calls to OnNext, we instead fill up a buffer based upon the given criteria of time or count, and then call OnNext with that value.  Let’s look deeper into how we can use this in today’s post.


Buffering Output

In a traditional pull-based model of synchronous programming, you could have scenarios where yielding the next value is a potentially expensive blocking operation.  In contrast, with the reactive push-based model we could get flooded with values, as our sources eagerly send us data as fast as they can, and this could overload our system.  To compensate for this, the Reactive Extensions (for both .NET and JavaScript) have a notion of buffering, which is to say that, based upon some criterion, we can buffer the results and have them returned as an array.  We have the choice of buffering either by a count threshold or by a given time span.  Once the criterion has been met (or we reach the end of the sequence), the buffered values are yielded to us all at once in array form.  Let’s first look at buffering by count.

…With Count?

The first buffering mechanism we have with the Reactive Extensions is by count.  To make use of this functionality, we can call the BufferWithCount method, which takes a count and, optionally, a skip value that controls how far to move forward in the sequence before the next buffer starts.  This method then returns to us an observable sequence of arrays of values.

// count: the size of the buffer
// skip (Optional): how many values to skip
// returns: Observable<Array<T>>

Rx.Observable.prototype.BufferWithCount =  function(
    count, 
    skip);

Let’s look at this in action.  We’ll take an array with ten numbers and buffer them every two values, so in all we should have yielded to us five arrays.  We’ll iterate through our buffer and indicate its value and position in the buffer.

Rx.Observable.FromArray([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .BufferWithCount(2)
    .Subscribe(
        function (next) {
            $.each(next, function (index, value) {
                $("<p/>").html("Buffered: " + index + "-" + value)
                    .appendTo(results);
            });
        });

Below is the result of the BufferWithCount specifying only the count and not the skip count.  I have highlighted how many arrays you will get.

Buffered: 0-1
Buffered: 1-2 // one
Buffered: 0-3
Buffered: 1-4 // two
Buffered: 0-5
Buffered: 1-6 // three
Buffered: 0-7
Buffered: 1-8 // four
Buffered: 0-9
Buffered: 1-10 // five

Next, we’ll try specifying the skip count, which is to say how many elements we should skip.  For example, if I have a buffer set at two and then I specify four as the skip value, then it will yield 1 and 2 and then skip 3 and 4 before then yielding to us 5 and 6.  Let’s show how that code might look:

Rx.Observable.FromArray([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .BufferWithCount(2, 4)
    .Subscribe(
        function (next) {
            $.each(next, function (index, value) {
                $("<p/>").html("Buffered: " + index + "-" + value)
                    .appendTo(results);
            });
        });

And now we can sure enough see our results where we yield only three arrays this time due to us skipping 3, 4, 7 and 8.

Buffered: 0-1
Buffered: 1-2 // one
Buffered: 0-5
Buffered: 1-6 // two
Buffered: 0-9
Buffered: 1-10 // three
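
To make the count and skip semantics concrete, here is a minimal sketch in plain JavaScript that models them on an ordinary array.  The helper name bufferWithCountModel is hypothetical, and this is not how Rx implements BufferWithCount; it only reproduces the two results shown above, under the assumption that skip defaults to count when it is omitted.

```javascript
// Hypothetical model of BufferWithCount on a plain array (not the Rx implementation).
// count: size of each buffer
// skip (optional): distance between the starts of consecutive buffers;
//                  assumed here to default to count
function bufferWithCountModel(values, count, skip) {
    if (skip === undefined) {
        skip = count;
    }
    var buffers = [];
    for (var start = 0; start < values.length; start += skip) {
        // slice stops at the end of the array, so a final partial buffer is kept
        buffers.push(values.slice(start, start + count));
    }
    return buffers;
}

var numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
var everyTwo = bufferWithCountModel(numbers, 2);    // five arrays of two
var skipFour = bufferWithCountModel(numbers, 2, 4); // [[1, 2], [5, 6], [9, 10]]
```

Seen this way, the skip value is simply the step between buffer starts: a step of four with a buffer of two leaves 3, 4, 7 and 8 outside every buffer.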

But this of course isn’t the only mechanism we have in our arsenal.  Let’s now talk about buffering with time.

…With Time?

In addition to buffering with count, we have a notion of buffering with time, which we can achieve with the BufferWithTime method.  This method takes in a timespan in milliseconds, an optional time shift, which acts like the skip from BufferWithCount, and an optional Rx Scheduler.  This then returns to us, as before, an Observable of an array.

// timeSpan: time span in milliseconds to buffer
// timeShift (Optional): time to skip listening in milliseconds
// scheduler (Optional): a custom scheduler
// returns: Observable<Array<T>>

Rx.Observable.prototype.BufferWithTime = function(
    timeSpan, 
    timeShift, 
    scheduler);

To show this in action, we’ll hop back to some of our knowledge gained in our custom schedulers post.  We’ll create our custom DelayedScheduler based upon a given delay time.  This will help take an existing array and then space it out over time.

var delayedScheduler = Rx.DelayedScheduler = function (delay) {

    return new Rx.Scheduler(
        function (action) {
            var id = window.setTimeout(action, delay);
            return Rx.Disposable.Create(function () {
                window.clearTimeout(id);
            });
        },
        function (action, dueTime) {
            var id = window.setTimeout(action, dueTime);
            return Rx.Disposable.Create(function () {
                window.clearTimeout(id);
            });
        },
        function () {
            return new Date().getTime();
        });
};

With the scheduler now in place, we can take our array and delay each item by 100 milliseconds by calling the FromArray with our scheduler.  We can then buffer it with a given timeout of 300 milliseconds and then subscribe much as we have before.

Rx.Observable.FromArray(
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 
        new Rx.DelayedScheduler(100))
    .BufferWithTime(300)
    .Subscribe(
        function (next) {
            $.each(next, function (index, value) {
                $("<p/>").html("Buffered: " + index + "-" + value)
                    .appendTo(results);
            });
        });

This will yield to us some interesting results in that we have four arrays yielded to us, some with three elements and some with only two due to the timing.

Buffered: 0-1
Buffered: 1-2 // one
Buffered: 0-3
Buffered: 1-4 
Buffered: 2-5 // two
Buffered: 0-6
Buffered: 1-7
Buffered: 2-8 // three
Buffered: 0-9
Buffered: 1-10 // four

Now, let’s try with a time shift specified.  In this example, we’ll set our timeout once again to 300 milliseconds and our shift to 400 milliseconds, which should then allow us to skip the numbers 3 and 7 based upon our timing.

Rx.Observable.FromArray(
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 
        new Rx.DelayedScheduler(100))
    .BufferWithTime(300, 400)
    .Subscribe(
        function (next) {
            $.each(next, function (index, value) {
                $("<p/>").html("Buffered: " + index + "-" + value)
                    .appendTo(results);
            });
        });

And sure enough that turns out to be the case as we have three arrays yielded to us, missing both the 3 and 7.

Buffered: 0-1
Buffered: 1-2 // one
Buffered: 0-4
Buffered: 1-5
Buffered: 2-6 // two
Buffered: 0-8
Buffered: 1-9
Buffered: 2-10 // three
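
The grouping in both time-based results can be reasoned about with a small model in plain JavaScript.  This is only a sketch under stated assumptions: bufferWithTimeModel is a hypothetical helper, each value n is assumed to arrive at exactly n * 100 milliseconds, and every window is assumed to be half-open, starting at time 0.  Real timers jitter, so the actual BufferWithTime output can differ from run to run.

```javascript
// Hypothetical model of BufferWithTime on timestamped events (not the Rx implementation).
// events: array of { time, value } sorted by time
// timeSpan: length of each window in milliseconds
// timeShift (optional): distance between window starts; assumed to default to timeSpan
function bufferWithTimeModel(events, timeSpan, timeShift) {
    if (timeShift === undefined) {
        timeShift = timeSpan;
    }
    var buffers = [];
    var lastTime = events.length > 0 ? events[events.length - 1].time : -1;
    for (var start = 0; start <= lastTime; start += timeShift) {
        var end = start + timeSpan;
        // Keep the values whose arrival time falls inside [start, end)
        buffers.push(events
            .filter(function (event) {
                return event.time >= start && event.time < end;
            })
            .map(function (event) {
                return event.value;
            }));
    }
    return buffers;
}

// Assumed arrival times: value n arrives at n * 100 ms
var timed = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].map(function (n) {
    return { time: n * 100, value: n };
});

var spanOnly = bufferWithTimeModel(timed, 300);       // [[1,2],[3,4,5],[6,7,8],[9,10]]
var withShift = bufferWithTimeModel(timed, 300, 400); // [[1,2],[4,5,6],[8,9,10]]
```

With a 400 millisecond shift and a 300 millisecond span, each window leaves a 100 millisecond gap, which is where 3 and 7 fall.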

Now we shouldn’t be limited to an either/or at this point; why not have the best of both worlds?

…With Both?

As we’ve already covered buffering with both count and time, let’s cover how we can get the best of both worlds: when either the timeout or the buffer size has been hit, the values in the buffer are yielded to us.  We can do that through the use of BufferWithTimeOrCount, to which we specify the time in milliseconds, a count, and an optional Rx Scheduler.  This then yields to us our Observable of an array.

// timeSpan: time used to determine buffer size
// count: count used to determine buffer size
// scheduler (Optional): optional scheduler
// returns: Observable<Array<T>>

Rx.Observable.prototype.BufferWithTimeOrCount = function(
    timeSpan, 
    count, 
    scheduler);

To show this in action, I’m going to need another custom scheduler, but this time a completely random one so as to allow me to show off both behaviors.  We’ll take much of the above code from our DelayedScheduler and instead create a random number up to the delay parameter.  Then, every time a new value is to be yielded, we delay that value by the given random number.

var randomScheduler = Rx.RandomScheduler = function (delay) {

    return new Rx.Scheduler(
    function (action) {
        var randomnumber = Math.random() * (delay + 1) << 0;
        var id = window.setTimeout(action, randomnumber);
        return Rx.Disposable.Create(function () {
            window.clearTimeout(id);
        });
    },
    function (action, dueTime) {
        var id = window.setTimeout(action, dueTime);
        return Rx.Disposable.Create(function () {
            window.clearTimeout(id);
        });
    },
    function () {
        return new Date().getTime();
    });
};
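
The random delay expression in the scheduler above is terse, so here it is pulled out into a standalone helper for illustration.  The name randomDelay is hypothetical; the expression itself is the one used in RandomScheduler.

```javascript
// Sketch: the delay expression from RandomScheduler as a standalone helper.
// randomDelay is a hypothetical name used only for this illustration.
function randomDelay(maxDelay) {
    // Math.random() is in [0, 1), so the product is in [0, maxDelay + 1).
    // Multiplication binds tighter than <<, and "<< 0" converts the product
    // to a 32-bit integer, dropping the fractional part.
    return Math.random() * (maxDelay + 1) << 0;
}

var samples = [];
for (var i = 0; i < 1000; i++) {
    samples.push(randomDelay(100));
}

// Every sample should be a whole number between 0 and 100 inclusive
var allInRange = samples.every(function (n) {
    return n === Math.floor(n) && n >= 0 && n <= 100;
});
```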

Let’s show this off by taking our standard array that we’ve used so far and using our custom RandomScheduler with a maximum of 100 milliseconds.  We’ll then buffer it by either a timeout of 200 milliseconds or 4 items and then subscribe as usual.

Rx.Observable.FromArray(
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        new Rx.RandomScheduler(100))
    .BufferWithTimeOrCount(200, 4)
    .Subscribe(
        function (next) {
            $.each(next, function (index, value) {
                $("<p/>").html("Buffered: " + index + "-" + value)
                    .appendTo(results);
            });
        });

We’ll see in our results that we’ll hit the timeout quite a few more times than we hit with the count, but it shows that we do indeed hit both the timeout and count buffers.

Buffered: 0-1
Buffered: 1-2 // timeout
Buffered: 0-3
Buffered: 1-4 // timeout
Buffered: 2-5
Buffered: 3-6 // count
Buffered: 0-7
Buffered: 1-8 // timeout
Buffered: 0-9
Buffered: 1-10 // timeout
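
Because the output above depends on random delays, it can help to see the rule itself on fixed data.  The following is a sketch in plain JavaScript, not the Rx implementation: bufferByTimeOrCountModel is a hypothetical helper, the arrival times are made up, and the model assumes the first window starts at time 0 and that a new window starts whenever a buffer is flushed.

```javascript
// Hypothetical model of "flush on timeout or on count" over timestamped events.
// Assumptions: the first window starts at time 0, and a new window starts
// whenever a buffer is flushed. Not the Rx implementation.
function bufferByTimeOrCountModel(events, timeSpan, count) {
    var flushed = [];
    var current = [];
    var windowStart = 0;

    events.forEach(function (event) {
        // Flush once for every window that expired before this event arrived
        while (event.time >= windowStart + timeSpan) {
            flushed.push({ values: current, reason: "timeout" });
            current = [];
            windowStart += timeSpan;
        }
        current.push(event.value);
        // Flush early when the buffer is full
        if (current.length === count) {
            flushed.push({ values: current, reason: "count" });
            current = [];
            windowStart = event.time;
        }
    });

    // The sequence has ended: hand over whatever is left
    if (current.length > 0) {
        flushed.push({ values: current, reason: "completed" });
    }
    return flushed;
}

// Made-up arrival times: two slow values, then a quick burst
var arrivals = [
    { time: 50, value: 1 }, { time: 150, value: 2 },
    { time: 210, value: 3 }, { time: 220, value: 4 },
    { time: 230, value: 5 }, { time: 240, value: 6 },
    { time: 300, value: 7 }
];
var mixed = bufferByTimeOrCountModel(arrivals, 200, 4);
var reasons = mixed.map(function (buffer) { return buffer.reason; });
var groups = mixed.map(function (buffer) { return buffer.values; });
```

In this run the slow start trips the timeout with only two values buffered, while the burst fills a buffer of four before the timer gets a chance to fire.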

And there you have it, a simple walkthrough of using buffers with the Reactive Extensions for JavaScript (and if you squint real hard, .NET as well).

Conclusion

Dealing with asynchronous programming has been at the forefront of many minds in the JavaScript community.  At JSConf, there were several examples of frameworks trying to get around the idea of callbacks and instead lean more towards composition.  By utilizing the Reactive Extensions for JavaScript, we’re able to compose asynchronous and event-based operations together and transform the results in interesting ways.

In the reactive programming world, we could easily have situations where we have systems pushing values at us faster than we would like.  To compensate for this, we have a notion of buffering in the Reactive Extensions which allow for us to specify either count buffers, timeout buffers, or heck, even both.  This gives us some much needed flexibility on how we process the data that is coming at us.

So with that, download it, and give the team feedback!


Dojo Deferred and the Reactive Extensions for JavaScript

We’ve covered quite a bit in this series including how well the Reactive Extensions for JavaScript plays with other libraries and what integration points we have.  Our main thrust in providing this is that we don’t want to replace your library of choice, whether it be jQuery, Dojo, MooTools, YUI in addition to the server components of node.js.  Instead, we want to build bridges from them when you encounter the pains of asynchronous and event-based composition which frequently crop up in JavaScript applications both on the web and even on the server.  Case in point, let’s go over our latest integration with the Dojo Toolkit’s promises via the dojo.Deferred module.


Dojo Deferred

The Dojo dojo.Deferred module is a central piece of the Dojo Toolkit which serves as the basis for such things as AJAX calls like xhrGet among others.  At its core, it serves as a promises API which acts as a proxy for a value which has not yet been calculated.  Through this API, we have a clear separation of concerns which allows us to abstract the asynchronous behavior via callbacks into the promise itself, instead of the method signature.  The same could be said of the Reactive Extensions for JavaScript as well.

Although dojo.Deferred has been around for a while, the newest release of the Dojo Toolkit has added another method to handle both the success and failure cases of a given computation, such as the following code.  Imagine we had an asynchronous function which returns to us a dojo.Deferred object, which does nothing more than delay by a second and return the value.

function someDeferredFunction() {
    var deferred = new dojo.Deferred();
    setTimeout(
        function() {
            deferred.callback(42); 
        }, 1000);
    return deferred;    
}

Then we can subscribe to both its success and failure cases by using the then function.

var deferred1 = someDeferredFunction();
deferred1.then(
    // Success
    function(value) {
        alert(value);
    },
    // Failure
    function(error) {
        alert(error);
    });

We can see a bit of overlap between the ideas here and the heart of the Reactive Extensions for JavaScript.  The difference between the two is that this API treats asynchronous calls as one shot resumable methods whereas RxJS treats these as observable sequences.  So, how could we take advantage of this API and build a bridge between the two libraries?

From Deferred to Observable

The first order of business is to see how we can take a given dojo.Deferred and turn it into an Observable sequence yielding a single value.  Using the Reactive Extensions for JavaScript with the Dojo support added via the “rx.dojo.js” script, we can seamlessly leap between the two by calling the asObservable prototype method we added.

var deferred2 = someDeferredFunction();
deferred2.asObservable()
    .Select(function(value) { return value * value; })
    .Subscribe(
        function(value) {
            alert(value);
        });

In this example, we took the someDeferredFunction from above and called the asObservable method, after which we’re free to do such things as projections via Select, in which we square the returned value, and then subscribe to the result via Subscribe.  To show you how we did it, let’s walk through the code below.  First, we create an AsyncSubject, which represents an asynchronous operation which accepts only one value and then caches it for all future observations.  Then we can call the then function, and in the success case, we call OnNext with the result and OnCompleted to indicate we are finished, and in the failure case, we send the error on via OnError.  Finally, we return the subject, thus completing our bridge.

dojo.Deferred.prototype.asObservable = function () {
    var subject = new Rx.AsyncSubject();
    
    this.then(
        // Success
        function (value) {
            subject.OnNext(value);
            subject.OnCompleted();
        },
        // Failure
        function (error) {
            subject.OnError(error);
        });
        
    return subject;
};
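
To see why an AsyncSubject is a good fit for a one-shot result, here is a minimal stand-in written in plain JavaScript.  TinyAsyncSubject is a hypothetical name and a deliberately reduced model (no error handling, no unsubscribe), not the Rx.AsyncSubject source; it only illustrates the caching behavior described above, assuming the subject remembers its latest value and hands it over once it completes.

```javascript
// Hypothetical, reduced model of an AsyncSubject: it remembers the latest
// value and delivers it only once the sequence has completed.
function TinyAsyncSubject() {
    this.observers = [];
    this.hasValue = false;
    this.value = undefined;
    this.done = false;
}

TinyAsyncSubject.prototype.OnNext = function (value) {
    if (this.done) { return; }
    this.hasValue = true;
    this.value = value; // overwrite: only the latest value is kept
};

TinyAsyncSubject.prototype.OnCompleted = function () {
    if (this.done) { return; }
    this.done = true;
    var self = this;
    this.observers.forEach(function (onNext) {
        if (self.hasValue) { onNext(self.value); }
    });
    this.observers = [];
};

TinyAsyncSubject.prototype.Subscribe = function (onNext) {
    if (this.done) {
        // Late subscribers still receive the cached value
        if (this.hasValue) { onNext(this.value); }
    } else {
        this.observers.push(onNext);
    }
};

var seen = [];
var tiny = new TinyAsyncSubject();
tiny.Subscribe(function (value) { seen.push("early:" + value); });
tiny.OnNext(41);
tiny.OnNext(42);
tiny.OnCompleted();
tiny.Subscribe(function (value) { seen.push("late:" + value); });
// seen is now ["early:42", "late:42"]
```

Both the early and the late subscriber observe the same single value, which mirrors how a resolved deferred keeps answering with its result.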

But, what about the other way around?  Can we go from an Observable sequence to a dojo.Deferred?

From Observable to Deferred

Because we feel like building bridges, we want to give the ability for traffic to go both ways.  For example, we could have a given AsyncSubject that we want to use with our existing Dojo functionality.  Imagine we had an Observable sequence that waits for a second and then yields the value via an AsyncSubject.

function someAsyncSubjectFunction() {
    var subject = new Rx.AsyncSubject();
    
    setTimeout(
        function() {
            subject.OnNext(42);
            subject.OnCompleted();
        },
        1000);
        
    return subject;
}

Now, we might have some processing logic written in Dojo, so we want you to take full advantage of that.  Via the AsDeferred method provided to us through the “rx.dojo.js” script, we build that bridge between the two libraries.  After calling AsDeferred, we can then use it as a regular dojo.Deferred object.  So, now we can call then passing in the two continuations for the success and failure cases.

var subject = someAsyncSubjectFunction();
subject
    .AsDeferred()
    .then(
        function (value) {
            alert(value);
        },
        function (error) {
            alert(error);
        });

Now, how did we make this happen?  We’ll need to extend the AsyncSubject with our AsDeferred method.  Inside, we’ll create a new dojo.Deferred which will handle the values from our AsyncSubject.  Next, we’ll subscribe to our subject via the Subscribe method, calling the callback method with our result on success, and errback on the failure.  Finally, we return the deferred which completes our bridge.

Rx.AsyncSubject.prototype.AsDeferred = function () {
    var deferred = new dojo.Deferred();
    
    this.Subscribe(
        function (value) {
            deferred.callback(value);
        },
        function (error) {
            deferred.errback(error);
        });
        
    return deferred;
};

And there you have it, our bridge between the Dojo Toolkit and the Reactive Extensions for JavaScript, which allows you to have the best of both worlds.

Conclusion

Dealing with asynchronous programming has been at the forefront of many minds in the JavaScript community.  At JSConf, there were several examples of frameworks trying to get around the idea of callbacks and instead lean more towards composition.  By utilizing the Reactive Extensions for JavaScript, we’re able to compose asynchronous and event-based operations together and transform the results in interesting ways.

The Reactive Extensions isn’t meant to be the only solution in your toolkit; instead, you can continue to use the JavaScript library of your choice, and when you run into callback hell and composition issues, that’s where RxJS shines.  To that end, we’ve provided a lot of bridges between various libraries, with more to come.

So with that, download it, and give the team feedback!


IronRuby and the Reactive Extensions Together Again – Taming User Input

In the previous post, I talked about how IronRuby 1.1 now supports extension methods, and that it not only supports LINQ to Objects, but with relative ease it also supports the Reactive Extensions for .NET.  We covered a little example of taking a Ruby Enumerable, turning it into a .NET Observable instance and then projecting and filtering the data.  In this post, we’ll go a little bit further by showing how you can use events as well and how we can tame user input.

What Does the Future Hold?

Before we continue this series, I thought I’d address the elephant in the room in terms of what the future holds for IronRuby.  Jimmy Schementi, the former PM of the IronRuby project laid out what has been going on in his blog and the IronRuby core mailing list (which if you want to see it stick around, join the community).  Keep in mind that at the time of this writing, no decision has been made one way or the other as to its future.  My hope is that the community does indeed stand up and either continue on IronRuby, or a bridge effort such as RubyCLR as they are both valuable things to have.

Either way, this blog series, however many posts it may be, will continue as I think it might have some value to some folks out there.  So with that, onto today’s subject.

Taming User Input

In this post, I’m going to take an example we had from the Reactive Extensions for .NET Hands on Labs and apply it to a version using IronRuby and WPF.  The idea behind this example is that HTTP calls, such as those to Web Services, REST endpoints and so forth, can be expensive.  Imagine you were a really fast typist at over 100 words per minute, and each new character caused a new request to be sent.  Chances are you’d overload the system, and not only that, but the results could also come back out of order.  So, how do we fix that?  By using the Reactive Extensions for .NET together with IronRuby, we can make this scenario work quite nicely.

Removing the Boilerplate

Getting started, we’ll need to get some boilerplate out of the way including referencing the proper assemblies for a .NET 4 solution.  In order for us to do so, let’s move the assembly includes to a separate file.  Since we’re going to be using WPF and the Reactive Extensions, we’ll create two separate files, rxnet.rb and wpf.rb.  The rxnet.rb contains all specific assemblies related to the Reactive Extensions such as the following:

load_assembly 'System.CoreEx'
load_assembly 'System.Interactive'
load_assembly 'System.Reactive'

And our wpf.rb contains the following assemblies:

load_assembly 'WindowsBase'
load_assembly 'PresentationCore'
load_assembly 'PresentationFramework'
load_assembly 'System.Xaml'
load_assembly 'System.CoreEx'
load_assembly 'System.Reactive'

Now we can make this available any number of ways, either through a quick gem, or just copying these files directly to the lib folder for IronRuby.  Once we’ve decided on a given path, then we could reference all we need by the following for this post:

require 'System.Core'
require 'wpf'
require 'rxnet'

Now that our boilerplate is out of the way, let’s get on to the solution.

Starting the Solution

Once we have finished that, we are free to include the namespaces and the extension method namespaces which include both System and System::Linq namespaces.  The IObservable extensions for the standard operators reside in System::Linq, and the overloads for the IObservable<T>.subscribe reside in the System namespace.

include System
include System::Linq
include System::Windows
include System::Windows::Controls

using_clr_extensions System
using_clr_extensions System::Linq

Now, let’s create a standard WPF Window class with a StackPanel which contains two elements, our TextBox for entering data and a ListBox for showing the results. 

class RubyWindow < Window

    def initialize
        self.Title = 'Hello Ruby!'
        
        stackpanel = StackPanel.new
        @textbox = TextBox.new
        @listbox = ListBox.new
        stackpanel.children.add(@textbox)
        stackpanel.children.add(@listbox)
        self.content = stackpanel
        
        initialize_handlers
    end

end

Once we’ve done this, let’s get to the interesting part.  After setting the content of our Window to the StackPanel, let’s hook up to the TextChanged event of our TextBox by calling the generic Observable.from_event method (FromEvent) with a type argument of TextChangedEventArgs.

def initialize_handlers

    @textbox_changed = Observable.
        method(:from_event).
        of(TextChangedEventArgs).
        call(@textbox, 'TextChanged')

end

This is all fine and interesting, but the TextChangedEventArgs doesn’t actually give us the text of our TextBox, so, let’s fix that to call select, passing in our IEvent<TextChangedEventArgs> argument and then retrieving the text property from the sender, which in this case was the TextBox.  In order for us to call generic methods using IronRuby, we need to make use of the following pattern in which we specify the method we want to call, of what type and then we can invoke it via the call method.

some_object.
    method(:some_method).
    of(SomeType).
    call(some_args)

To see this in action, we’ll apply the same pattern to call the from_event method of type TextChangedEventArgs, and then we invoke via call with our @textbox and the ‘TextChanged’ event.

def initialize_handlers

    @textbox_changed = Observable.
        method(:from_event).
        of(TextChangedEventArgs).
        call(@textbox, 'TextChanged').
        select(lambda { |event| event.sender.text })        

end

One distinct advantage we have here when using a dynamic language such as Ruby is that we’re able to get the text from the sender without having to cast.  For example, using C#, we’d have to do the following:

var textChanged = 
    Observable.FromEvent<TextChangedEventArgs>(textbox, "TextChanged")
        .Select(ev => ((TextBox)ev.Sender).Text);

We can subscribe to the resulting observable and then have the items added to the ListBox.

def initialize_handlers

    @textbox_changed = Observable.
        method(:from_event).
        of(TextChangedEventArgs).
        call(@textbox, 'TextChanged').
        select(lambda { |event| event.sender.text })        

    @textbox_changed.
        subscribe(lambda { |text| @listbox.items.add(text) })

end

This is great, but as you may notice, all of our input is immediately put into the ListBox below.  This may or may not be a problem, but imagine we’re calling an external service: it would be a huge issue to have hundreds of calls going out at once because we’re fast typists.  Let’s fix that by using the throttle method (Throttle), which allows us to specify a timeout before the event is fired.  If no other events happen within the specified timeframe, then we get the result; otherwise we keep waiting until the input has been quiet for that long.  So, let’s now throttle our input.

def initialize_handlers

    @textbox_changed = Observable.
        method(:from_event).
        of(TextChangedEventArgs).
        call(@textbox, 'TextChanged').
        select(lambda { |event| event.sender.text })        

    @textbox_changed.
        throttle(TimeSpan.from_milliseconds(500)).
        subscribe(lambda { |text| @listbox.items.add(text) })

end
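
The rule behind throttling is independent of Ruby or .NET, so here is a small language-neutral model of it, written in JavaScript to match the earlier posts in this series.  It is a sketch under stated assumptions rather than the Rx implementation: throttleModel is a hypothetical helper, the keystroke times are made up, and an event is assumed to survive only when nothing else arrives within the timeout after it.

```javascript
// Hypothetical model of throttling on a list of timestamped events.
// An event survives only if no later event arrives within `timeout` ms of it.
function throttleModel(events, timeout) {
    return events
        .filter(function (event, index) {
            var next = events[index + 1];
            return next === undefined || next.time - event.time >= timeout;
        })
        .map(function (event) {
            return event.value;
        });
}

// Made-up keystrokes: a quick burst, a pause, then two more characters
var keystrokes = [
    { time: 0, value: "r" },
    { time: 120, value: "re" },
    { time: 250, value: "rea" },
    { time: 900, value: "reac" },
    { time: 1000, value: "react" }
];
var throttled = throttleModel(keystrokes, 500); // ["rea", "react"]
```

Only the text present at each pause gets through, which is what keeps a fast typist from triggering one request per character.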

Now we can run it and watch the results, and it should just work, right?  Not so much…


The problem is that once we start throttling our input, we’re launching a new thread to make this happen.  This newly spawned thread cannot access the ListBox to modify its items.  Instead, we have to think about Schedulers, all of which I’ll cover in a later post, but suffice it to say, we have some options.  The easiest of these is the observe_on_dispatcher method (ObserveOnDispatcher), which allows us to observe this sequence on the control’s dispatcher.  Let’s change the code to fix this issue:

def initialize_handlers

    @textbox_changed = Observable.
        method(:from_event).
        of(TextChangedEventArgs).
        call(@textbox, 'TextChanged').
        select(lambda { |event| event.sender.text })        

    @textbox_changed.
        throttle(TimeSpan.from_milliseconds(500)).
        observe_on_dispatcher.
        subscribe(lambda { |text| @listbox.items.add(text) })

end

And now we can start typing, and sure enough our text is throttled, including copying the sentence and pasting it.


But you’ll notice that we now have a duplicate because of the copy/paste operation.  What we really want is to have distinct values only in our ListBox.  To make that happen, we’ll need to use the distinct_until_changed method (DistinctUntilChanged) which then eliminates those duplicates.

def initialize_handlers

    @textbox_changed = Observable.
        method(:from_event).
        of(TextChangedEventArgs).
        call(@textbox, 'TextChanged').
        select(lambda { |event| event.sender.text })        

    @textbox_changed.
        throttle(TimeSpan.from_milliseconds(500)).
        observe_on_dispatcher.
        distinct_until_changed.
        subscribe(lambda { |text| @listbox.items.add(text) })

end
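
It is worth being precise about what “distinct until changed” filters out.  The sketch below models it on a plain array in JavaScript, the language of the earlier posts in this series; distinctUntilChangedModel is a hypothetical helper, not the Rx operator, and it assumes values are compared with strict equality.

```javascript
// Hypothetical model of distinct-until-changed on a plain array:
// a value is dropped only when it equals the value immediately before it.
function distinctUntilChangedModel(values) {
    return values.filter(function (value, index) {
        return index === 0 || value !== values[index - 1];
    });
}

var typed = ["hello", "hello", "hello world", "hello"];
var deduped = distinctUntilChangedModel(typed);
// deduped is ["hello", "hello world", "hello"]
```

Note that the final "hello" is kept: only consecutive repeats are removed, so a value can reappear later once something different has come in between.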

And we can run the same experiment and sure enough our result is throttled and we receive no duplicates from item to item.


And there you have it, a simple case of taming user input, all with the use of IronRuby and the Reactive Extensions for .NET.

Conclusion

With the Reactive Extensions for .NET, we have nice cross-language support for building a bridge to asynchronous and event-based programming.  Coming with the latest release of IronRuby, we now have the ability, and a rather nice one at that, to consume extension methods that are predefined in other languages, which opens a whole new set of opportunities for interoperability.  In the coming posts, I’ll take a look at what else I can do in this space to see that we can in fact help solve some of the hardest problems around asynchronous and event-based programming.

As for the future of IronRuby, best to get involved and help where you can if you want to see it be a success!

So with that, download it, and give the team feedback!


IronRuby and the Reactive Extensions for .NET Together at Last

Recently, there was a release of IronRuby 1.1, which had a number of new features including targeting .NET 4 only, as well as the most interesting part, the support for Extension Methods.  Taking a cue from the examples, we can write a simple LINQ comprehension using extension methods defined in the System.Core library’s System.Linq namespace.  To make this happen, we need to load our assembly System.Core and then use the extension methods defined in the System.Linq namespace.

load_assembly 'System.Core'
using_clr_extensions System::Linq

We’ll also need a way to go from the Ruby Enumerable to .NET IEnumerable<T> instances.  To bridge that, we’ll extend the Object class and define a to_seq method, which takes a type, should we need to make our IEnumerable<T> instance anything other than an Object.  We then create the IEnumerable<T> instance by calling to_a (to array) on the Enumerable itself.

class Object  
    def to_seq(type = Object)    
        System::Linq::Enumerable.method(:of_type).of(type).call(self.to_a)  
    end
end

Putting this all together, I take a Ruby enumerable and convert to a .NET IEnumerable<T> instance, square the numbers and get those which are divisible by 3 and then print the results.

load_assembly 'System.Core'
using_clr_extensions System::Linq

class Object  
    def to_seq(type = Object)    
        System::Linq::Enumerable.method(:of_type).of(type).call(self.to_a)  
    end
end

(1..10).to_seq.
    select(lambda { |x| x * x }).
    where(lambda { |x| x % 3 == 0 }).
    each { |x| puts x }

This works great as it prints out the numbers 9, 36 and 81.  This is a great interop story, although I’m sure if I’m using Ruby, I might just stick with the Enumerable module.  But then I looked at the Observable module in Ruby and I wasn’t really happy with what I saw there: adding and deleting observers, as well as the notification, is pretty much a straight copy of the Gang of Four pattern, which I think we’ve moved past.

This got me thinking that if we can go back and forth between Ruby’s Enumerable module and the .NET IEnumerable<T> interface, then we should be able to do the same with the Reactive Extensions for .NET and the IObservable<T> interface.  We’ll recreate the example above, but instead of using the pull model of Enumerable, we’ll have the values pushed to us instead.

Let’s first load our assemblies that we’ll need and include the proper namespaces and extension method namespaces.  We’re including the System namespace as that contains the overloads for the Observable.Subscribe method to take lambdas instead of an IObserver<T> instance.

load_assembly 'System.Core'
load_assembly 'System.CoreEx'
load_assembly 'System.Reactive'

include System
include System::Linq

using_clr_extensions System
using_clr_extensions System::Linq

Once we’ve done that, let’s take what we know about turning an Enumerable into an IEnumerable<T> and apply it to turning that same Enumerable into an IObservable<T> via the to_observable (ToObservable) method.

class Object  
    def to_seq(type = Object)    
        System::Linq::Enumerable.method(:of_type).of(type).call(self.to_a)  
    end
    
    def to_observable(type = Object)
        System::Linq::Observable.method(:to_observable).of(type).call(self.to_a)
    end
end

Now we can focus on the problem at hand, which is to create our IObservable<T> instance.  As above, we can take an Enumerable range of 1 to 10 and turn that into an observable.  Once we do that, we can call our select and where methods, which creates our observable sequence.

observable = (1..10).to_observable.
    select(lambda { |x| x * x }).
    where(lambda { |x| x % 3 == 0 })

We now have our observable sequence, but now what?  Well, since we’ve imported the extension methods from the System namespace, we’re able to use the Subscribe overloads that take lambdas which cover the OnNext, OnError and OnCompleted cases.  I’ll handle all three here, although in this instance I only really need to handle OnNext.  I’ll use the do syntax instead of braces for the lambdas just to show you that both work.

observable.subscribe(
    lambda do |next_value|
        puts next_value
    end,
    lambda do |error|
        puts error.message
    end,
    lambda do 
        puts 'done!'
    end)

Having this code in place, I can run this and sure enough I have printed to my screen:

9
36
81
done!

Interesting possibilities here, including comparing and contrasting the Ruby Observable and the IObservable<T>.  More to come in the next post!

Conclusion

With the Reactive Extensions for .NET, we have nice cross-language support for building a bridge to asynchronous and event-based programming.  With the latest release of IronRuby, we now have the ability, and a rather nice one at that, to consume extension methods that are predefined in other languages, which opens a whole new set of opportunities for interoperability.  In the coming posts, I’ll take a look at what else I can do in this space to see how we can in fact help solve some of the hardest problems around asynchronous and event-based programming.

So with that, download it, and give the team feedback!

Posted in Event-based Programming, Reactive Framework, Ruby

Introduction to the Reactive Extensions for JavaScript – Error Handling Part II

We’ve covered a bit recently with conditional and looping operators on the Reactive Extensions for JavaScript, but I want to step back just a minute and cover exception handling.  This post will cover how we can compensate for errors as they happen in several ways and will largely follow Bart De Smet’s post on the same topic, but instead of covering the Interactive Extensions, we’ll stick primarily to JavaScript.  Last time, we covered the basics of error handling including Catch and Finally.

OnErrorResumeNext VB Style

The next error handling feature in the Reactive Extensions for JavaScript is one that should be familiar to those in the Visual Basic world, called OnErrorResumeNext (On Error Resume Next).  In Visual Basic parlance, this means that when a run-time error occurs, control goes to the statement immediately following the statement where the error occurred, and execution continues from that point.  The functionality is similar to the Concat operator, which allows you to append Observable sequences onto each other to create a single sequence, except that should an error occur, it is not bubbled up and the operator instead continues with the next sequence.  This operator comes in two flavors, one as an instance method and one as a static method:

// items : Observable[]
// scheduler : optional custom scheduler
// returns : Observable

Rx.Observable.prototype.OnErrorResumeNext(
    items,
    scheduler);

// items : Observable[]
// returns : Observable
Rx.Observable.OnErrorResumeNext(
    items);

Let’s create a simple example where we want to output the numbers 1 through 9, even if an exception occurs along the way.  In this case, the first two sequences yield their arrays normally, the third yields its array and then throws an exception that we concatenated to its end, and the final sequence still yields its array.

Rx.Observable.OnErrorResumeNext([
    Rx.Observable.FromArray([1, 2]),
    Rx.Observable.FromArray([3, 4, 5]),
    Rx.Observable.FromArray([6, 7]).Concat(Rx.Observable.Throw("woops!")),
    Rx.Observable.FromArray([8, 9])])
    .Subscribe(
        function (next) {
            $("<p/>").html(next).appendTo("#results");
        });

This is great for handling data that may potentially be bad, where it’s ok to drop the exception should one occur.
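
To make the behavior concrete without pulling in the library, here is a minimal sketch of the idea.  It assumes a sequence is nothing more than a function that accepts an observer with onNext, onError and onCompleted callbacks; fromArray, throwAfter and onErrorResumeNext are hypothetical helpers written for this illustration and are not the Rx implementation.

```javascript
// Hypothetical helper: a sequence that yields each value, then completes.
function fromArray(values) {
    return function (observer) {
        for (var i = 0; i < values.length; i++) {
            observer.onNext(values[i]);
        }
        observer.onCompleted();
    };
}

// Hypothetical helper: a sequence that yields each value, then fails.
function throwAfter(values, error) {
    return function (observer) {
        for (var i = 0; i < values.length; i++) {
            observer.onNext(values[i]);
        }
        observer.onError(error);
    };
}

// Subscribes to each sequence in turn.  An error is treated just like
// completion, so we simply move on to the next sequence.
function onErrorResumeNext(sequences) {
    return function (observer) {
        var index = 0;

        function moveNext() {
            if (index === sequences.length) {
                observer.onCompleted();
                return;
            }
            var current = sequences[index++];
            current({
                onNext: observer.onNext,
                onError: moveNext,     // swallow the error and resume
                onCompleted: moveNext
            });
        }

        moveNext();
    };
}

var results = [];

onErrorResumeNext([
    fromArray([1, 2]),
    fromArray([3, 4, 5]),
    throwAfter([6, 7], "woops!"),
    fromArray([8, 9])
])({
    onNext: function (value) { results.push(value); },
    onError: function (error) { results.push("error: " + error); },
    onCompleted: function () { console.log(results.join(", ")); }
});
```

Run under Node, this prints 1, 2, 3, 4, 5, 6, 7, 8, 9; the "woops!" error never reaches the subscriber.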

Cleaning Up With Using

Just as in .NET there is the using keyword to denote a try/finally with a call to Dispose, we need a similar capability in the JavaScript world for resource cleanup in asynchronous code.  Let’s look at the signature of the function, which takes a resource selector that returns the Disposable object, and a resource usage function that takes that Disposable object and returns an Observable sequence.

// resourceSelector : () -> Disposable
// resourceUsage : Disposable -> Observable
// returns : Observable

Rx.Observable.Using(
    resourceSelector, 
    resourceUsage);

A Disposable object is nothing more than an object with a Dispose method which takes no arguments and returns nothing.  For example, we could create a Disposable object to encompass the jQuery data API which allows us to store arbitrary data associated with a given element.  This object would allow us to both get and set the value, and in our Dispose, we would remove the data.

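var DataDisposable = function (element, key) {

    // Sets the value when one is passed, otherwise gets the stored value
    this.data = function (value) {
        return arguments.length > 0
            ? element.data(key, value)
            : element.data(key);
    };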

    // Dispose method
    this.Dispose = function () {
        element.removeData(key);
    };
};

Using this, we can write two samples: the first sets the data and then retrieves it without hitting an exceptional condition, whereas the second does hit one.  Let’s take a look at the first example, which yields the answer 56088.

Rx.Observable.Using(

    // resourceSelector
    function () {
        return new DataDisposable($("#content"), "key1");
    },
    
    // resourceUsage
    function (disposable) {
        disposable.data(123);
        
        return Rx.Observable.Return(456 * disposable.data());
    })
    .Subscribe(
        function (next) {
            $("<p/>").html(next).appendTo("#content");
        });

The second example below is exactly the same, except that we throw an exception after yielding the answer and then use Catch to resume with the next sequence, so the subscriber sees the answer above followed by 789.

Rx.Observable.Using(

    // resourceSelector
    function () {
        return new DataDisposable($("#content"), "key1");
    },
    
    // resourceUsage
    function (disposable) {
        disposable.data(123);

        return Rx.Observable.Return(456 * disposable.data())
            .Concat(Rx.Observable.Throw("woops"));
    })
    .Catch(Rx.Observable.Return(789))
    .Subscribe(
        function (next) {
            $("<p/>").html(next).appendTo("#content");
        });

In both cases here, we ensure that the underlying data is properly disposed when we leave the scope of our Using operator.
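
To see the cleanup happen without a browser or jQuery, here is a rough sketch of the same two examples.  It makes several assumptions: a sequence is modeled as a plain function that takes an observer, a plain object named store stands in for the element’s data store, and using, catchWith, returnValue and StoreDisposable are hypothetical names invented for this illustration rather than the library’s own code.

```javascript
var store = {}; // stands in for the element's jQuery data store

function StoreDisposable(key) {
    // Sets the value when one is passed, otherwise gets the stored value
    this.data = function (value) {
        if (arguments.length > 0) { store[key] = value; }
        return store[key];
    };
    this.Dispose = function () { delete store[key]; };
}

// Hypothetical stand-in for Using: create the resource, run the sequence,
// and dispose the resource when the sequence completes or fails.
function using(resourceSelector, resourceUsage) {
    return function (observer) {
        var resource = resourceSelector();
        resourceUsage(resource)({
            onNext: observer.onNext,
            onError: function (error) {
                resource.Dispose();
                observer.onError(error);
            },
            onCompleted: function () {
                resource.Dispose();
                observer.onCompleted();
            }
        });
    };
}

// Hypothetical stand-in for Catch: on error, continue with the fallback.
function catchWith(source, fallback) {
    return function (observer) {
        source({
            onNext: observer.onNext,
            onError: function () { fallback(observer); },
            onCompleted: observer.onCompleted
        });
    };
}

function returnValue(value) {
    return function (observer) {
        observer.onNext(value);
        observer.onCompleted();
    };
}

function runExample(shouldFail) {
    var log = [];
    var sequence = using(
        function () { return new StoreDisposable("key1"); },
        function (disposable) {
            disposable.data(123);
            var answer = 456 * disposable.data();
            return function (observer) {
                observer.onNext(answer);
                if (shouldFail) {
                    observer.onError("woops");
                } else {
                    observer.onCompleted();
                }
            };
        });

    catchWith(sequence, returnValue(789))({
        onNext: function (value) { log.push(value); },
        onError: function (error) { log.push("error: " + error); },
        onCompleted: function () { log.push("done"); }
    });

    log.push("key1" in store ? "still stored" : "disposed");
    return log;
}

var successLog = runExample(false);
var failureLog = runExample(true);
console.log(successLog.join(", ")); // 56088, done, disposed
console.log(failureLog.join(", ")); // 56088, 789, done, disposed
```

In both runs the key is gone from the store by the time the sequence has finished, whether it ended normally or through the error path.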

If At First You Fail, Retry

Let’s move on to the last operator for this section on error handling, Retry.  The idea behind this operator is that we can retry the observable sequence until it succeeds (meaning no exception) or hits the maximum try count.  Let’s look at the method signatures below:

// returns : Observable

Rx.Observable.prototype.Retry();

// count : int
// scheduler : optional scheduler
// returns : Observable

Rx.Observable.prototype.Retry(
    count,
    scheduler);

Now if we have a sequence that doesn’t throw an exception, then Retry should be a standard no-op, which means it does nothing.  For example, the following simply prints the numbers 1 through 3.

Rx.Observable.FromArray([1, 2, 3])
    .Retry()
    .Subscribe(
        function (next) {
            $("<p/>").html(next).appendTo("#results");
        });

However, if we had a sequence that threw an exception each and every time, we could run into a big issue such as the following, where it would blow out the JavaScript stack as it would try to go on forever:

Rx.Observable.FromArray([1, 2, 3])
    .Concat(Rx.Observable.Throw("woah nellie!"))
    .Retry()
    .Subscribe(
        function (next) {
            $("<p/>").html(next).appendTo("#results");
        });

Luckily, we have a way around this: the provided overload lets us specify the maximum number of retries that are allowed.  Next, let’s show a case where Retry is really useful.  For example, let’s say we have a sequence which could produce three values, but will fail at different points during the first two attempts.

var count = 0;

Rx.Observable.Return(4)
    .Concat(
        Rx.Observable.If(
            function() { return count == 0; },
            Rx.Observable.Throw("nope!"),
            Rx.Observable.Return(5)))
    .Concat(
        Rx.Observable.If(
            function () { return count == 1; },
            Rx.Observable.Throw("nope!"),
            Rx.Observable.Return(6)))
    .Finally(function() { count++; })
    .Retry()
    .Subscribe(
        function (next) {
            $("<p/>").html(next).appendTo("#results");
        });

In this situation, we will render the following result, since we hit two different exceptions:

4 // Hits exception
4
5 // Hits exception
4
5
6

And there you have it: composable ways around potential failures in our asynchronous, push-based operations that give us the same flexibility that our synchronous, imperative ones do.
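
If you want to trace the retry behavior by hand, the following sketch may help.  It is not the library’s implementation: it assumes a sequence is a function that notifies its observer synchronously, and retry, flaky, alwaysFails, collect and the maxAttempts parameter are all hypothetical names made up for this illustration.  The counter is bumped at the start of each attempt to play the role of the Finally call above.

```javascript
// Hypothetical stand-in for Retry.  maxAttempts is optional; when it is
// omitted we keep resubscribing until an attempt succeeds.
function retry(source, maxAttempts) {
    return function (observer) {
        var attempts = 0;
        var failed, lastError;
        var inner = {
            onNext: observer.onNext,
            onError: function (error) { failed = true; lastError = error; },
            onCompleted: observer.onCompleted
        };
        // A loop rather than recursion, so repeated failures do not
        // deepen the call stack.
        while (maxAttempts === undefined || attempts < maxAttempts) {
            attempts++;
            failed = false;
            source(inner);
            if (!failed) { return; }
        }
        observer.onError(lastError); // out of attempts: surface the last error
    };
}

var count = 0;

// Mirrors the example above: yields 4, 5, 6 but fails after 4 on the
// first attempt and after 5 on the second.
function flaky(observer) {
    var attempt = count;
    count++; // plays the role of Finally(function () { count++; })
    observer.onNext(4);
    if (attempt === 0) { observer.onError("nope!"); return; }
    observer.onNext(5);
    if (attempt === 1) { observer.onError("nope!"); return; }
    observer.onNext(6);
    observer.onCompleted();
}

// A sequence that fails on every attempt.
function alwaysFails(observer) {
    observer.onNext(1);
    observer.onError("woah nellie!");
}

// Subscribes and records every notification in order.
function collect(sequence) {
    var log = [];
    sequence({
        onNext: function (value) { log.push(value); },
        onError: function (error) { log.push("error: " + error); },
        onCompleted: function () { log.push("done"); }
    });
    return log;
}

var flakyLog = collect(retry(flaky));
var boundedLog = collect(retry(alwaysFails, 2));
console.log(flakyLog.join(", "));   // 4, 4, 5, 4, 5, 6, done
console.log(boundedLog.join(", ")); // 1, 1, error: woah nellie!
```

The first log matches the trace shown above, while the second shows how capping the number of attempts turns an endless retry into an ordinary error for the subscriber.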

Conclusion

Dealing with asynchronous programming has been at the forefront of many minds in the JavaScript community.  At JSConf, there were several examples of frameworks trying to get around the idea of callbacks and instead lean more towards composition.  By utilizing the Reactive Extensions for JavaScript, we’re able to compose asynchronous and event-based operations together and transform the results in interesting ways.

When we start creating more advanced workflows through the Reactive Extensions, we also need ways of handling errors.  We have the ability to handle errors and compensate as they happen through a rich set of operators including Catch, Finally, Using, OnErrorResumeNext, Retry and more.

So with that, download it, and give the team feedback!

Posted in Event-based Programming, JavaScript, JSConf, Reactive Framework