Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
682 views
in Technique[技术] by (71.8m points)

c# - How to await the results of an IAsyncEnumerable<Task<T>>, with a specific level of concurrency

I have an asynchronous stream of tasks, that is generated by applying an async lambda to a stream of items:

IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
    await Task.Delay(100);
    return x.ToString();
})

The methods AsyncEnumerable.Range and Select above are provided from the System.Linq.Async package.

The result I want is a stream of results, expressed as an IAsyncEnumerable<string>. The results must be streamed in the same order as the originated tasks. Also the enumeration of the stream must be throttled, so than no more than a specified number of tasks are active at any given time.

I would like a solution in the form of an extension method on the IAsyncEnumerable<Task<T>> type, so that I could chain it multiple times and form a processing pipeline, similar in functionality with a TPL Dataflow pipeline, but expressed fluently. Below is the signature of the desirable extension method:

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel);

Accepting also a CancellationToken as argument would be a nice feature.


Update: For completeness I am including an example of a fluent processing pipeline formed by chaining twice the AwaitResults method. This pipeline starts with a PLINQ block, just to demonstrate that mixing PLINQ and Linq.Async is possible.

int[] results = await Partitioner
    .Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(2)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Thread.Sleep(100); // Simulate some CPU-bound operation
        return x;
    })
    .ToAsyncEnumerable()
    .Select(async x =>
    {
        await Task.Delay(300); // Simulate some I/O operation
        return x;
    })
    .AwaitResults(concurrencyLevel: 5)
    .Select(x => Task.Run(() =>
    {
        Thread.Sleep(100); // Simulate another CPU-bound operation
        return x;
    }))
    .AwaitResults(concurrencyLevel: 2)
    .ToArrayAsync();

Console.WriteLine($"Results: {String.Join(", ", results)}");

Expected output:

Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20


Note: In retrospect the AwaitResults method should probably be named Merge, and the concurrencyLevel argument should be named maxConcurrent, because its functionality resembles the Merge operator that exists in the Rx library. The System.Interactive.Async package does include an operator named Merge that produces IAsyncEnumerable<T>s, but none of its overloads operate on IAsyncEnumerable<Task<T>> sources. It operates on IEnumerable<IAsyncEnumerable<TSource>> and IAsyncEnumerable<IAsyncEnumerable<TSource>> sources. A parameter bufferCapacity could also be added, in order to control explicitly the size of the buffer needed for the awaiting/merging operation.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Here is my implementation of the AwaitResults method. It is based on a SemaphoreSlim for controlling the concurrency level, and on a Channel<Task<TResult>> that is used as an async queue. The enumeration of the source IAsyncEnumerable<Task<TResult>> happens inside a fire-and-forget task (the feeder), that pushes the hot tasks to the channel. It also attaches a continuation to each task, where the semaphore is released.

The last part of the method is the yielding loop, where the tasks are dequeued from the channel one by one, and then awaited sequentially. This way the results are yielded in the same order as the tasks in the source stream.

This implementation requires that each task is awaited twice, which means that it couldn't be used for a source of type IAsyncEnumerable<ValueTask<TResult>>, since a ValueTask can only be awaited once.

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel = 1,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (concurrencyLevel < 1)
        throw new ArgumentOutOfRangeException(nameof(concurrencyLevel));

    var semaphore = new SemaphoreSlim(concurrencyLevel - 1);
    var channelCapacity = Math.Max(1000, concurrencyLevel * 10);
    var tasksChannel = Channel.CreateBounded<Task<TResult>>(channelCapacity);
    var completionCts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken);

    // Feeder task: fire and forget
    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var task in source
                .WithCancellation(completionCts.Token).ConfigureAwait(false))
            {
                HandleTaskCompletion(task);
                await tasksChannel.Writer.WriteAsync(task, completionCts.Token)
                    .ConfigureAwait(false);
                await semaphore.WaitAsync(completionCts.Token)
                    .ConfigureAwait(false); // Acquire before MoveNextAsync
            }
            tasksChannel.Writer.Complete();
        }
        catch (Exception ex)
        {
            tasksChannel.Writer.Complete(ex);
        }
    });

    async void HandleTaskCompletion(Task task)
    {
        try
        {
            await task.ConfigureAwait(false);
        }
        catch
        {
            // Ignore exceptions here
        }
        finally
        {
            semaphore.Release();
        }
    }

    try
    {
        while (await tasksChannel.Reader.WaitToReadAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            while (tasksChannel.Reader.TryRead(out var task))
            {
                yield return await task.ConfigureAwait(false);
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }
    finally // Happens when the caller disposes the output enumerator
    {
        completionCts.Cancel();
    }
}

An important detail is the try-finally block around the final yielding loop. This is required for the case that the caller of the method abandons prematurely the enumeration of the resulting stream. In that case the enumeration of the source stream should also be terminated, and this termination is propagated backward using a CancellationTokenSource. Without it the feeder task would never be completed, the objects would never be garbage collected, and memory would be leaked.

Note: Cancelling the cancellationToken may not cancel the whole operation instantaneously. For maximum responsiveness the same cancellationToken should be used for cancelling the individual tasks.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...