Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share


c# - How to stop propagating an asynchronous stream (IAsyncEnumerable)

I have a method that accepts an IAsyncEnumerable as an argument and also returns an IAsyncEnumerable. It calls a web method for each item in the input stream and propagates the result to the output stream. My question is: how can I be notified if the caller of my method has stopped enumerating the output stream, so that I can stop enumerating the input stream inside my method? It seems that I should be able to get notified, because by default the caller disposes the IAsyncEnumerator it gets from my method. Is there any built-in mechanism that generates such a notification for compiler-generated async methods? If not, what is the easiest alternative to implement?

Example: the web method validates whether a url is valid or not. There is a never-ending stream of urls, but the caller stops enumerating the results when more than 2 invalid urls are found:

var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
    Console.WriteLine($"Url {result.Url} is "
        + (result.IsValid ? "OK" : "Invalid!"));
    if (!result.IsValid) invalidCount++;
    if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);

The generator of the urls. One url is generated every 300 msec.

private static async IAsyncEnumerable<string> GetMockUrls()
{
    int index = 0;
    while (true)
    {
        await Task.Delay(300);
        yield return $"https://mock.com/{++index:0000}";
    }
}

The validator of the urls. There is a requirement that the input stream is enumerated eagerly, so two asynchronous workflows run in parallel. The first workflow inserts the urls into a queue, and the second workflow picks the urls one by one and validates them. A BufferBlock is used as an async queue.

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });

    while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
    {
        yield return (url, await MockValidateUrl(url));
    }
}

Clarification: the queue is mandatory, and removing it is not an option. It is an essential component of this problem.

The validator of a single url. The validation process lasts 300 msec on average.

private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
    await Task.Delay(_random.Next(100, 600));
    return _random.Next(0, 2) != 0;
}

Output:

Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...

The problem is that urls are still generated and received after the caller/client has finished the asynchronous enumeration. I would like to fix this, so that no more messages appear in the console after --Async enumeration finished--.


1 Reply


Edit

The discussion will be easier with an appropriate example. Validating URLs isn't that expensive. What if you need to hit e.g. 100 URLs and pick the first 3 responses?

In that case both the worker and the buffer make sense.

Edit 2

One of the comments adds extra complexity: the tasks must execute concurrently, and the results need to be emitted as they arrive.


For starters, ValidateUrls could be rewritten as an iterator method:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    await foreach (var url in urls)
    {
        Console.WriteLine($"Url {url} received");
        var isValid=await MockValidateUrl(url);
        yield return (url, isValid);
    }
}

There's no need for a worker Task, as all methods are asynchronous. The iterator method won't proceed unless a consumer asks for a result: each MoveNextAsync call runs it only up to the next yield return. Even if MockValidateUrl does something expensive, it could use Task.Run itself or be wrapped in a Task.Run. That would generate quite a few tasks, though.
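Here is a minimal sketch of that pull-based behaviour. It assumes a hypothetical CountingSource in place of the URL stream and a trivial even/odd check in place of MockValidateUrl. The consumer takes three results and breaks, and the source has then produced exactly three items, nothing more.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LazyIteratorDemo
{
    // Hypothetical source: records how many items it has produced so far.
    public static int Produced;

    public static async IAsyncEnumerable<int> CountingSource()
    {
        for (int i = 1; ; i++)
        {
            await Task.Yield();   // stands in for real asynchronous work
            Produced++;
            yield return i;
        }
    }

    // Same shape as the iterator version of ValidateUrls: one step per MoveNextAsync.
    public static async IAsyncEnumerable<(int Item, bool IsEven)> Validate(
        IAsyncEnumerable<int> items)
    {
        await foreach (var item in items)
        {
            await Task.Yield();   // stands in for MockValidateUrl
            yield return (item, item % 2 == 0);
        }
    }

    // Takes three results, then breaks; returns how many items the source produced.
    public static async Task<int> TakeThreeAsync()
    {
        Produced = 0;
        int taken = 0;
        await foreach (var result in Validate(CountingSource()))
        {
            if (++taken == 3) break;   // disposing the enumerator stops both iterators
        }
        return Produced;
    }
}
```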

For completeness' sake, you can add a CancellationToken and ConfigureAwait(false):

public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
       IAsyncEnumerable<string> urls, 
       [EnumeratorCancellation]CancellationToken token=default)
{
    await foreach(var url in urls.WithCancellation(token).ConfigureAwait(false))
    {
        var isValid=await MockValidateUrl(url).ConfigureAwait(false);
        yield return (url,isValid);
    }
}
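The [EnumeratorCancellation] attribute routes the token given to WithCancellation into the iterator's token parameter. This lets a consumer cancel without passing a token to the method call itself. The following small sketch assumes a hypothetical infinite Numbers iterator:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class EnumeratorCancellationDemo
{
    // Hypothetical infinite source; the attribute routes WithCancellation's token here.
    public static async IAsyncEnumerable<int> Numbers(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (int i = 0; ; i++)
        {
            await Task.Delay(10, token);   // throws once the token is cancelled
            yield return i;
        }
    }

    // Returns how many items were received before cancellation ended the loop.
    public static async Task<int> ConsumeUntilCancelledAsync()
    {
        using var cts = new CancellationTokenSource();
        int received = 0;
        try
        {
            // The token is supplied by the consumer, not passed to Numbers() directly.
            await foreach (var n in Numbers().WithCancellation(cts.Token))
            {
                if (++received == 5) cts.Cancel();
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: Task.Delay inside the iterator observed the cancellation.
        }
        return received;
    }
}
```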

In any case, as soon as the caller stops iterating, ValidateUrls will stop.
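This is also the built-in notification the question asks about. When await foreach exits early (through break, return or an exception), it calls DisposeAsync on the enumerator. A compiler-generated async iterator reacts by running its pending finally blocks. The following sketch records events in a hypothetical log list instead of writing to the console:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DisposeNotificationDemo
{
    public static async IAsyncEnumerable<int> Source(List<string> log)
    {
        try
        {
            for (int i = 1; ; i++)
            {
                await Task.Yield();
                log.Add($"produced {i}");
                yield return i;
            }
        }
        finally
        {
            // Runs when the consumer leaves await foreach early (DisposeAsync).
            log.Add("source disposed");
        }
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        await foreach (var i in Source(log))
        {
            log.Add($"consumed {i}");
            if (i == 2) break;
        }
        log.Add("enumeration finished");
        return log;
    }
}
```

Because of this, any cleanup, such as cancelling a worker, can go in a finally block of the iterator.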

Buffering

Buffering is a problem: no matter how it's programmed, the worker won't stop until the buffer fills. The buffer's size is how many extra iterations the worker performs before it realizes it needs to stop. This is a great case for a bounded Channel (yes, again!):

using System.Threading.Channels;

public static IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
        IAsyncEnumerable<string> urls,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<(string Url, bool IsValid)>(2);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
                await foreach(var url in urls.WithCancellation(token))
                {
                    var isValid=await MockValidateUrl(url);
                    // Pass the token so a write waiting on a full channel can be cancelled
                    await writer.WriteAsync((url,isValid),token);
                }
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        
    return channel.Reader.ReadAllAsync(token);
}
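The worker above keeps going until the caller's token is cancelled or the channel is full, even after the consumer has broken out of its loop. One way to stop it sooner combines the channel with the dispose notification. This is a different technique from the one above: read the channel inside an async iterator whose finally block cancels a linked CancellationTokenSource and waits for the worker. The sketch rests on the following assumptions:

- ValidateUrlsStoppable is a hypothetical name.
- ValidateAsync is a trivial stand-in for MockValidateUrl.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class StoppableValidator
{
    // Hypothetical stand-in for MockValidateUrl that honours cancellation.
    private static async Task<bool> ValidateAsync(string url, CancellationToken token)
    {
        await Task.Delay(10, token);
        return !url.Contains("bad");
    }

    public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrlsStoppable(
        IAsyncEnumerable<string> urls,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        // Cancelled by the caller's token or by the finally block below.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
        var channel = Channel.CreateBounded<(string Url, bool IsValid)>(2);

        // The eager worker: enumerates the input and fills the channel (the queue).
        var worker = Task.Run(async () =>
        {
            try
            {
                await foreach (var url in urls.WithCancellation(cts.Token))
                {
                    var isValid = await ValidateAsync(url, cts.Token);
                    await channel.Writer.WriteAsync((url, isValid), cts.Token);
                }
                channel.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                channel.Writer.TryComplete(ex);   // includes OperationCanceledException
            }
        });

        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
            {
                yield return item;
            }
        }
        finally
        {
            // Reached on break, on an exception, or when the input is exhausted.
            cts.Cancel();
            await worker;   // the worker catches everything, so this does not throw
        }
    }
}
```

Awaiting the worker in the finally block means nothing is still running after the consumer's await foreach returns. The price is that disposal waits for the worker's current step. For example, a source that ignores the token, like GetMockUrls, is still waited on until it yields its next url.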

It's better to pass around ChannelReaders instead of IAsyncEnumerables, though. At the very least, no async enumerator is constructed until someone tries to read from the ChannelReader. It's also easier to construct pipelines as extension methods:

public static ChannelReader<(string Url, bool IsValid)> ValidateUrls(
        this ChannelReader<string> urls,int capacity,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<(string Url, bool IsValid)>(capacity);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
                await foreach(var url in urls.ReadAllAsync(token))
                {
                    var isValid=await MockValidateUrl(url);
                    await writer.WriteAsync((url,isValid),token);
                }
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        
    return channel.Reader;
}
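The capacity parameter bounds how many items the worker produces for a consumer that has stopped reading. The small sketch below shows this with a hypothetical producer that counts its completed writes. The consumer reads a single item and then stops.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    // Returns how many writes completed when the consumer reads one item and stops.
    public static async Task<int> CompletedWritesAsync(int capacity)
    {
        var channel = Channel.CreateBounded<int>(capacity);   // FullMode defaults to Wait
        int completedWrites = 0;

        using var cts = new CancellationTokenSource();
        var producer = Task.Run(async () =>
        {
            try
            {
                for (int i = 0; ; i++)
                {
                    // Waits while the buffer is full.
                    await channel.Writer.WriteAsync(i, cts.Token);
                    Interlocked.Increment(ref completedWrites);
                }
            }
            catch (OperationCanceledException) { }
        });

        await channel.Reader.ReadAsync();   // the consumer takes one item...
        await Task.Delay(200);              // ...then stops; the producer fills the buffer
        cts.Cancel();                       // release the producer waiting in WriteAsync
        await producer;
        return Volatile.Read(ref completedWrites);
    }
}
```

With capacity 2, the producer completes three writes: one item taken by the consumer plus two waiting in the buffer. The fourth WriteAsync waits until it is cancelled.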

This syntax allows constructing pipelines in a fluent manner. Let's say we have this helper method to convert IEnumerables to channels (or IAsyncEnumerables):

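public static ChannelReader<T> AsChannel<T>(
         this IEnumerable<T> items)
{
    var channel=Channel.CreateUnbounded<T>();        
    var writer=channel.Writer;
    foreach(var item in items)
    {
        writer.TryWrite(item);   // always succeeds on an unbounded channel
    }
    writer.Complete();           // without this, ReadAllAsync never finishes
    return channel.Reader;
}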

We can write :

var pipeline=urlList.AsChannel()     //takes a list and writes it to a channel
                    .ValidateUrls(2); //validates, buffering at most 2 results

await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
   //Use the items here
}

Concurrent calls with immediate propagation

That's easy with channels, although this time the worker needs to fire all of the tasks at once. Essentially, we need multiple workers. That's not something that can be done with just IAsyncEnumerable.

First of all, if we wanted to use e.g. 5 concurrent tasks to process the inputs, we could write:

    var tasks = Enumerable.Range(0,5)
                  .Select(_ => Task.Run(async ()=>{
                                 // ... worker body: read urls, validate, write results
                             },token))
                  .ToArray();
    _ = Task.WhenAll(tasks).ContinueWith(t=>writer.Complete(t.Exception));        

instead of:

    _ = Task.Run(async ()=>{
        // ... worker body
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        

Using a large number of workers could be enough. I'm not sure if IAsyncEnumerable can be consumed by multiple workers, and I don't really want to find out.
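A ChannelReader created with the default options (SingleReader = false) can be read by several workers at once. This sidesteps the question of sharing an IAsyncEnumerable. Below is a sketch of a hypothetical ValidateUrlsConcurrently extension, with a stand-in validator. Results are written in completion order, not input order.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ConcurrentValidator
{
    // Hypothetical stand-in for MockValidateUrl with a varying delay.
    private static async Task<bool> ValidateAsync(string url, CancellationToken token)
    {
        await Task.Delay(Math.Abs(url.GetHashCode() % 50), token);
        return !url.Contains("bad");
    }

    public static ChannelReader<(string Url, bool IsValid)> ValidateUrlsConcurrently(
        this ChannelReader<string> urls, int workers, int capacity,
        CancellationToken token = default)
    {
        var channel = Channel.CreateBounded<(string Url, bool IsValid)>(capacity);
        var writer = channel.Writer;

        // Each worker pulls the next url from the shared reader, so at most
        // `workers` validations run at the same time.
        var tasks = Enumerable.Range(0, workers)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var url in urls.ReadAllAsync(token))
                {
                    var isValid = await ValidateAsync(url, token);
                    await writer.WriteAsync((url, isValid), token);
                }
            }, token))
            .ToArray();

        // Complete the output once every worker has finished (or failed).
        _ = Task.WhenAll(tasks).ContinueWith(t => writer.TryComplete(t.Exception));
        return channel.Reader;
    }
}
```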

Premature Cancellation

All of the above works if the client consumes all results. To stop processing after e.g. the first 3 results, though, we need the CancellationToken:

var cts=new CancellationTokenSource();

var pipeline=urlList.AsChannel()     //takes a list and writes it to a channel
                    .ValidateUrls(2,cts.Token);

int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
    //Break after 3 iterations
    if(i++>2)
    {
        break;
    }
    // ...
}

cts.Cancel();
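Cancelling only helps if the worker observes the token. Suppose a worker is waiting in WriteAsync on a full bounded channel. cts.Cancel() releases it only if the token was passed to that WriteAsync call. Otherwise it waits forever, because nobody reads anymore. Here is a minimal sketch of that behaviour with a capacity-1 channel:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CancelBlockedWriterDemo
{
    // Returns true if cancelling the token released a writer waiting on a full channel.
    public static async Task<bool> CancelReleasesWriterAsync()
    {
        var channel = Channel.CreateBounded<int>(1);
        await channel.Writer.WriteAsync(0);           // the buffer is now full

        using var cts = new CancellationTokenSource();
        // This write cannot complete: nobody reads from the channel anymore.
        var blockedWrite = channel.Writer.WriteAsync(1, cts.Token).AsTask();

        await Task.Delay(50);
        if (blockedWrite.IsCompleted) return false;   // it should still be waiting

        cts.Cancel();                                 // what the client does after `break`
        try
        {
            await blockedWrite;
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;                              // the worker can now exit
        }
    }
}
```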

This code itself could be extracted into a method that receives a ChannelReader and, in this case, the CancellationTokenSource:

public static async Task LastStep(this ChannelReader<(string Url, bool IsValid)> input,CancellationTokenSource cts)
{
    int i=0;
    await foreach(var (url,isValid) in input.ReadAllAsync())
    {
        //Break after 3 iterations
        if(i++>2)
        {
            break;
        }
        // ...
    }

    cts.Cancel();        
}

And the pipeline becomes:

var cts=new CancellationTokenSource();

await urlList.AsChannel()     
             .ValidateUrls(2,cts.Token)
             .LastStep(cts);
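Here is a variation for the case where you want the worker cancelled even if the processing code throws: cts.Cancel() can go in a finally block. The sketch uses a hypothetical TakeAndCancel name, with a take parameter instead of the hard-coded 3. It returns the items it kept.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class LastStepVariant
{
    // Hypothetical variant of LastStep: keeps the first `take` items, then always cancels.
    public static async Task<List<T>> TakeAndCancel<T>(
        this ChannelReader<T> input, int take, CancellationTokenSource cts)
    {
        var kept = new List<T>();
        try
        {
            if (take > 0)
            {
                await foreach (var item in input.ReadAllAsync(cts.Token))
                {
                    kept.Add(item);
                    if (kept.Count == take) break;
                }
            }
        }
        finally
        {
            cts.Cancel();   // runs on break, on normal completion and on exceptions
        }
        return kept;
    }
}
```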


...