Consider a queue holding a lot of jobs that need processing. The queue has two limitations: it can only return 1 job at a time, and there is no way of knowing how many jobs it holds. Each job takes about 10s to complete and spends most of that time waiting for responses from web services, so the work is not CPU bound.
If I use something like this:

    while (true)
    {
        var job = Queue.PopJob();
        if (job == null)
            break;
        Task.Factory.StartNew(job.Execute);
    }

Then it will furiously pop jobs from the queue much faster than it can complete them, run out of memory and fall on its ass. >.<
I don't think I can use ParallelOptions.MaxDegreeOfParallelism, because I can't use Parallel.Invoke or Parallel.ForEach here.
I've found 3 alternatives:

1. Replace Task.Factory.StartNew with

    Task task = new Task(job.Execute, TaskCreationOptions.LongRunning);
    task.Start();

   This seems to somewhat solve the problem, but I am not clear exactly what it is doing or whether it is the best method.
2. Create a custom task scheduler that limits the degree of concurrency.
3. Use something like BlockingCollection: add a job to the collection when it starts and remove it when it finishes, to limit the number that can be running.

With #1 I've got to trust that the right decision is made automatically; with #2/#3 I've got to work out the maximum number of running tasks myself.
Have I understood this correctly - which is the better way, or is there another way?
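
For illustration, the idea behind #2/#3 (capping how many jobs run at once) might be sketched roughly as below. This swaps in a SemaphoreSlim as the counter instead of a BlockingCollection, and it is only a sketch under assumptions: the Job class, ThrottledRunner and the popJob delegate are hypothetical stand-ins for the real job type and Queue.PopJob(), and maxConcurrent still has to be chosen by hand.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the real job type (illustration only).
public class Job
{
    // Simulates a job that mostly waits on a web service.
    public void Execute() => Thread.Sleep(100);
}

public static class ThrottledRunner
{
    // Pops jobs until popJob returns null, keeping at most maxConcurrent
    // jobs in flight. Returns the number of jobs that were started.
    public static int Run(Func<Job> popJob, int maxConcurrent)
    {
        int started = 0;
        using (var slots = new SemaphoreSlim(maxConcurrent, maxConcurrent))
        {
            while (true)
            {
                // Wait for a free slot BEFORE popping, so the queue is never
                // drained faster than jobs can be processed.
                slots.Wait();
                var job = popJob();
                if (job == null)
                {
                    slots.Release();
                    break;
                }
                started++;
                Task.Run(() =>
                {
                    try { job.Execute(); }
                    finally { slots.Release(); } // free the slot even if the job throws
                });
            }

            // Reacquire every slot; this only succeeds once all jobs have finished.
            for (int i = 0; i < maxConcurrent; i++)
                slots.Wait();
        }
        return started;
    }
}

public static class Program
{
    public static void Main()
    {
        int remaining = 10; // pretend the queue holds 10 jobs
        int started = ThrottledRunner.Run(
            () => remaining-- > 0 ? new Job() : null, maxConcurrent: 4);
        Console.WriteLine(started); // prints 10
    }
}
```

Only the single loop thread ever calls popJob, so the queue is polled from one place. Because the jobs block, the thread pool behind Task.Run may be slow to add threads; that is one reason dedicated threads or TaskCreationOptions.LongRunning might suit this workload better.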
EDIT - This is what I've come up with from the answers below, using the producer-consumer pattern.
As well as overall throughput, the aim was not to dequeue jobs faster than they could be processed, and not to have multiple threads polling the queue (not shown here, but popping is a non-blocking operation and will lead to huge transaction costs if it is polled at high frequency from multiple places).
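
    // Needs: using System.Collections.Concurrent; using System.Collections.Generic;
    //        using System.Threading;

    // BlockingCollection<>(1) blocks on Add() while 1 job is already waiting (no
    // point in being greedy!), and blocks consumers while it is empty.
    var jobs = new BlockingCollection<Job>(1);

    // Set up a number of consumer threads.
    // Determine MAX_CONSUMER_THREADS empirically: on a 4-core CPU where a job is
    // blocked waiting on I/O 50% of the time, it is likely to be around 8.
    var consumers = new List<Thread>();
    for (int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
    {
        var consumer = new Thread(() =>
        {
            // GetConsumingEnumerable() blocks until a job is available and ends
            // once CompleteAdding() has been called and the collection is empty.
            // (A bare Take() can throw if adding completes while it is waiting.)
            foreach (var job in jobs.GetConsumingEnumerable())
            {
                job.Execute();
            }
        });
        consumer.Start();
        consumers.Add(consumer);
    }

    // Producer: take items off the queue and put them in the blocking collection,
    // ready for processing.
    while (true)
    {
        var job = Queue.PopJob();
        if (job == null)
        {
            jobs.CompleteAdding();
            break;
        }
        jobs.Add(job);
    }

    // Wait for running jobs to finish.
    foreach (var consumer in consumers)
        consumer.Join();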
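
The "4 cores, 50% blocked, so about 8 threads" estimate in the MAX_CONSUMER_THREADS comment matches a common rule of thumb: threads = cores / (1 - fraction of time blocked). A small sketch of that arithmetic follows; ThreadEstimate and Estimate are hypothetical names used only for illustration, and the result is just a starting point for the empirical tuning mentioned above.

```csharp
using System;

public static class ThreadEstimate
{
    // Rule-of-thumb starting point: cores / (1 - blockedFraction), rounded up.
    // blockedFraction is the share of a job's time spent waiting (0 <= x < 1).
    public static int Estimate(int cores, double blockedFraction)
    {
        if (cores < 1)
            throw new ArgumentOutOfRangeException(nameof(cores));
        if (blockedFraction < 0.0 || blockedFraction >= 1.0)
            throw new ArgumentOutOfRangeException(nameof(blockedFraction));

        return (int)Math.Ceiling(cores / (1.0 - blockedFraction));
    }

    public static void Main()
    {
        // 4 cores, jobs blocked half the time: 4 / 0.5 = 8
        Console.WriteLine(Estimate(4, 0.5)); // prints 8

        // Estimate for this machine, assuming jobs wait 90% of the time.
        Console.WriteLine(Estimate(Environment.ProcessorCount, 0.9));
    }
}
```

Since the jobs here spend most of their 10s waiting on web services, the blocked fraction is probably well above 50%, so the measured optimum may end up much higher than 8.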