Adding Parallel Extensions to F#

In many of my recent presentations, I’ve been using the Parallel Extensions for .NET for compute-heavy work in F#.  Doing so speeds up some of those computations severalfold and takes full advantage of my machine.  Over time, with the help of others, I’ve translated many of the functions from the ParallelEnumerable class into a form that F# code can consume naturally.

From Seq to PSeq

Within the F# libraries, seq<'a> is a thin wrapper over the IEnumerable<T> interface, with its associated functions in the Seq module.  The goal here is to mimic the signatures and well-defined behaviors of the Seq module while wrapping the ParallelEnumerable class and its associated IParallelEnumerable<T> interface.  Just as the Seq implementation hides IEnumerable<T> behind seq<'a>, we’ll hide the underlying IParallelEnumerable<'a> behind the pseq<'a> type.
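
To make the "hiding" concrete, here is a minimal sketch of how a type abbreviation behaves, using only the ordinary IEnumerable<'a> interface so it runs without the Parallel Extensions installed.  The name myseq is hypothetical and exists only for this illustration; it mirrors the way pseq<'a> is declared in the listing further down.

```fsharp
open System.Collections.Generic

// Hypothetical alias, declared the same way the post declares pseq<'a>.
type myseq<'a> = IEnumerable<'a>

// A value annotated with the alias...
let squares : myseq<int> = Seq.map (fun x -> x * x) (seq { 1 .. 5 })

// ...is the very same type as the interface, so no conversion is needed.
let asInterface : IEnumerable<int> = squares

// Seq module functions accept it directly.
let total = Seq.sum asInterface

printfn "%d" total  // prints 55
```

Because an abbreviation is only another name for the same type, callers can pass such a value to any API that expects the underlying interface.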

One wrinkle is translating F# functions into .NET delegates: each function must be wrapped in the appropriate Func constructor before it can be passed to the library.  Other than that, it’s a pretty smooth integration between the language and the library.
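
As a small sketch of that wrapping, the example below uses the sequential System.Linq.Enumerable class rather than the parallel one, so it depends on nothing beyond the standard library.  The function names isEven and add are made up for the illustration.

```fsharp
open System
open System.Linq

// An ordinary F# function of type int -> bool.
let isEven (x: int) = x % 2 = 0

// Enumerable.Where expects a System.Func<int, bool> delegate, so the
// F# function is wrapped in the Func constructor at the call site.
let evens =
    Enumerable.Where(seq { 1 .. 10 }, Func<int, bool>(isEven))
    |> Seq.toList

// A curried two-argument F# function maps onto Func<_,_,_> the same way.
let add (acc: int) (x: int) = acc + x

let total =
    Enumerable.Aggregate(seq { 1 .. 10 }, 0, Func<int, int, int>(add))

printfn "%A %d" evens total  // prints [2; 4; 6; 8; 10] 55
```

Spelling out the delegate type also picks the intended overload when a method, such as Where or Select, accepts more than one delegate shape.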

So, without further ado, here is the code:

#light

/// Type wrapper over the IParallelEnumerable
type pseq<'a> = System.Linq.IParallelEnumerable<'a>

module PSeq = 
  open System
  open System.Linq

  /// Append two parallel collections together
  let append (ie1: pseq<'a>) (ie2: pseq<'a>) : pseq<'a> =
    ParallelEnumerable.Concat(ie1, ie2)

  /// This is the method to opt into Parallel LINQ.
  let adapt : seq<'a> -> pseq<'a> =
    ParallelQuery.AsParallel

  /// This is the method to opt into Parallel LINQ with deg of parallelism
  let adaptn (n:int) (seq:seq<'a>) : pseq<'a> =
    if n < 1 then adapt seq
    else ParallelQuery.AsParallel( seq, n )

  /// AsOrdered is a method that tells PLINQ to treat a data source as if it was ordered
  let ordered : pseq<'a> -> pseq<'a> =
    ParallelQuery.AsOrdered

  /// AsUnordered tells PLINQ that it should treat a particular intermediate result as if no
  /// order was implied
  let unordered : pseq<'a> -> pseq<'a> =
    ParallelQuery.AsUnordered

  /// This method is to opt out of Parallel LINQ.
  let as_seq : pseq<'a> -> seq<'a> =
    ParallelQuery.AsSequential

  /// Parallel implementation of System.Linq.Enumerable.Average().
  let average_float : pseq<float> -> float =
    ParallelEnumerable.Average

  /// Parallel implementation of System.Linq.Enumerable.Average().
  let average_float_by (f:'a -> float) (pe:pseq<'a>) : float =
    ParallelEnumerable.Average(pe, Func<_,_>(f))
    
  /// Parallel implementation of System.Linq.ParallelEnumerable.Cast
  let cast : IParallelEnumerable -> pseq<'a> =
    ParallelEnumerable.Cast

  /// Parallel implementation of System.Linq.ParallelEnumerable.Distinct
  let distinct : pseq<'a> -> pseq<'a> =
    ParallelEnumerable.Distinct

  /// Parallel implementation of System.Linq.ParallelEnumerable.Any
  let exists (f:'a -> bool) (pe:pseq<'a>) : bool =
    ParallelEnumerable.Any(pe, Func<_,_>(f))

  /// Parallel implementation of System.Linq.Enumerable.Where().
  let filter (f:'a -> bool) (pe:pseq<'a>) : pseq<'a> =
    ParallelEnumerable.Where(pe, Func<_,_>(f))

  /// Parallel implementation of System.Linq.Enumerable.First
  let find (f:'a -> bool) (pe:pseq<'a>) : 'a =
    ParallelEnumerable.First(pe, Func<_,_>(f))

  /// Parallel implementation of System.Linq.Enumerable.Aggregate().
  let fold (f:'a -> 'b -> 'a) (seed:'a) (pe:pseq<'b>) : 'a =
    ParallelEnumerable.Aggregate(pe, seed, Func<_,_,_>(f))

  /// Parallel implementation of System.Linq.Enumerable.All
  let for_all (f:'a -> bool) (pe:pseq<'a>) : bool =
    ParallelEnumerable.All(pe, Func<_,_>(f))

  /// Empty ParallelEnumerable
  let empty<'a> : pseq<'a> = ParallelEnumerable.Empty<'a>()

  /// Parallel implementation of System.Linq.Enumerable.First()
  let hd : pseq<'a> -> 'a =
    ParallelEnumerable.First
      
  /// Parallel implementation of System.Linq.ParallelEnumerable.Count
  let length : pseq<'a> -> int =
    ParallelEnumerable.Count  
    
  /// Parallel implementation of System.Linq.Enumerable.Select().
  let map (f:'a -> 'b) (pe:pseq<'a>) : pseq<'b> =
    ParallelEnumerable.Select(pe, Func<_,_>(f))

  /// Parallel implementation of System.Linq.Enumerable.Select().
  let mapi (f:int -> 'a -> 'b) (pe:pseq<'a>) : pseq<'b> =
    let f' x i = f i x
    ParallelEnumerable.Select(pe, Func<_,_,_>(f'))

  /// Parallel implementation of System.Linq.Enumerable.Zip
  let map2 (f:'a -> 'b -> 'c) (pe1:pseq<'a>) (pe2:pseq<'b>) : pseq<'c> =
    ParallelEnumerable.Zip(pe1, pe2, Func<_,_,_>(f))

  /// Parallel implementation of System.Linq.Enumerable.Reverse
  let rev : pseq<'a> -> pseq<'a> = ParallelEnumerable.Reverse

  /// Parallel implementation of Seq.concat
  let concat (pe:pseq<pseq<'a>>) : pseq<'a> =
    ParallelEnumerable.SelectMany(
        pe, Func<_,_>(fun x -> x :> seq<'a>))

  /// Parallel implementation of System.Linq.Enumerable.ElementAt
  let nth (n:int) (pe:pseq<'a>) : 'a =
    ParallelEnumerable.ElementAt(pe, n)

  /// Parallel implementation of System.Linq.Enumerable.OrderBy
  let order_by (f:'a -> 'b) (pe:pseq<'a>) =
    ParallelEnumerable.OrderBy(pe, Func<_,_>(f))

  /// Parallel implementation of System.Linq.Enumerable.Range
  let range (start:int) (count:int) : pseq<int> =
    ParallelEnumerable.Range(start, count)

  /// Parallel implementation of System.Linq.Enumerable.Skip
  let skip (n:int) (pe:pseq<'a>) : pseq<'a> =
    ParallelEnumerable.Skip(pe, n)

  /// Parallel implementation of System.Linq.Enumerable.SkipWhile
  let skip_while (f:'a -> bool) (pe:pseq<'a>) : pseq<'a> =
    ParallelEnumerable.SkipWhile(pe, Func<_,_>(f))
    
  /// Parallel implementation of System.Linq.Enumerable.Sum().
  let sum_by_int : pseq<int> -> int =
    ParallelEnumerable.Sum          
    
  /// Parallel implementation of System.Linq.Enumerable.Take
  let take (n:int) (pe:pseq<'a>) : pseq<'a> =
    ParallelEnumerable.Take(pe, n)

  /// Parallel implementation of System.Linq.Enumerable.TakeWhile
  let take_while (f:'a -> bool) (pe:pseq<'a>) : pseq<'a> =
    ParallelEnumerable.TakeWhile(pe, Func<_,_>(f))

  /// Parallel implementation of System.Linq.Enumerable.ToArray().
  let to_array : pseq<'a> -> 'a array =
    ParallelEnumerable.ToArray

  /// Parallel implementation of to list
  let to_list<'a> : pseq<'a> -> 'a list =
    to_array >> Array.to_list

  /// Parallel implementation of System.Linq.Enumerable.Zip
  let zip (pe1:pseq<'a>) (pe2:pseq<'b>) : pseq<'a * 'b> =
    let f a b = (a, b)
    ParallelEnumerable.Zip(pe1, pe2, Func<_,_,_>(f))
      
[<AutoOpen>]
module Operators =
  
  /// Used for sequence expressions
  /// Example : let f = pseq [for x in [1..10] -> x * x] 
  let pseq (ie:seq<'a>) : pseq<'a> = PSeq.adapt ie

 

That’s it.  That’s all there is to it.  Now we can run simple examples such as the following:

> pseq [1..1000] |> PSeq.map ((*) 2) 
    |> PSeq.filter(fun x -> x % 3 = 0) 
    |> PSeq.sum_by_int;;
val it : int = 333666
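
For readers who want to try this pipeline without the preview library, here is a hedged sketch of the same three steps written against the PLINQ API as it ships in .NET Framework 4 and later.  That release names the type ParallelQuery<'T> and exposes AsParallel on ParallelEnumerable, unlike the names in the listing above.  The module name PSeqSketch and the function sumInt are hypothetical, and this is a cut-down stand-in rather than the full wrapper.

```fsharp
open System
open System.Linq

/// Hypothetical, cut-down counterpart of the PSeq module, written
/// against the released PLINQ types instead of the preview ones.
module PSeqSketch =
    let adapt (source: seq<'a>) : ParallelQuery<'a> =
        ParallelEnumerable.AsParallel<'a>(source)

    let map (f: 'a -> 'b) (pe: ParallelQuery<'a>) : ParallelQuery<'b> =
        ParallelEnumerable.Select(pe, Func<'a, 'b>(f))

    let filter (f: 'a -> bool) (pe: ParallelQuery<'a>) : ParallelQuery<'a> =
        ParallelEnumerable.Where(pe, Func<'a, bool>(f))

    let sumInt (pe: ParallelQuery<int>) : int =
        ParallelEnumerable.Sum(pe)

// The pipeline from the example above, run in parallel.
let parallelTotal =
    PSeqSketch.adapt (seq { 1 .. 1000 })
    |> PSeqSketch.map ((*) 2)
    |> PSeqSketch.filter (fun x -> x % 3 = 0)
    |> PSeqSketch.sumInt

// Sequential cross-check using only the Seq module.
let sequentialTotal =
    seq { 1 .. 1000 }
    |> Seq.map ((*) 2)
    |> Seq.filter (fun x -> x % 3 = 0)
    |> Seq.sum

printfn "parallel = %d, sequential = %d" parallelTotal sequentialTotal
```

Both totals come to 333666: the values that survive the filter are the multiples of 6 from 6 to 1998, and integer addition gives the same result whatever order the parallel workers process them in.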

Of course, we could do something more advanced than this, but it’s a start.  There are plenty of possibilities given the number of operators supported, and I’ll cover more of them in later posts.

What Does the Future Hold?

With integration this easy, we should wonder how F# might interact with the Parallel Extensions for .NET in the future.  With both F# and the Parallel Extensions for .NET becoming first-class citizens, the case can be made that the Parallel Extensions should be an integral part of the F# libraries as well.  These are interesting times for concurrency-oriented programming, and with the combination of F# and the Parallel Extensions, we have a powerful tool in our chest for running algorithms over big data.
