Introduction to the Reactive Extensions for JavaScript – Buffering

We’ve come a long way in the series on the Reactive Extensions for JavaScript.  After spending some time with Ruby and with extension points in other libraries, let’s step back to some of the basic operators again.  This time, let’s talk about how we can buffer our input, which is to say we can collect our observable values into a buffer based upon either time or count.  This means that instead of flooding our system with one call to OnNext per value, we fill up a buffer until the given time or count threshold is reached, and then call OnNext once with the whole buffer.  Let’s look deeper into how we can use this in today’s post.

Buffering Output

In a traditional pull-based model of synchronous programming, you could have scenarios where yielding the next value is a potentially expensive blocking operation.  Contrast that with the reactive push-based model, where we could get flooded with values as our sources eagerly send us data as fast as they can.  This can be a problem, as it could overload our system.  To compensate for this, the Reactive Extensions (for both .NET and JavaScript) have a notion of buffering, which is to say that based upon some criterion, we can buffer the results and return them as an array.  We have the choice of buffering either by a count threshold or by a given time span.  Once the threshold has been reached (or we reach the end of the sequence), the buffered values are yielded to us at once in array form.  Let’s first look at buffering by count.

…With Count?

The first buffering mechanism we have with the Reactive Extensions is by count.  To make use of this functionality, we can call the BufferWithCount method, which takes a count and, optionally, a number to indicate how many values you want to skip between buffers being yielded.  This method then returns to us an observable sequence of arrays of values.

// count: the size of the buffer
// skip (Optional): how many values to skip
// returns: Observable<Array<T>>

Rx.Observable.prototype.BufferWithCount =  function(
    count, 
    skip);

Let’s look at this in action.  We’ll take an array with ten numbers and buffer them every two values, so in all we should have yielded to us five arrays.  We’ll iterate through our buffer and indicate its value and position in the buffer.

Rx.Observable.FromArray([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .BufferWithCount(2)
    .Subscribe(
        function (next) {
            $.each(next, function (index, value) {
                $("<p/>").html("Buffered: " + index + "-" + value)
                    .appendTo(results);
            });
        });

Below is the result of the BufferWithCount specifying only the count and not the skip count.  I have highlighted how many arrays you will get.

Buffered: 0-1
Buffered: 1-2 // one
Buffered: 0-3
Buffered: 1-4 // two
Buffered: 0-5
Buffered: 1-6 // three
Buffered: 0-7
Buffered: 1-8 // four
Buffered: 0-9
Buffered: 1-10 // five
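
To make the mechanics a little more concrete, here is a minimal sketch of what a count buffer does with the values pushed at it.  This is plain JavaScript and not the Rx implementation: createCountBuffer, next and complete are hypothetical names made up for this illustration, standing in for the roles of OnNext and OnCompleted.

```javascript
// Hypothetical helper, not part of Rx: collects pushed values into
// arrays of `count` items and hands each full array to `onBuffer`.
function createCountBuffer(count, onBuffer) {
    var buffer = [];
    return {
        // Called once per incoming value, playing the role of OnNext.
        next: function (value) {
            buffer.push(value);
            if (buffer.length === count) {
                onBuffer(buffer);
                buffer = [];
            }
        },
        // Called when the source ends, playing the role of OnCompleted:
        // flush whatever is left so that no values are lost.
        complete: function () {
            if (buffer.length > 0) {
                onBuffer(buffer);
                buffer = [];
            }
        }
    };
}

var received = [];
var counter = createCountBuffer(2, function (values) {
    received.push(values);
});
[1, 2, 3, 4, 5].forEach(counter.next);
counter.complete();
console.log(JSON.stringify(received)); // [[1,2],[3,4],[5]]
```

Note how the odd value at the end still arrives, just in a shorter array, once the source completes.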

Next, we’ll try specifying the skip count.  The skip value is measured from the start of one buffer to the start of the next.  For example, if I have a buffer count of two and then I specify four as the skip value, then it will yield 1 and 2, skip 3 and 4, and then yield 5 and 6.  Let’s show how that code might look:

Rx.Observable.FromArray([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .BufferWithCount(2, 4)
    .Subscribe(
        function (next) {
            $.each(next, function (index, value) {
                $("<p/>").html("Buffered: " + index + "-" + value)
                    .appendTo(results);
            });
        });

And sure enough, our results show only three arrays this time, because 3, 4, 7 and 8 were skipped.

Buffered: 0-1
Buffered: 1-2 // one
Buffered: 0-5
Buffered: 1-6 // two
Buffered: 0-9
Buffered: 1-10 // three
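
If the interplay between count and skip still feels fuzzy, the following sketch models it over a plain array.  It is only an illustration under the assumption that skip is the distance between the starts of consecutive buffers; bufferWithCount here is a hypothetical standalone function, not the Rx method.

```javascript
// Hypothetical model of count/skip buffering over a plain array.
// `skip` is the distance between the starts of consecutive buffers
// and defaults to `count`, so that buffers sit back to back.
function bufferWithCount(values, count, skip) {
    var step = skip === undefined ? count : skip;
    if (count < 1 || step < 1) {
        throw new Error("count and skip must be at least 1");
    }
    var buffers = [];
    for (var start = 0; start < values.length; start += step) {
        buffers.push(values.slice(start, start + count));
    }
    return buffers;
}

var numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
console.log(JSON.stringify(bufferWithCount(numbers, 2)));
// [[1,2],[3,4],[5,6],[7,8],[9,10]]
console.log(JSON.stringify(bufferWithCount(numbers, 2, 4)));
// [[1,2],[5,6],[9,10]]
```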

But this of course isn’t the only mechanism we have in our arsenal.  Let’s now talk about buffering with time.

…With Time?

In addition to buffering with count, we have a notion of buffering with time, which we can achieve with the BufferWithTime method.  This method takes in a timespan in milliseconds, an optional time shift, which acts like the skip from BufferWithCount, and an optional Rx Scheduler.  This then returns to us, as before, an Observable of an array.

// timeSpan: time span in milliseconds to buffer
// timeShift (Optional): time to skip listening in milliseconds
// scheduler (Optional): a custom scheduler
// returns: Observable<Array<T>>

Rx.Observable.prototype.BufferWithTime = function(
    timeSpan, 
    timeShift, 
    scheduler);

To show this in action, we’ll hop back to some of our knowledge gained in our custom schedulers post.  We’ll create our custom DelayedScheduler based upon a given delay time.  This will help take an existing array and then space it out over time.

var delayedScheduler = Rx.DelayedScheduler = function (delay) {

    return new Rx.Scheduler(
        function (action) {
            var id = window.setTimeout(action, delay);
            return Rx.Disposable.Create(function () {
                window.clearTimeout(id);
            });
        },
        function (action, dueTime) {
            var id = window.setTimeout(action, dueTime);
            return Rx.Disposable.Create(function () {
                window.clearTimeout(id);
            });
        },
        function () {
            return new Date().getTime();
        });
};
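
To picture what this scheduler does to an array, here is a rough sketch of the same effect without Rx.  It assumes that each value is scheduled after the previous one, so the values arrive roughly delay milliseconds apart.  The emitSpaced function and its setTimer parameter are hypothetical names for this illustration only.

```javascript
// Hypothetical helper, not part of Rx: pushes each value to `onNext`
// `delay` milliseconds after the previous one, then calls `onCompleted`.
// `setTimer` defaults to setTimeout and can be swapped out for testing.
function emitSpaced(values, delay, onNext, onCompleted, setTimer) {
    var timer = setTimer || setTimeout;
    var index = 0;
    function step() {
        if (index < values.length) {
            onNext(values[index]);
            index += 1;
            timer(step, delay);   // schedule the following value
        } else if (onCompleted) {
            onCompleted();
        }
    }
    timer(step, delay);           // the first value is delayed as well
}

emitSpaced([1, 2, 3], 100, function (value) {
    console.log("Next: " + value);
}, function () {
    console.log("Completed");
});
```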

With the scheduler now in place, we can take our array and delay each item by 100 milliseconds by calling FromArray with our scheduler.  We can then buffer it with a given timeout of 300 milliseconds and then subscribe much as we have before.

Rx.Observable.FromArray(
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 
        new Rx.DelayedScheduler(100))
    .BufferWithTime(300)
    .Subscribe(
        function (next) {
            $.each(next, function (index, value) {
                $("<p/>").html("Buffered: " + index + "-" + value)
                    .appendTo(results);
            });
        });

This will yield to us some interesting results in that we have four arrays yielded to us, some with three elements and some with only two due to the timing.

Buffered: 0-1
Buffered: 1-2 // one
Buffered: 0-3
Buffered: 1-4 
Buffered: 2-5 // two
Buffered: 0-6
Buffered: 1-7
Buffered: 2-8 // three
Buffered: 0-9
Buffered: 1-10 // four

Now, let’s try with a time shift specified.  In this example, we’ll set our timeout once again to 300 milliseconds and our shift to 400 milliseconds.  That leaves a gap of 100 milliseconds between one buffer closing and the next one opening, which should then allow us to skip the numbers 3 and 7 based upon our timing.

Rx.Observable.FromArray(
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 
        new Rx.DelayedScheduler(100))
    .BufferWithTime(300, 400)
    .Subscribe(
        function (next) {
            $.each(next, function (index, value) {
                $("<p/>").html("Buffered: " + index + "-" + value)
                    .appendTo(results);
            });
        });

And sure enough that turns out to be the case as we have three arrays yielded to us, missing both the 3 and 7.

Buffered: 0-1
Buffered: 1-2 // one
Buffered: 0-4
Buffered: 1-5
Buffered: 2-6 // two
Buffered: 0-8
Buffered: 1-9
Buffered: 2-10 // three
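
The timing can be worked through by hand with a small model.  The sketch below is not how Rx implements BufferWithTime; it simply assumes that the values arrive at exactly 100, 200, ... 1000 milliseconds, that a buffer opens every timeShift milliseconds, and that a value landing exactly on a boundary belongs to the buffer that is opening.  Real timers are not this precise, so actual runs may differ at the boundaries.  The standalone bufferWithTime function and the events array are hypothetical.

```javascript
// Hypothetical model of time-based buffering, not the Rx implementation.
// `events` is an array of { time, value } pairs sorted by arrival time.
// A buffer opens every `timeShift` ms (defaulting to `timeSpan`) and
// collects the values arriving during the following `timeSpan` ms.
// Windows in which nothing arrives produce empty arrays in this model.
function bufferWithTime(events, timeSpan, timeShift) {
    var shift = timeShift === undefined ? timeSpan : timeShift;
    if (timeSpan < 1 || shift < 1) {
        throw new Error("timeSpan and timeShift must be at least 1");
    }
    if (events.length === 0) {
        return [];
    }
    var lastTime = events[events.length - 1].time;
    var buffers = [];
    for (var start = 0; start <= lastTime; start += shift) {
        var end = start + timeSpan;
        var values = events
            .filter(function (e) { return e.time >= start && e.time < end; })
            .map(function (e) { return e.value; });
        buffers.push(values);
    }
    return buffers;
}

// Values 1..10 arriving 100 ms apart.
var events = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].map(function (value) {
    return { time: value * 100, value: value };
});
console.log(JSON.stringify(bufferWithTime(events, 300)));
// [[1,2],[3,4,5],[6,7,8],[9,10]]
console.log(JSON.stringify(bufferWithTime(events, 300, 400)));
// [[1,2],[4,5,6],[8,9,10]]
```

Under those assumptions the model gives the same groupings as the two runs above.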

But we shouldn’t be limited to either/or at this point.  Why not have the best of both worlds?

…With Both?

As we’ve already covered buffering with count and with time, let’s cover how we can get the best of both worlds: when either the timeout or the buffer size has been hit, yield us the values in the buffer.  We can do that through the use of BufferWithTimeOrCount, to which we specify the time in milliseconds, a count, and an optional Rx Scheduler.  This then yields to us our Observable of an array.

// timeSpan: time used to determine buffer size
// count: count used to determine buffer size
// scheduler (Optional): optional scheduler
// returns: Observable<Array<T>>

Rx.Observable.prototype.BufferWithTimeOrCount = function(
    timeSpan, 
    count, 
    scheduler);

To show this in action, I’m going to need another custom scheduler, but this time a completely random one, so that I can show off both behaviors.  We’ll take much of the above code from our DelayedScheduler and instead create a random number up to the delay parameter.  Then, every time a new value is to be yielded, we delay that value by a freshly generated random number.

var randomScheduler = Rx.RandomScheduler = function (delay) {

    return new Rx.Scheduler(
        function (action) {
            var randomnumber = Math.random() * (delay + 1) << 0;
            var id = window.setTimeout(action, randomnumber);
            return Rx.Disposable.Create(function () {
                window.clearTimeout(id);
            });
        },
        function (action, dueTime) {
            var id = window.setTimeout(action, dueTime);
            return Rx.Disposable.Create(function () {
                window.clearTimeout(id);
            });
        },
        function () {
            return new Date().getTime();
        });
};
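
The random number expression in that scheduler is a little terse, so here it is pulled out on its own.  The randomDelay function and its injectable random parameter are hypothetical and exist only so the sketch can be checked with fixed inputs.

```javascript
// Returns a whole number from 0 to `max` inclusive.
// `random` is an injectable source of numbers in [0, 1); it defaults
// to Math.random and is a parameter only to make testing possible.
function randomDelay(max, random) {
    var next = random || Math.random;
    // Multiplication binds tighter than `<<`, so the product is computed
    // first; `<< 0` then converts it to a 32-bit integer, dropping the
    // fractional part.
    return next() * (max + 1) << 0;
}

console.log(randomDelay(100, function () { return 0; }));     // 0
console.log(randomDelay(100, function () { return 0.5; }));   // 50
console.log(randomDelay(100, function () { return 0.999; })); // 100
```

For non-negative values below 2^31, the shift gives the same answer as Math.floor, which would be the more readable choice in new code.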

Let’s show this off by taking our standard array that we’ve used so far and using our custom RandomScheduler with a maximum of 100 milliseconds.  We’ll then buffer it by either a timeout of 200 milliseconds or 4 items and then subscribe as usual.

Rx.Observable.FromArray(
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        new Rx.RandomScheduler(100))
    .BufferWithTimeOrCount(200, 4)
    .Subscribe(
        function (next) {
            $.each(next, function (index, value) {
                $("<p/>").html("Buffered: " + index + "-" + value)
                    .appendTo(results);
            });
        });

We’ll see in our results that we hit the timeout more often than we hit the count, but it shows that both the timeout and the count do indeed trigger a buffer.

Buffered: 0-1
Buffered: 1-2 // timeout
Buffered: 0-3
Buffered: 1-4
Buffered: 2-5
Buffered: 3-6 // count
Buffered: 0-7
Buffered: 1-8 // timeout
Buffered: 0-9
Buffered: 1-10 // timeout
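
Since the random delays make every run different, a deterministic model can help when reasoning about which trigger fires.  The sketch below is an assumption-laden illustration and not the Rx implementation: it assumes the timer restarts whenever a buffer is released, it drops empty buffers, and the arrival times are made up by hand rather than measured.  The standalone bufferWithTimeOrCount function is hypothetical.

```javascript
// Hypothetical model of time-or-count buffering, not the Rx implementation.
// `events` is an array of { time, value } pairs sorted by arrival time.
// A buffer is released when it holds `count` values or when `timeSpan` ms
// have passed since the previous release, whichever happens first.
function bufferWithTimeOrCount(events, timeSpan, count) {
    var released = [];
    var buffer = [];
    var windowStart = 0;

    function release(reason, time) {
        if (buffer.length > 0) {          // this model drops empty buffers
            released.push({ reason: reason, values: buffer });
        }
        buffer = [];
        windowStart = time;               // restart the timer
    }

    events.forEach(function (e) {
        // Fire any timeouts that elapsed before this value arrived.
        while (e.time >= windowStart + timeSpan) {
            release("timeout", windowStart + timeSpan);
        }
        buffer.push(e.value);
        if (buffer.length === count) {
            release("count", e.time);
        }
    });
    release("end", windowStart);          // flush what is left at the end
    return released;
}

// Made-up arrival times, each at most 100 ms after the previous one.
var times = [90, 150, 210, 230, 260, 290, 380, 470, 560, 650];
var events = times.map(function (time, index) {
    return { time: time, value: index + 1 };
});
bufferWithTimeOrCount(events, 200, 4).forEach(function (b) {
    console.log(b.reason + ": " + JSON.stringify(b.values));
});
// timeout: [1,2]
// count: [3,4,5,6]
// timeout: [7,8]
// end: [9,10]
```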

And there you have it, a simple walkthrough of using buffers with the Reactive Extensions for JavaScript (and if you squint real hard, .NET as well).

Conclusion

Dealing with asynchronous programming has been at the forefront of many minds in the JavaScript community.  At JSConf, there were several examples of frameworks trying to get around the idea of callbacks and instead lean more towards composition.  By utilizing the Reactive Extensions for JavaScript, we’re able to compose asynchronous and event-based operations together and transform the results in interesting ways.

In the reactive programming world, we could easily have situations where we have systems pushing values at us faster than we would like.  To compensate for this, we have a notion of buffering in the Reactive Extensions which allows us to specify either count buffers, timeout buffers, or heck, even both.  This gives us some much needed flexibility in how we process the data that is coming at us.

So with that, download it, and give the team feedback!
