Looking for some help with best practices on creating a potentially multi-threaded asynchronous application. This application will look through several directories for a certain pattern (configurable per directory). For all of the files it finds in that directory, it will kick off an asynchronous operation for each file (read/write, DB operations, API calls, etc). The directories themselves should be processed concurrently as they are unrelated to each other.
It's my understanding that Task
may not always execute on a separate thread. Because this application may have to handle dozens to hundreds of files at any one time, I want to make sure I am maximizing throughput of the application. It's also worth noting that there may or may not be files in the directory when this application runs.
Is simply using Task
enough to accomplish this and achieve maximum throughput, or is there some combination of Parallel.ForEach
with an asynchronous function that would be better? Below is what I have created so far just as a test to see and it looks like it's processing 1 directory at a time on the same thread.
Main
class Program {
static IEnumerable<DirectoryConfig> GetDirectoryConfigs() {
return new DirectoryConfig[] {
new DirectoryConfig {
DirectoryPath = @"PATH_1",
Token = "*",
FileProcessor = new FileProcessor()
},
new DirectoryConfig {
DirectoryPath = @"PATH_2",
Token = "*",
FileProcessor = new FileProcessor()
}
};
}
static async Task Main(string[] args) {
IEnumerable<DirectoryConfig> directoryConfigs = GetDirectoryConfigs();
List<Task> tasks = new List<Task>();
foreach(DirectoryConfig config in directoryConfigs) {
Console.WriteLine("Processing directory {0}", config.DirectoryPath);
tasks.Add(new DirectoryMonitor().ProcessDirectoryAsync(config));
}
await Task.WhenAll(tasks);
}
}
DirectoryMonitor
class DirectoryMonitor {
public Task ProcessDirectoryAsync(DirectoryConfig config) {
List<Task> tasks = new List<Task>();
foreach (string file in Directory.GetFiles(config.DirectoryPath, config.Token)) {
tasks.Add(config.FileProcessor.ProcessAsync(file));
}
return Task.WhenAll(tasks);
}
}
FileProcessor
class FileProcessor : IFileProcessor {
public async Task ProcessAsync(string file) {
string fileName = Path.GetFileName(file);
Console.WriteLine("Processing file {0} on thread {1}", fileName,
Thread.CurrentThread.ManagedThreadId);
using (StreamReader reader = new StreamReader(file)) {
int lineNumber = 0;
while(!reader.EndOfStream) {
Console.WriteLine("Reading line {0} of file {1}", ++lineNumber, fileName);
string line = await reader.ReadLineAsync();
await DoAsyncWork(line);
}
}
}
private Task DoAsyncWork(string line) {
return Task.Delay(1000);
}
}
See Question&Answers more detail:
os 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…