I have an asynchronous stream of tasks that is generated by applying an async lambda to a stream of items:
IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
await Task.Delay(100);
return x.ToString();
});
The AsyncEnumerable.Range and Select methods above are provided by the System.Linq.Async package.
The result I want is a stream of results, expressed as an IAsyncEnumerable<string>. The results must be streamed in the same order as the originating tasks. Also, the enumeration of the stream must be throttled, so that no more than a specified number of tasks are active at any given time.
I would like a solution in the form of an extension method on the IAsyncEnumerable<Task<T>> type, so that I can chain it multiple times and form a processing pipeline, similar in functionality to a TPL Dataflow pipeline, but expressed fluently. Below is the signature of the desired extension method:
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
int concurrencyLevel);
Accepting a CancellationToken as an argument would also be a nice feature.
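To make the requirements concrete, here is a minimal sketch of what such a method might look like. It is not a definitive implementation: it assumes that a plain FIFO queue of pending tasks is an acceptable buffer, and the class name AsyncEnumerableExtensions is hypothetical. The queue never holds more than concurrencyLevel tasks, and the oldest task is always awaited first, so the results come out in the original order.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical container class; any static class would do.
public static class AsyncEnumerableExtensions
{
    public static async IAsyncEnumerable<TResult> AwaitResults<TResult>(
        this IAsyncEnumerable<Task<TResult>> source,
        int concurrencyLevel,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Note: because this is an iterator, these checks run when the
        // enumeration starts, not when AwaitResults is called.
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (concurrencyLevel < 1)
            throw new ArgumentOutOfRangeException(nameof(concurrencyLevel));

        // Tasks that have been pulled from the source but not yet yielded.
        var pending = new Queue<Task<TResult>>();

        await foreach (var task in source
            .WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            pending.Enqueue(task);

            // Once the limit is reached, stop pulling from the source until
            // the oldest task has completed and its result has been yielded.
            if (pending.Count >= concurrencyLevel)
                yield return await pending.Dequeue().ConfigureAwait(false);
        }

        // The source is exhausted; drain the remaining tasks in order.
        while (pending.Count > 0)
            yield return await pending.Dequeue().ConfigureAwait(false);
    }
}
```

With the stream from the first snippet, this could be used as streamOfTasks.AwaitResults(concurrencyLevel: 3). The sketch has known limitations. The source is advanced only when the consumer asks for the next result, so a slow consumer also slows down the creation of new tasks. A completed task that sits behind a slow one still occupies a slot in the queue. If a task fails or the enumeration is abandoned early, the tasks still in the queue are left unawaited.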
Update: For completeness, I am including an example of a fluent processing pipeline formed by chaining the AwaitResults method twice. This pipeline starts with a PLINQ block, just to demonstrate that mixing PLINQ and Linq.Async is possible.
int[] results = await Partitioner
.Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(2)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Thread.Sleep(100); // Simulate some CPU-bound operation
return x;
})
.ToAsyncEnumerable()
.Select(async x =>
{
await Task.Delay(300); // Simulate some I/O operation
return x;
})
.AwaitResults(concurrencyLevel: 5)
.Select(x => Task.Run(() =>
{
Thread.Sleep(100); // Simulate another CPU-bound operation
return x;
}))
.AwaitResults(concurrencyLevel: 2)
.ToArrayAsync();
Console.WriteLine($"Results: {String.Join(", ", results)}");
Expected output:
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20
Note: In retrospect, the AwaitResults method should probably be named Merge, and the concurrencyLevel argument should be named maxConcurrent, because its functionality resembles the Merge operator that exists in the Rx library. The System.Interactive.Async package does include an operator named Merge that produces IAsyncEnumerable<T>s, but none of its overloads operate on IAsyncEnumerable<Task<T>> sources. It operates on IEnumerable<IAsyncEnumerable<TSource>> and IAsyncEnumerable<IAsyncEnumerable<TSource>> sources. A bufferCapacity parameter could also be added, in order to control explicitly the size of the buffer needed for the awaiting/merging operation.