Actors in F# – The Bounded Buffer Problem

In the previous post, I covered an auction simulation built on asynchronous message passing and a shared-nothing approach, using the MailboxProcessor class in F#.  The auction example was a good way to demonstrate scalability: adding clients created a sort of bidding war between them.  Once again, this approach eliminated the need for locks and other concurrency primitives.

This time, let’s take another canonical example of a Bounded Buffer and look at some of the design patterns around this.

The Bounded Buffer

The goal of this post is to walk through an actor-model implementation of the canonical Bounded Buffer, another example given in Scala.  The intent of this demo is to store and retrieve items in a buffer (rather simple, actually).  We’ll walk through how we might implement this using the constructs in F#.  One important aspect of this solution is that we do not post messages asynchronously as before; instead, we post a message and await the reply.

Without further ado, let’s get into the code.  As before, a few utility functions will come in handy.  Much like the (<--) operator we defined last time for posting messages, I’d like one for posting and waiting for a reply.  In addition, I need a way to curry a function that takes a tuple, for reasons you will see later.  One thing that has irked me on occasion is the confusion between partial application and currying, which I’ve covered earlier.  Getting back to the issue at hand, let’s look at the code:

// Curry the arguments
let curry f x y = f (x, y)

// Asynchronous post
let (<--) (m:_ MailboxProcessor) msg = m.Post msg

// Post and reply operator
let (<->) (m:_ MailboxProcessor) msg = m.PostAndReply(fun replyChannel -> msg replyChannel)

Both operators are defined here, the asynchronous post and the post-and-reply, although only the latter is needed for this post.  The PostAndReply method posts a message and blocks until a reply arrives.  It creates a temporary reply channel and hands it to our function, which builds the message around it.  The reply channel is an AsyncReplyChannel<T>, which exposes a single method, Reply, that we will use later.  Whatever value the agent passes to Reply is returned to the caller as the result of PostAndReply.
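To make the round trip concrete, here is a minimal sketch that is separate from the bounded buffer itself.  The EchoMessage type and echoAgent are hypothetical names invented for this illustration; only MailboxProcessor, PostAndReply and AsyncReplyChannel come from the F# library.

```fsharp
// Hypothetical message type: a request that carries the channel to answer on.
type EchoMessage = Echo of string * string AsyncReplyChannel

// Hypothetical agent: replies to every Echo with the upper-cased text.
let echoAgent =
  MailboxProcessor.Start(fun inbox ->
    let rec loop () =
      async { let! msg = inbox.Receive()
              match msg with
              | Echo (text, replyChannel) ->
                  // The value given to Reply becomes the result of PostAndReply.
                  replyChannel.Reply (text.ToUpper())
                  return! loop () }
    loop ())

// PostAndReply creates the channel, passes it to our function to build
// the message, and blocks until the agent calls Reply.
let shout = echoAgent.PostAndReply(fun replyChannel -> Echo ("hello", replyChannel))
// shout = "HELLO"
```

The caller never sees the channel directly; it only supplies a function from the channel to a message.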

Next, we need to define the messages we will be processing as part of this bounded buffer.  Each of these messages defines an operation that our buffer supports, namely put, get and stop.  Let’s take a look at these in detail:

type 'a BufferMessage = Put of 'a * unit AsyncReplyChannel 
                      | Get of 'a AsyncReplyChannel 
                      | Stop of unit AsyncReplyChannel

As you will notice, each case carries an AsyncReplyChannel as part of the message.  This allows me to reply to each caller in turn.  Put and Stop have nothing to send back, so their channels are AsyncReplyChannel<unit>.  The Put message puts a value into the buffer, Get retrieves the values in turn, and Stop stops the mailbox.

Let’s move on to the heart of the matter, the actual bounded buffer.  This class takes in a buffer size and then we expose methods that allow us to put values in the buffer, get values from the buffer and stop the mailbox.  Below is how the code might look:

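type 'a BoundedBuffer(N:int) =

  let buffer =
    MailboxProcessor.Start(fun inbox ->
      // Fixed-size backing store, shared by every iteration of the loop
      let buf:'a array = Array.zeroCreate N
      // in' = next write index, out = next read index, n = items currently stored
      let rec loop in' out n =
        async { let! msg = inbox.Receive()
                match msg with
                // Room left: store the item, acknowledge, advance the write index
                | Put (x, replyChannel) when n < N ->
                    Array.set buf in' x
                    replyChannel.Reply ()
                    return! loop ((in' + 1) % N) out (n + 1)

                // Items available: hand back the oldest, advance the read index
                | Get replyChannel when n > 0 ->
                    let r = Array.get buf out
                    replyChannel.Reply r
                    return! loop in' ((out + 1) % N) (n - 1)

                // Acknowledge the caller and end the loop
                | Stop replyChannel -> replyChannel.Reply(); return () }
      loop 0 0 0)

  member this.Put(x:'a) = buffer <-> curry Put x
  member this.Get() = buffer <-> Get
  member this.Stop() = buffer <-> Stop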

Inside our BoundedBuffer class, the mailbox processor first allocates a zero-initialized array of size N.  Because array contents are mutable, there is no sense in threading the array through our processing loop.  Instead, the loop carries the input index, the output index and the number of items in the buffer.  When we receive a Put message while the number of items is less than the buffer size, we store the value at the input index, reply to the caller, and loop with the input index incremented modulo the buffer size and the item count incremented.  When we receive a Get message while the number of items is greater than zero, we read the item at the output index, reply to the caller with that value, and loop with the output index incremented modulo the buffer size and the item count decremented.  Finally, should we receive a Stop, we simply reply to the caller and return.
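The index arithmetic can be traced on its own.  The following is a small sketch, assuming a capacity of 3; the helper next is a hypothetical name and is not part of the class above.  It shows the write index wrapping back to 0 once it reaches the capacity.

```fsharp
// Hypothetical helper: advance a ring index by one slot, wrapping at capacity.
let next capacity i = (i + 1) % capacity

// Trace the write index across five Puts into a buffer of capacity 3,
// starting from index 0 (List.scan keeps the initial state in the result).
let writeIndices = List.scan (fun i _ -> next 3 i) 0 [1..5]
// writeIndices = [0; 1; 2; 0; 1; 2]
```

The read index follows the same rule, which is why items come back out in the order they went in.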

We created three methods to wrap this functionality for outside consumption.  The Put method takes the item to post to the buffer and does a PostAndReply with a Put message carrying that item.  I used currying here because the Put case takes two values, the item and the reply channel.  PostAndReply supplies the reply channel, so curry Put x leaves a function that is waiting only for that channel.  The Get and Stop methods are straightforward: their cases take nothing but a reply channel, so they can be passed to the operator directly.
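To see what curry does to a union case, here is a minimal sketch that uses a hypothetical two-field case, Label, standing in for Put.  The Labeled type and the value names are made up for this illustration.

```fsharp
// Same helper as earlier in the post, repeated so this snippet runs on its own.
let curry f x y = f (x, y)

// Hypothetical two-field case, standing in for Put of 'a * reply channel.
type Labeled = Label of int * string

// A union case with two fields can be used as a function over a tuple...
let fromTuple : int * string -> Labeled = Label

// ...so currying it and supplying the first field leaves a function
// that is still waiting for the second field.
let withOne : string -> Labeled = curry Label 1

let result = withOne "second field"
// result = Label (1, "second field")
```

In the buffer, the second field is the reply channel, which is exactly the argument PostAndReply passes in.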

How does this work?  Let’s fire up F# interactive and take a look with an example of posting a few items to our buffer and then retrieving them.

> let buffer = new int BoundedBuffer 42;;

val buffer : int BoundedBuffer

> buffer.Put 12;;
val it : unit = ()
> buffer.Put 34;;
val it : unit = ()
> buffer.Put 56;;
val it : unit = ()
> buffer.Get();;
val it : int = 12
> buffer.Get();;
val it : int = 34
> buffer.Get();;
val it : int = 56
> buffer.Stop();;
val it : unit = ()

What I did was create a BoundedBuffer that handled integers with a buffer size of 42.  Then I posted three values, 12, 34 and 56.  After putting these values into our buffer, I then retrieved each in the order in which it was placed into our buffer.  Finally, I stopped the buffer.  The complete source code to this example can be found here.
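The session above never fills the buffer and never reads from an empty one.  In the class as written, a Put on a full buffer or a Get on an empty one matches no case in the loop.  One way to make such callers wait instead is MailboxProcessor.Scan, which leaves a message in the queue when the scanner function returns None and looks at it again on a later pass.  What follows is a minimal sketch of that idea rather than the original implementation; ScanMessage and ScanningBuffer are names made up for this sketch.

```fsharp
// Hypothetical copy of the message type, renamed so the sketch stands alone.
type 'a ScanMessage =
  | Put of 'a * unit AsyncReplyChannel
  | Get of 'a AsyncReplyChannel
  | Stop of unit AsyncReplyChannel

// Hypothetical variant of the buffer that defers messages it cannot serve yet.
type 'a ScanningBuffer(N:int) =
  let buffer =
    MailboxProcessor.Start(fun inbox ->
      let buf : 'a array = Array.zeroCreate N
      let rec loop in' out n =
        inbox.Scan(fun msg ->
          match msg with
          | Put (x, replyChannel) when n < N ->
              Some (async { buf.[in'] <- x
                            replyChannel.Reply ()
                            return! loop ((in' + 1) % N) out (n + 1) })
          | Get replyChannel when n > 0 ->
              Some (async { let r = buf.[out]
                            replyChannel.Reply r
                            return! loop in' ((out + 1) % N) (n - 1) })
          | Stop replyChannel ->
              Some (async { replyChannel.Reply () })
          // Put on a full buffer or Get on an empty one: leave it queued.
          | _ -> None)
      loop 0 0 0)

  member this.Put(x:'a) = buffer.PostAndReply(fun ch -> Put (x, ch))
  member this.Get() = buffer.PostAndReply(fun ch -> Get ch)
  member this.Stop() = buffer.PostAndReply(fun ch -> Stop ch)

// Usage: with a capacity of 1, the second Put has to wait for a Get.
let small = ScanningBuffer<int>(1)
small.Put 1
// Run the blocked Put on another thread so this one can go on to call Get.
Async.Start (async { small.Put 2 })
let first = small.Get()
let second = small.Get()
small.Stop()
// first = 1, second = 2
```

The caller of the deferred Put stays blocked in PostAndReply until the loop reaches a state where its guard holds, so this sketch trades the unmatched case for waiting.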

Conclusion

Once again, we can create rather interesting solutions using this shared-nothing, asynchronous message passing approach in F#.  This solution involving the bounded buffer is no exception.  How might this solution look in Axum?  In due time, we will approach this as well as our Auction example from the previous post.  There are a lot of Axum items to cover, especially with regard to asynchronous methods and ordered interaction points, so stay tuned.

  • Matt

    I’m having a difficult time convincing myself that this will work.

    In your discussion following the MailboxProcessor code, you mentioned what would happen if a send or receive arrived and there was no need to make the caller wait. What’s missing is any explanation of what happens when the buffer is full (in the case of send) or empty (in the case of receive) and the caller has to wait.

    Likewise, when I paste the code into Visual Studio, it warns that there is an incomplete pattern match on msg, which is another way of saying that the case of callers that need to wait goes unhandled, if I understand this correctly.

    It seems to me that the Scan method needs to be used instead of Receive.