Observer the following function:
public Task RunInOrderAsync<TTaskSeed>(IEnumerable<TTaskSeed> taskSeedGenerator,
CreateTaskDelegate<TTaskSeed> createTask,
OnTaskErrorDelegate<TTaskSeed> onError = null,
OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class
{
Action<Exception, TTaskSeed> onFailed = (exc, taskSeed) =>
{
if (onError != null)
{
onError(exc, taskSeed);
}
};
Action<Task> onDone = t =>
{
var taskSeed = (TTaskSeed)t.AsyncState;
if (t.Exception != null)
{
onFailed(t.Exception, taskSeed);
}
else if (onSuccess != null)
{
onSuccess(t, taskSeed);
}
};
var enumerator = taskSeedGenerator.GetEnumerator();
Task task = null;
while (enumerator.MoveNext())
{
if (task == null)
{
try
{
task = createTask(enumerator.Current);
Debug.Assert(ReferenceEquals(task.AsyncState, enumerator.Current));
}
catch (Exception exc)
{
onFailed(exc, enumerator.Current);
}
}
else
{
task = task.ContinueWith((t, taskSeed) =>
{
onDone(t);
var res = createTask((TTaskSeed)taskSeed);
Debug.Assert(ReferenceEquals(res.AsyncState, taskSeed));
return res;
}, enumerator.Current).TaskUnwrap();
}
}
if (task != null)
{
task = task.ContinueWith(onDone);
}
return task;
}
Where TaskUnwrap
is the state preserving version of the standard Task.Unwrap
:
public static class Extensions
{
public static Task TaskUnwrap(this Task<Task> task, object state = null)
{
return task.Unwrap().ContinueWith((t, _) =>
{
if (t.Exception != null)
{
throw t.Exception;
}
}, state ?? task.AsyncState);
}
}
The RunInOrderAsync
method allows to run N tasks asynchronously, but sequentially - one after another. In effect, it runs the tasks created from the given seeds with the concurrency limit of 1.
Let us assume that the tasks created from the seeds by the createTask
delegate do not correspond themselves to multiple concurrent tasks.
Now, I would like to throw in the maxConcurrencyLevel parameter, so the function signature would look like this:
Task RunInOrderAsync<TTaskSeed>(int maxConcurrencyLevel,
IEnumerable<TTaskSeed> taskSeedGenerator,
CreateTaskDelegate<TTaskSeed> createTask,
OnTaskErrorDelegate<TTaskSeed> onError = null,
OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class
And here I am a bit stuck.
The SO has questions like these:
Which basically propose two ways to attack the problem:
- Using
Parallel.ForEach
with ParallelOptions
specifying the MaxDegreeOfParallelism
property value as equal to the desired max concurrency level.
- Using a custom
TaskScheduler
with the desired MaximumConcurrencyLevel
value.
The second approach doesn't cut it, because all the tasks involved must use the same task scheduler instance. For that, all the methods used to return a Task
must have an overload accepting the custom TaskScheduler
instance. Unfortunately, Microsoft is not very consistent with that respect. For instance, SqlConnection.OpenAsync
does not accept such an argument (but TaskFactory.FromAsync
does).
The first approach implies that I will have to convert tasks to actions, something like this:
() => t.Wait()
I am not sure it is a good idea, but I will be glad to get more input on that.
Another approach is to utilize TaskFactory.ContinueWhenAny
, but that is messy.
Any ideas?
EDIT 1
I would like to clarify the reasons for wanting the limit. Our tasks ultimately execute SQL statements against the same SQL server. What we want is a way to limit the amount of concurrent outgoing SQL statements. It is entirely possible that there will be other SQL statements executing concurrently from other pieces of code, but this one is a batch processor and could potentially flood the server.
Now, be advised, that although we are talking about the same SQL server, there are numerous databases on that same server. So, it is not about limiting the amount of open SQL connections to the same database, because the database may not be the same at all.
That is why doom's day solutions like ThreadPool.SetMaxThreads()
are irrelevant.
Now, about SqlConnection.OpenAsync
. It was made asynchronous for a reason - it might make a roundtrip to the server and thus might be subject to the network latency and other lovely side effects of distributed environment. As such it is no different than other async methods which do accept the TaskScheduler
parameter. I tend to think that not accepting one is just a bug.
EDIT 2
I would like to preserve the asynchronous spirit of the original function. Hence I wish to avoid any explicit blocking solutions.
EDIT 3
Thanks to @fsimonazzi's answer I now have a working implementation of the desired functionality. Here is the code:
var sem = new SemaphoreSlim(maxConcurrencyLevel);
var tasks = new List<Task>();
var enumerator = taskSeedGenerator.GetEnumerator();
while (enumerator.MoveNext())
{
tasks.Add(sem.WaitAsync().ContinueWith((_, taskSeed) =>
{
Task task = null;
try
{
task = createTask((TTaskSeed)taskSeed);
if (task != null)
{
Debug.Assert(ReferenceEquals(task.AsyncState, taskSeed));
task = task.ContinueWith(t =>
{
sem.Release();
onDone(t);
});
}
}
catch (Exception exc)
{
sem.Release();
onFailed(exc, (TTaskSeed)taskSeed);
}
return task;
}, enumerator.Current).TaskUnwrap());
}
return Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => sem.Dispose());
See Question&Answers more detail:
os