F# First Class Events – Async Workflows + Events Part III

So far in this series, I’ve covered a bit about what first class events are in F# and how you might use them.  In the first post, we looked at what first class events mean and at some basic combinators for composing events together.  In the second post, we looked at how we might create events and publish them to the world through classes.  In the third post, I talked about how to manage the lifetime of a subscription.  In the fourth installment, I corrected my usage of the old create function and switched to the Event class to create, trigger and publish events.  In the last part, we talked about asynchronous workflows and eventing together, and this time we’ll pick up that discussion.  Before we get started, let’s get caught up to where we are today.

Converting Events to Asynchronous Operations

One of the last topics to cover in this area is the question, “How do we take an event and properly manage cancellation and error checking?”  Earlier I showed how we could take the WebClient’s DownloadStringCompleted event, raised when DownloadStringAsync finishes, and partition it into three different events based upon the outcome of the main event.  This time, we’re going to take that kind of event and turn it into an asynchronous operation with proper error and cancellation checking.

In order to make this happen, we need to use the AwaitEvent method on the Async type.  Let’s take a first glance at how to do this with the BackgroundWorker class.  This class allows us to run potentially expensive operations in the background and report progress along the way.  Of interest to us are the RunWorkerCompleted event and the RunWorkerAsync method, which we want to wrap in such a way that we can check for errors and cancellations.  First, let’s create an extension method on the BackgroundWorker class to allow for the Async<T> binding.

open System
open System.ComponentModel
open System.Threading

type BackgroundWorker with
  member this.AsyncRunWorker(?argument:obj) =
    async { // Run the worker asynchronously
            let arg = defaultArg argument null
            this.RunWorkerAsync(arg)
              
            // Wait for the event to happen
            let! args = Async.AwaitEvent(this.RunWorkerCompleted, 
                                         cancelAction=this.CancelAsync)
              
            // Base our result on what happened
            let result = 
              if args.Cancelled then
                AsyncCanceled (new OperationCanceledException())
              elif args.Error <> null then AsyncException args.Error
              else  AsyncOk args.Result 
            return! AsyncResult.Commit(result) } 

What I’ve done is create an extension method called AsyncRunWorker which takes an optional argument to send to the worker.  I first call RunWorkerAsync with our argument (null if omitted), then I wait for the RunWorkerCompleted event to fire, passing along our cancel action.  This cancel action lets us specify what should happen if the workflow is canceled while we wait; here it asks the worker to cancel.  Next, we base our result upon whether we have a cancellation, an error or, lastly, a result.  Then we commit that answer as our result.  This function, when called, will produce an Async<obj>, as obj is the type of args.Result.
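To see the waiting step on its own, here is a minimal sketch of Async.AwaitEvent against a plain F# event instead of a BackgroundWorker.  The names tick, waiter and pending are made up for this illustration and are not part of the code above.  It also assumes a version of FSharp.Core that provides Async.StartImmediateAsTask.

```fsharp
open System

// Hypothetical event used only for this illustration.
let tick = Event<int>()

// A workflow that suspends until the event fires, then uses its argument.
let waiter =
    async {
        let! value = Async.AwaitEvent tick.Publish
        return value * 2
    }

// StartImmediateAsTask runs the workflow on the current thread up to its
// first suspension point, so the handler is attached before we trigger.
let pending = Async.StartImmediateAsTask waiter

// Fire the event once; the workflow resumes with 21 as its argument.
tick.Trigger 21

// Block until the workflow has produced its result.
let doubled = pending.Result
```

The ordering matters here: the workflow has to be waiting before the event fires, otherwise the event is simply missed.  That is the same reason AsyncRunWorker needs the completion event to be caught by the AwaitEvent call that follows RunWorkerAsync.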

We can test the behavior of this by creating a simple worker and running it inside of an async workflow.  In order to do so, we’ll put in a little bit of infrastructure that we had in a previous post talking about subscribing to events.  This way, we can unsubscribe from our event automatically at the end of our async workflow through a use binding.

[<AutoOpen>]
module EventExtensions = 
  open System

  type IDelegateEvent<'Del when 'Del :> Delegate > with
    member this.Subscribe(d) =
      this.AddHandler(d)
      { new IDisposable with
          member disp.Dispose() =
            this.RemoveHandler(d) }

After we’ve defined this, let’s go ahead and now define our async workflow with the BackgroundWorker defined.  In this instance, we’ll create a simple background process that waits for a little bit and then formats the argument into a string as the result.

let workerResult (worker:BackgroundWorker) =
  async { use e = worker.DoWork.Subscribe(fun _ args -> 
                                  Thread.Sleep(5000)
                                  args.Result <- sprintf "Hello %A" args.Argument)
          return! worker.AsyncRunWorker ("Matt") }
  |> Async.RunSynchronously

In this instance, we created a function which takes a BackgroundWorker and inside the async workflow, we subscribe to the DoWork event to sleep for a bit, and then format a string with the argument.  At the end of the async block, I invoke the AsyncRunWorker extension method with my first name as an argument.  Finally, I run it synchronously just to force the evaluation.  Had I been using .NET 4.0, I could have used a Task<T> instead and invoked it as a future.  I’ll cover how that works in a future post. 
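The automatic unsubscription that the use binding gives us can be checked in isolation.  The sketch below is only an approximation of the idea: subscribe is a hypothetical helper function with the same shape as the Subscribe extension, written against the simpler IEvent<'T> type, and greeted, seen and runScoped are names invented for the example.

```fsharp
open System

// Hypothetical helper: attach a handler and return an IDisposable
// that detaches it again when disposed.
let subscribe (handler: Handler<'T>) (event: IEvent<'T>) : IDisposable =
    event.AddHandler handler
    { new IDisposable with
        member x.Dispose() = event.RemoveHandler handler }

// Hypothetical event and a list recording what the handler saw.
let greeted = Event<string>()
let seen = System.Collections.Generic.List<string>()

let runScoped () =
    // The subscription is disposed when this function returns.
    use subscription =
        greeted.Publish
        |> subscribe (Handler<string>(fun _ name -> seen.Add name))
    greeted.Trigger "inside"

runScoped ()

// The handler has already been removed, so this trigger is not recorded.
greeted.Trigger "outside"
```

After this runs, seen holds only "inside", which is the behavior workerResult relies on so that repeated calls do not stack up DoWork handlers on the same worker.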

Now that we have our function defined for handling a BackgroundWorker inside the async workflow, let’s tie it all together.  First, we’ll create the worker, which supports cancellation.  One item of note is that I included below some conditional compilation directives to work around thread blocking issues in F# Interactive caused by the interaction between the SynchronizationContext and Async.RunSynchronously.  If this code is compiled and run as an executable, then it won’t be affected.  I invoke the worker three times and print out the result of each.

let worker = new BackgroundWorker(WorkerSupportsCancellation = true)

#if INTERACTIVE
let context = SynchronizationContext.Current
SynchronizationContext.SetSynchronizationContext(null)
#endif

workerResult worker |> printfn "%A"
workerResult worker |> printfn "%A"
workerResult worker |> printfn "%A"

#if INTERACTIVE
SynchronizationContext.SetSynchronizationContext(context)
#endif
do ()

It’s an interesting exercise, but how about something a bit more useful?  

Extending WebClient

In this case, let’s look at extending System.Net.WebClient to also support F# async workflows for such things as asynchronously opening a reader for a given URL.  If you look at the F# PowerPack library, it already has an extension method for asynchronously downloading a string, such as HTML from a website, in WebClient.AsyncDownloadString.  For my example, let’s apply the same approach from above to our situation:

open System
open System.IO
open System.Net
 
type WebClient with
  member this.AsyncOpenRead (address:Uri) : Async<Stream> =
    async { let userToken = new obj()
            this.OpenReadAsync(address, userToken)

            // Loop until we see a reply with the same token
            let rec loop() = 
              async { let! args = 
                        Async.AwaitEvent(this.OpenReadCompleted,
                                         cancelAction=this.CancelAsync)
                      if args.UserState <> userToken then return! loop()
                      else
                        let result = 
                          if args.Cancelled then 
                            AsyncCanceled (new OperationCanceledException())
                          elif args.Error <> null then AsyncException args.Error
                          else  AsyncOk args.Result 
                        return! AsyncResult.Commit(result) }
            return! loop() }

In this case, we’re going to create a token with which we can track our particular instance of the async operation.  We call the OpenReadAsync method with our address and our token, and then we loop until we get the callback with our associated token.  Once we do, we set the result much as we did above.  You’ll notice that most of the async operations on the WebClient can be extended the same way this one was, such as OpenWrite, DownloadXXX and UploadXXX.
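The token matching loop can be tried out without a network call.  The following is a rough sketch under stated assumptions: completed is a hypothetical event carrying a token and a payload, standing in for OpenReadCompleted, and awaitFor is a made-up name.  It assumes a version of FSharp.Core that provides Async.StartImmediateAsTask, and that it runs where no SynchronizationContext is installed (a console program or script), so that the loop re-subscribes before the next trigger.

```fsharp
open System

// Hypothetical completion event: a token plus a payload.
let completed = Event<obj * string>()

// Wait until a completion arrives that carries our own token.
let awaitFor (token: obj) =
    let rec loop () =
        async {
            let! (seenToken, payload) = Async.AwaitEvent completed.Publish
            if not (obj.ReferenceEquals(seenToken, token)) then
                // Someone else's completion: keep waiting.
                return! loop ()
            else
                return payload
        }
    loop ()

let mine = obj ()
let other = obj ()

// Start waiting on the current thread so the handler is attached first.
let pending = Async.StartImmediateAsTask (awaitFor mine)

// The first completion belongs to another caller and is skipped.
completed.Trigger((other, "not ours"))
completed.Trigger((mine, "ours"))

let received = pending.Result
```

Here received ends up as "ours".  Comparing tokens by reference is enough because each call creates its own fresh object.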

We can now download data from a reader as we have in this example below:

open System
open System.IO
open System.Net
open System.Threading

let readHtml (address : Uri) =
  async { use wc = new WebClient()
          use! stream = wc.AsyncOpenRead(address)
          use reader = new StreamReader(stream)
          return! reader.AsyncReadToEnd() }

#if INTERACTIVE
let context = SynchronizationContext.Current
SynchronizationContext.SetSynchronizationContext(null)
#endif

let html = 
  readHtml(Uri "http://bing.com") 
  |> Async.RunSynchronously

#if INTERACTIVE
SynchronizationContext.SetSynchronizationContext(context)
#endif
do()

So, as you can see from above, we simply read the HTML from a page using the WebClient approach instead of the WebRequest way using async workflows.  Once again, we have to deal with the context issues only in F# Interactive.
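If the AsyncReadToEnd extension is not available in your setup, one possible substitute is to await the Task returned by StreamReader.ReadToEndAsync.  This is only a sketch of that alternative, not the approach used above; readAll is a hypothetical name, and a MemoryStream stands in for the network stream so the snippet can run offline.

```fsharp
open System.IO
open System.Text

// Hypothetical helper: read a whole stream as text inside an async workflow.
let readAll (stream: Stream) =
    async {
        // Disposing the reader also disposes the underlying stream.
        use reader = new StreamReader(stream)
        return! reader.ReadToEndAsync() |> Async.AwaitTask
    }

// An in-memory stream stands in for the stream from AsyncOpenRead.
let text =
    new MemoryStream(Encoding.UTF8.GetBytes "<html></html>")
    |> readAll
    |> Async.RunSynchronously
```

With this in place, the last line of readHtml could call a helper like readAll on the stream instead of creating the reader itself.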

Conclusion

From these two examples, we can see how you can take programming models built around events and turn them into async functions that can easily interact with F#’s async workflows.  I think over this past series we’ve covered quite a bit dealing with first class events in F#.  In the next series, we’ll start to look at the Reactive Framework and new ways of thinking around handling events.
