Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
172 views
in Technique[技术] by (71.8m points)

c# - Async with huge data streams

We use IEnumerables to return huge datasets from database:

public IEnumerable<Data> Read(...)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            yield return item;
        }
    }
}

Now we want to use async methods to do the same. However there is no IEnumerables for async, so we have to collect data into a list until the entire dataset is loaded:

public async Task<List<Data>> ReadAsync(...)
{
    var result = new List<Data>();
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(await reader.ReadAsync().ConfigureAwait(false))
        {
            // ...
            result.Add(item);
        }
    }
    return result;
}

This will consume a huge amount of resources on server, because all data must be in the list before return. What is the best and easy to use async alternative for IEnumerables to work with large data streams? I would like to avoid storing all the data in memory while processing.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

The easiest option is using TPL Dataflow. All you need to do is configure an ActionBlock that handles the processing (in parallel if you wish) and "sends" the items into it one by one asynchronously.
I would also suggest setting a BoundedCapacity which will throttle the reader reading from the database when the processing can't handle the speed.

var block = new ActionBlock<Data>(
    data => ProcessDataAsync(data),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 1000,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

using(var connection = new SqlConnection(...))
{
    // ...
    while(await reader.ReadAsync().ConfigureAwait(false))
    {
        // ...
       await block.SendAsync(item);
    }
}

You can also use Reactive Extensions, but that's a more complicated and robust framework than you probably need.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...