Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
288 views
in Technique[技术] by (71.8m points)

c# - Process files concurrently and asynchronously

Looking for some help with best practices on creating a potentially multi-threaded asynchronous application. This application will look through several directories for a certain pattern (configurable per directory). For all of the files it finds in that directory, it will kick off an asynchronous operation for each file (read/write, DB operations, API calls, etc). The directories themselves should be processed concurrently as they are unrelated to each other.

It's my understanding that Task may not always execute on a separate thread. Because this application may have to handle dozens to hundreds of files at any one time, I want to make sure I am maximizing throughput of the application. It's also worth noting that there may or may not be files in the directory when this application runs.

Is simply using Task enough to accomplish this and achieve maximum throughput, or is there some combination of Parallel.ForEach with an asynchronous function that would be better? Below is what I have created so far just as a test to see and it looks like it's processing 1 directory at a time on the same thread.

Main

class Program {
    static IEnumerable<DirectoryConfig> GetDirectoryConfigs() {
        return new DirectoryConfig[] {
            new DirectoryConfig {
                DirectoryPath = @"PATH_1",
                Token = "*",
                FileProcessor = new FileProcessor()
            },
            new DirectoryConfig {
                DirectoryPath = @"PATH_2",
                Token = "*",
                FileProcessor = new FileProcessor()
            }
        };
    }
    static async Task Main(string[] args) {
        IEnumerable<DirectoryConfig> directoryConfigs = GetDirectoryConfigs();

        List<Task> tasks = new List<Task>();

        foreach(DirectoryConfig config in directoryConfigs) {
            Console.WriteLine("Processing directory {0}", config.DirectoryPath);

            tasks.Add(new DirectoryMonitor().ProcessDirectoryAsync(config));
        }

        await Task.WhenAll(tasks);
    }
}

DirectoryMonitor

class DirectoryMonitor {
    public Task ProcessDirectoryAsync(DirectoryConfig config) {
        List<Task> tasks = new List<Task>();

        foreach (string file in Directory.GetFiles(config.DirectoryPath, config.Token)) {
            tasks.Add(config.FileProcessor.ProcessAsync(file));
        }

        return Task.WhenAll(tasks);
    }
}

FileProcessor

class FileProcessor : IFileProcessor {
    public async Task ProcessAsync(string file) {
        string fileName = Path.GetFileName(file);
        Console.WriteLine("Processing file {0} on thread {1}", fileName,
            Thread.CurrentThread.ManagedThreadId);
        using (StreamReader reader = new StreamReader(file)) {
            int lineNumber = 0;
            while(!reader.EndOfStream) {
                Console.WriteLine("Reading line {0} of file {1}", ++lineNumber, fileName);
                string line = await reader.ReadLineAsync();

                await DoAsyncWork(line);
            }
        }
    }

    private Task DoAsyncWork(string line) {
        return Task.Delay(1000);
    }
}
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

For this kind of job a powerful tool you could use is the TPL Dataflow library. With this tool you can create a processing pipeline consisting of many linked blocks, with the data flowing from the first block to the last (circles and meshes are also possible).

The advantages of this approach are:

  1. You get data-parallelism on top of task-parallelism. All blocks are working concurrently and independently from each other.
  2. You can configure optimally the level of concurrency (a.k.a. degree of parallelism) of each heterogeneous operation. For example doing API calls may be highly parallelizable, while reading from the hard disk may be not parallelizable at all.
  3. You get advanced options out of the box (BoundedCapacity, CancellationToken and others).
  4. You get built-in support for both synchronous and asynchronous delegates.

Below is how you could rewrite your original code in TPL Dataflow terms. Three blocks are used, two TransformManyBlocks and one ActionBlock.

var directoryBlock = new TransformManyBlock<DirectoryConfig, string>(config =>
{
    return Directory.GetFiles(config.DirectoryPath, config.Token);
});

var fileBlock = new TransformManyBlock<string, string>(filePath =>
{
    return File.ReadLines(filePath);
});

var lineBlock = new ActionBlock<string>(async line =>
{
    await Task.Delay(1000);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 4
});

directoryBlock.LinkTo(fileBlock, new DataflowLinkOptions { PropagateCompletion = true });
fileBlock.LinkTo(lineBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (DirectoryConfig config in GetDirectoryConfigs())
    await directoryBlock.SendAsync(config);

directoryBlock.Complete();
await lineBlock.Completion;

This example is not very good since all the work is done by the last block (the lineBlock), and the first two blocks are doing essentially nothing. It is also not memory-efficient since all lines of all files of all directories will soon become queued in the input buffer of the ActionBlock, unless processing the lines happens to be faster than reading them from the disk. You'll need to configure the blocks with BoundedCapacity to solve this problem.

This example also fails to demonstrate how you could have different blocks for different types of files, and link the directoryBlock to all of them using a different filtering predicate for each link:

directoryBlock.LinkTo(csvBlock, filePath => Path.GetExtension(filePath) == "csv");
directoryBlock.LinkTo(xlsBlock, filePath => Path.GetExtension(filePath) == "xls");
directoryBlock.LinkTo(generalFileBlock); // Anything that is neither csv nor xls

There are also other types of blocks you could use, like the TransformBlock and the BatchBlock. The TPL Dataflow is based on the Task Parallel Library (TPL), and it is essentially a high level task-generator that creates and controls the lifecycle of the tasks needed in order to process a workload of given type, based on declarative configuration. It is built-in the .NET Core, and available as a package for .NET Framework.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...