The problem
Although the code about which I will talk here I wrote in F#, it is based on the .NET 4 framework, not specifically depending on any particularity of F# (at least it seems so!).
I have some pieces of data on my disk that I should update from the network, saving the latest version to the disk:
type MyData =
{ field1 : int;
field2 : float }
type MyDataGroup =
{ Data : MyData[];
Id : int }
// load : int -> MyDataGroup
let load dataId =
let data = ... // reads from disk
{ Data = data;
Id = dataId }
// update : MyDataGroup -> MyDataGroup
let update dg =
let newData = ... // reads from the network and process
// newData : MyData[]
{ dg with Data = dg.Data
|> Seq.ofArray
|> Seq.append newData
|> processDataSomehow
|> Seq.toArray }
// save : MyDataGroup -> unit
let save dg = ... // writes to the disk
let loadAndSaveAndUpdate = load >> update >> save
The problem is that to loadAndSaveAndUpdate
all my data, I would have to execute the function many times:
{1 .. 5000} |> loadAndSaveAndUpdate
Each step would do
- some disk IO,
- some data crunching,
- some network IO (with possibility of lots of latency),
- more data crunching,
- and some disk IO.
Wouldn't it be nice to have this done in parallel, to some degree? Unfortunately, none of my reading and parsing functions are "async-workflows-ready".
The first (not very good) solutions I came up with
Tasks
The first thing I've done was to set up a Task[]
and start them all:
let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
|> Seq.map createTask
|> Seq.toArray
tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)
Then I hit CTRL+ESC just to see how many threads it was using. 15, 17, ..., 35, ..., 170, ... until killed the application! Something was going wrong.
Parallel
I did almost the same thing but using Parallel.ForEach(...)
and the results were the same: lots and lots and lots of threads.
A solution that works... kind of
Then I decided to start only n
threads, Task.WaitAll(of them)
, then other n
, until there were no more tasks available.
This works, but the problem is that when it has finished processing, say, n-1
tasks, it will wait, wait, wait for the damn last Task that insist on blocking due to lots of network latency. This is not good!
So, how would you attack this problem? I'd appreciate to view different solutions, involving either Async Workflows (and in this case how to adapt my non-async functions), Parallel Extensions, weird parallel patterns, etc.
Thanks.
See Question&Answers more detail:
os