This answer makes the following assumptions:
- Fetching messages from AWS should be serialized. Only the processing of messages should be parallelized.
- Every message fetched from AWS should be processed. The whole execution should not terminate before all fetched messages have had a chance to be processed.
- Every message-processing operation should be awaited. The whole execution should not terminate before the completion of all started tasks.
- Any error that occurs during the processing of a message should be ignored. The whole execution should not terminate because the processing of a single message failed.
- Any error that occurs while fetching messages from AWS should be fatal. The whole execution should terminate, but not before all currently running message-processing operations have completed.
- The execution mechanism should be able to handle the case where a fetch operation returns a batch containing a different number of messages than was requested.
Below is an implementation that (hopefully) satisfies these requirements:
/// <summary>
/// Starts an execution loop that fetches batches of messages sequentially,
/// and processes them one by one in parallel.
/// </summary>
public static async Task ExecutionLoopAsync<TMessage>(
    Func<int, Task<TMessage[]>> fetchMessagesAsync,
    Func<TMessage, Task> processMessageAsync,
    int fetchCount,
    int maxDegreeOfParallelism,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    // Count how many times we have acquired the semaphore, so that we know
    // how many more times we have to acquire it before we exit from this method.
    int acquiredCount = 0;
    try
    {
        while (true)
        {
            Debug.Assert(acquiredCount == 0);
            for (int i = 0; i < fetchCount; i++)
            {
                await semaphore.WaitAsync(cancellationToken);
                acquiredCount++;
            }
            TMessage[] messages = await fetchMessagesAsync(fetchCount)
                ?? Array.Empty<TMessage>();
            for (int i = 0; i < messages.Length; i++)
            {
                if (i >= fetchCount) // We got more messages than we asked for
                {
                    await semaphore.WaitAsync();
                    acquiredCount++;
                }
                ProcessAndRelease(messages[i]);
                acquiredCount--;
            }
            if (messages.Length < fetchCount)
            {
                // We got fewer messages than we asked for
                semaphore.Release(fetchCount - messages.Length);
                acquiredCount -= fetchCount - messages.Length;
            }
            // This method is 'async void' because it is not expected to throw ever
            async void ProcessAndRelease(TMessage message)
            {
                try { await processMessageAsync(message); }
                catch { } // Swallow exceptions
                finally { semaphore.Release(); }
            }
        }
    }
    catch (SemaphoreFullException)
    {
        // Guard against the (unlikely) scenario that the counting logic is flawed.
        // The counter is no longer reliable, so skip the awaiting in finally.
        acquiredCount = maxDegreeOfParallelism;
        throw;
    }
    finally
    {
        // Wait for all pending operations to complete. This could cause a deadlock
        // in case the counter has become out of sync.
        for (int i = acquiredCount; i < maxDegreeOfParallelism; i++)
            await semaphore.WaitAsync();
    }
}
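The argument validation is omitted from the implementation above. A minimal sketch of what it could look like is shown below, written as a separate helper. The `ExecutionLoopGuards` class and its `Validate` method are hypothetical names, not part of the original code. The check that matters most is `fetchCount <= maxDegreeOfParallelism`: the loop acquires `fetchCount` semaphore slots before each fetch, so a larger `fetchCount` would make it wait forever. Rejecting `fetchCount < 1` is an additional assumption.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original implementation.
public static class ExecutionLoopGuards
{
    public static void Validate<TMessage>(
        Func<int, Task<TMessage[]>> fetchMessagesAsync,
        Func<TMessage, Task> processMessageAsync,
        int fetchCount,
        int maxDegreeOfParallelism)
    {
        if (fetchMessagesAsync == null)
            throw new ArgumentNullException(nameof(fetchMessagesAsync));
        if (processMessageAsync == null)
            throw new ArgumentNullException(nameof(processMessageAsync));

        // Assumption: fetching zero (or a negative number of) messages is a caller error.
        if (fetchCount < 1)
            throw new ArgumentOutOfRangeException(nameof(fetchCount));

        // The loop holds fetchCount semaphore slots at once before each fetch,
        // so the semaphore must have at least that many slots.
        if (maxDegreeOfParallelism < fetchCount)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism),
                "Must be greater than or equal to fetchCount.");
    }
}
```

A call such as `ExecutionLoopGuards.Validate(fetchMessagesAsync, processMessageAsync, fetchCount, maxDegreeOfParallelism);` could then replace the `// Arguments validation omitted` comment.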
Usage example:
var cts = new CancellationTokenSource();
Task executionTask = ExecutionLoopAsync<Message>(async count =>
{
    return await GetBatchFromAwsAsync(count);
}, async message =>
{
    await ProcessMessageAsync(message);
}, fetchCount: 10, maxDegreeOfParallelism: 100, cts.Token);
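To stop the loop, cancel the token and then await the task. Here is a rough sketch, assuming that cancellation surfaces as an `OperationCanceledException` coming from the `semaphore.WaitAsync(cancellationToken)` call. The `ExecutionLoopShutdown` class and its `StopAsync` method are hypothetical names. Any other exception, such as a failed fetch, is deliberately left to propagate to the caller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original implementation.
public static class ExecutionLoopShutdown
{
    // Requests cancellation and waits for the loop task to finish.
    // Returns true if the task ended because of the cancellation,
    // false if it completed without throwing.
    public static async Task<bool> StopAsync(
        CancellationTokenSource cts, Task executionTask)
    {
        cts.Cancel();
        try
        {
            await executionTask;
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }
}
```

With the variables from the usage example, this would be called as `bool canceled = await ExecutionLoopShutdown.StopAsync(cts, executionTask);`. The token is only observed while the loop is acquiring semaphore slots, so a fetch that is already in flight completes first and its messages are still processed. Because of the `finally` block in `ExecutionLoopAsync`, awaiting the task also waits for all message-processing operations that are still running.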