Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


c# - Best way to limit the number of active Tasks running via the Task Parallel Library

Consider a queue holding a lot of jobs that need processing. The queue's limitation is that it can only return 1 job at a time, and there is no way of knowing how many jobs it holds. The jobs take 10s to complete and involve a lot of waiting for responses from web services, so they are not CPU bound.

If I use something like this

while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}

Then it will furiously pop jobs from the queue much faster than it can complete them, run out of memory and fall on its ass. >.<

I can't use (I don't think) ParallelOptions.MaxDegreeOfParallelism because I can't use Parallel.Invoke or Parallel.ForEach

3 alternatives I've found

  1. Replace Task.Factory.StartNew with

    Task task = new Task(job.Execute, TaskCreationOptions.LongRunning);
    task.Start();
    

    This seems to somewhat solve the problem, but I am not clear on exactly what it does or whether it is the best method.

  2. Create a custom task scheduler that limits the degree of concurrency

  3. Use something like BlockingCollection: add each job to the collection when it starts and remove it when it finishes, to limit the number that can be running.

With #1 I've got to trust that the right decision is made automatically; with #2/#3 I've got to work out the max number of tasks that can be running myself.

Have I understood this correctly - which is the better way, or is there another way?

EDIT - This is what I've come up with from the answers below, producer-consumer pattern.

As well as overall throughput, the aim was not to dequeue jobs faster than they could be processed, and not to have multiple threads polling the queue (not shown here, but that's a non-blocking op and will lead to huge transaction costs if polled at high frequency from multiple places).

// Requires: using System.Collections.Concurrent; using System.Threading;
// BlockingCollection<Job>(1) blocks on Add once it already holds 1 job (no
// point in being greedy!), and blocks on Take while it is empty.
var jobs = new BlockingCollection<Job>(1);

// Set up a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically; e.g. with a 4-core CPU and jobs
// that spend 50% of their time blocked waiting on IO, it is likely to be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
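   Thread consumer = new Thread(() =>
   {
      // GetConsumingEnumerable() blocks while the collection is empty and ends
      // once CompleteAdding() has been called and the last job has been taken.
      // (Looping on !jobs.IsCompleted around Take() can throw
      // InvalidOperationException if adding completes while Take() is waiting.)
      foreach (var job in jobs.GetConsumingEnumerable())
      {
         job.Execute();
      }
   });
   consumer.Start();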
}

// Producer: take items off the queue and put them in the blocking collection, ready for processing
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompleteAdding();
       // May need to wait for running jobs to finish
       break;
    }
}
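
The "may need to wait for running jobs to finish" step can be handled by keeping a reference to each consumer thread and calling Join() on it after CompleteAdding(). Below is a minimal, self-contained sketch of the whole pattern under some assumptions: the class name BoundedPipeline, the Run method, and the use of plain Action delegates in place of Job are hypothetical stand-ins, and an in-memory sequence stands in for the real queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BoundedPipeline
{
    // Runs every job from `source` on `consumerCount` dedicated threads and
    // returns the number of jobs executed. `source` is a hypothetical
    // stand-in for the real queue.
    public static int Run(IEnumerable<Action> source, int consumerCount)
    {
        int executed = 0;
        using (var jobs = new BlockingCollection<Action>(1))
        {
            var consumers = new List<Thread>();
            for (int i = 0; i < consumerCount; i++)
            {
                var consumer = new Thread(() =>
                {
                    // Blocks while empty; ends after CompleteAdding() once drained.
                    foreach (var job in jobs.GetConsumingEnumerable())
                    {
                        job();
                        Interlocked.Increment(ref executed);
                    }
                });
                consumer.Start();
                consumers.Add(consumer);
            }

            // Producer: Add() blocks while the single buffer slot is occupied,
            // so jobs are never dequeued much faster than they are processed.
            foreach (var job in source)
                jobs.Add(job);

            jobs.CompleteAdding();

            // Wait for jobs that are still running before returning.
            foreach (var consumer in consumers)
                consumer.Join();
        }
        return executed;
    }
}
```

Note that a job which throws would take down its consumer thread (and, by default, the process), so real code would probably want a try/catch around the job call.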


1 Reply


I just gave an answer which is very applicable to this question.

Basically, the TPL Task class is made to schedule CPU-bound work. It is not made for blocking work.

You are working with a resource that is not CPU: waiting for service replies. This means the TPL will mismanage your resource, because it assumes the work is CPU bound to a certain degree.

Manage the resources yourself: Start a fixed number of threads or LongRunning tasks (which is basically the same). Decide on the number of threads empirically.
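
To see why a LongRunning task is "basically the same" as a thread, here is a small check, offered as a sketch rather than a guarantee. It relies on an implementation detail of the default scheduler in current .NET runtimes, namely that LongRunning tasks are given their own dedicated thread instead of a thread-pool worker. The names LongRunningDemo and RunsOnPoolThread are made up for the illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    // Starts a task with the given options on the default scheduler and
    // reports whether its body ran on a thread-pool thread.
    public static bool RunsOnPoolThread(TaskCreationOptions options)
    {
        return Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            options,
            TaskScheduler.Default).Result;
    }
}
```

Calling RunsOnPoolThread(TaskCreationOptions.LongRunning) should report a non-pool thread, whereas TaskCreationOptions.None normally lands on a pool worker. It also shows why LongRunning alone does not throttle anything: every such task gets a new thread.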

You can't put unreliable systems into production. For that reason, I recommend #1 but throttled. Don't create as many threads as there are work items. Create as many threads as are needed to saturate the remote service. Write yourself a helper function which spawns N threads and uses them to process M work items. You get totally predictable and reliable results that way.
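
One possible shape for such a helper, as a rough sketch: the names Throttled and ForEach are invented here, and it assumes the M work items are known up front, which is not the case for the queue in the question (there a bounded BlockingCollection fed by a single producer fits better).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class Throttled
{
    // Processes every item using exactly `threadCount` dedicated threads and
    // blocks until all items are done. Exceptions thrown by `process` are
    // collected and rethrown together at the end.
    public static void ForEach<T>(IEnumerable<T> items, int threadCount, Action<T> process)
    {
        var pending = new ConcurrentQueue<T>(items);
        var errors = new ConcurrentQueue<Exception>();
        var workers = new List<Thread>();

        for (int i = 0; i < threadCount; i++)
        {
            var worker = new Thread(() =>
            {
                T item;
                // Each thread pulls the next item until none are left.
                while (pending.TryDequeue(out item))
                {
                    try { process(item); }
                    catch (Exception ex) { errors.Enqueue(ex); }
                }
            });
            worker.Start();
            workers.Add(worker);
        }

        foreach (var worker in workers)
            worker.Join();

        if (!errors.IsEmpty)
            throw new AggregateException(errors);
    }
}
```

Usage would look like Throttled.ForEach(jobs, 8, job => job.Execute()), with the 8 found empirically as described above.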

