Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

asynchronous - Scheduling with Async.Parallel

Is there any way to limit or throttle Async.Parallel by introducing a scheduler? I want to execute a seq of Async<'a> in parallel without exceeding a certain hourly limit.

I could use a shared mutable variable that each Async<'a> examines, but I'd like to avoid this if possible.

1 Reply

Under the covers, Async.Parallel uses the standard .NET thread pool. So you could throttle it by configuring the thread pool, but that's probably not a good idea (you should not be blocking thread-pool threads).
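
If all you need is a cap on how many computations run at once (not an hourly limit), newer versions of FSharp.Core (4.7 and later, as far as I know) let you pass an optional maxDegreeOfParallelism argument to Async.Parallel. A minimal sketch, where the jobs list is just made-up sample work:

```fsharp
// Hypothetical sample work: 21 computations that each sleep briefly
let jobs =
  [ for i in 0 .. 20 ->
      async { do! Async.Sleep(100)
              return i * i } ]

// At most 5 computations run at any one time;
// the result array keeps the order of the input sequence
let results =
  Async.Parallel(jobs, maxDegreeOfParallelism = 5)
  |> Async.RunSynchronously
```

This only bounds concurrency. It does not limit how many items start per hour, so it does not solve the hourly-limit part of the question on its own.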

If I wanted to implement some throttling, I would probably create an F# agent for this. Agents give you a pretty simple way to coordinate concurrency. It is probably more code than using a mutable variable (for this purpose), but it gives you a nice abstraction:

// We can ask the agent to enqueue a new work item;
// and the agent sends itself a completed notification
type ThrottlingMessage = 
  | Enqueue of Async<unit>
  | Completed

let throttlingAgent limit = MailboxProcessor.Start(fun inbox -> async {
  // The agent body is not executing in parallel, 
  // so we can safely use mutable queue & counter 
  let queue = System.Collections.Generic.Queue<_>()
  let running = ref 0
  while true do
    // Enqueue new work items or decrement the counter
    // of how many tasks are running in the background
    let! msg = inbox.Receive()
    match msg with
    | Completed -> running.Value <- running.Value - 1
    | Enqueue w -> queue.Enqueue(w)
    // If we have less than limit & there is some work to
    // do, then start the work in the background!
    while running.Value < limit && queue.Count > 0 do
      let work = queue.Dequeue()
      running.Value <- running.Value + 1
      do! 
        // When the work completes (or fails), send 'Completed'
        // back to the agent so that the slot is always freed
        async { 
          try 
            do! work
          finally 
            inbox.Post(Completed) } 
        |> Async.StartChild
        |> Async.Ignore })

To use this, create an agent with a specified limit and then post Enqueue messages to add your work items:

let w = throttlingAgent 5 
for i in 0 .. 20 do 
  async { printfn "Starting %d" i
          do! Async.Sleep(1000)
          printfn "Done %d" i  }
  |> Enqueue
  |> w.Post

This solves a slightly different problem than the one you have (it limits how many work items run at the same time, not how many start per hour), but it should show the direction. Rather than having the Completed notification, you probably want some async running in the background that sends a specified number of "tokens" every hour.
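
The token idea could be sketched roughly as follows. This is only a sketch under assumptions: the names RateLimitMessage, Submit, Refill and rateLimitingAgent are made up for illustration, and it resets the token count to the full allowance once per period instead of tracking a sliding window.

```fsharp
open System
open System.Collections.Generic

// Hypothetical message type for this sketch: work to run,
// or a periodic signal that resets the available tokens
type RateLimitMessage =
  | Submit of Async<unit>
  | Refill

// Starts at most 'tokensPerPeriod' work items in each 'period'
let rateLimitingAgent (tokensPerPeriod: int) (period: TimeSpan) =
  MailboxProcessor.Start(fun inbox ->
    // Background loop that posts 'Refill' once per period
    let rec refillLoop () : Async<unit> = async {
      do! Async.Sleep(int period.TotalMilliseconds)
      inbox.Post(Refill)
      return! refillLoop () }
    async {
      // Only the agent body touches these, so no locking is needed
      let queue = Queue<Async<unit>>()
      let tokens = ref tokensPerPeriod
      do! Async.StartChild(refillLoop ()) |> Async.Ignore
      while true do
        let! msg = inbox.Receive()
        match msg with
        | Refill -> tokens.Value <- tokensPerPeriod
        | Submit work -> queue.Enqueue(work)
        // Spend one token for every work item that is started
        while tokens.Value > 0 && queue.Count > 0 do
          tokens.Value <- tokens.Value - 1
          Async.Start(queue.Dequeue()) })

// Usage: a short period for experimenting; for an hourly limit
// you would pass TimeSpan.FromHours 1.0 instead
let limiter = rateLimitingAgent 3 (TimeSpan.FromSeconds 2.0)
for i in 0 .. 9 do
  async { printfn "Starting %d" i } |> Submit |> limiter.Post
```

Unlike the throttling agent above, this one does not wait for work to finish before starting more. It only limits how many items start within each period, so you could combine the two if you need both limits.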

