Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

c# - Aws Sqs Consumer - Poll only when messages can be processed immediately

I'm trying to create an AWS SQS Windows service consumer that polls messages in batches of 10. Each message is executed in its own task for parallel execution. Message processing includes calling different APIs and sending email, so it might take some time.

My problem is that, first, I only want to poll the queue when 10 messages can be processed immediately. This is due to the SQS visibility timeout: if received messages have to "wait", the visibility timeout might expire and they would be "back" on the queue, which produces duplicates. I don't think tweaking the visibility timeout is a good idea, because there is still a chance that messages will be duplicated, and that is exactly what I'm trying to avoid. Second, I want some limit on parallelism (e.g. a maximum of 100 concurrent tasks), so that server resources are kept at bay, since other apps are also running on the server.

How can I achieve this? Or is there another way to remedy these problems?

question from:https://stackoverflow.com/questions/65850128/aws-sqs-consumer-poll-only-when-messages-can-be-processed-immediately


1 Reply


This answer makes the following assumptions:

  1. Fetching messages from AWS should be serialized. Only the processing of messages should be parallelized.
  2. Every message fetched from AWS should be processed. The whole execution should not terminate before all fetched messages have had a chance to be processed.
  3. Every message-processing operation should be awaited. The whole execution should not terminate before all started tasks have completed.
  4. Any error that occurs during the processing of a message should be ignored. The whole execution should not terminate because the processing of a single message failed.
  5. Any error that occurs while fetching messages from AWS should be fatal. The whole execution should terminate, but not before all currently running message-processing operations have completed.
  6. The execution mechanism should be able to handle the case where a fetch operation returns a batch with a different number of messages than requested.

Below is an implementation that (hopefully) satisfies these requirements:

/// <summary>
/// Starts an execution loop that fetches batches of messages sequentially,
/// and processes them one by one in parallel.
/// </summary>
public static async Task ExecutionLoopAsync<TMessage>(
    Func<int, Task<TMessage[]>> fetchMessagesAsync,
    Func<TMessage, Task> processMessageAsync,
    int fetchCount,
    int maxDegreeOfParallelism,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);

    // Count how many times we have acquired the semaphore, so that we know
    // how many more times we have to acquire it before we exit from this method.
    int acquiredCount = 0;
    try
    {
        while (true)
        {
            Debug.Assert(acquiredCount == 0);
            for (int i = 0; i < fetchCount; i++)
            {
                await semaphore.WaitAsync(cancellationToken);
                acquiredCount++;
            }

            TMessage[] messages = await fetchMessagesAsync(fetchCount)
                ?? Array.Empty<TMessage>();

            for (int i = 0; i < messages.Length; i++)
            {
                if (i >= fetchCount) // We got more messages than we asked for
                {
                    await semaphore.WaitAsync();
                    acquiredCount++;
                }
                ProcessAndRelease(messages[i]);
                acquiredCount--;
            }

            if (messages.Length < fetchCount)
            {
                // We got fewer messages than we asked for
                semaphore.Release(fetchCount - messages.Length);
                acquiredCount -= fetchCount - messages.Length;
            }

            // This method is 'async void' because it is not expected to throw ever
            async void ProcessAndRelease(TMessage message)
            {
                try { await processMessageAsync(message); }
                catch { } // Swallow exceptions
                finally { semaphore.Release(); }
            }
        }
    }
    catch (SemaphoreFullException)
    {
        // Guard against the (unlikely) scenario that the counting logic is flawed.
        // The counter is no longer reliable, so skip the awaiting in finally.
        acquiredCount = maxDegreeOfParallelism;
        throw;
    }
    finally
    {
        // Wait for all pending operations to complete. This could cause a deadlock
        // in case the counter has become out of sync.
        for (int i = acquiredCount; i < maxDegreeOfParallelism; i++)
            await semaphore.WaitAsync();
    }
}

Usage example:

var cts = new CancellationTokenSource();

Task executionTask = ExecutionLoopAsync<Message>(async count =>
{
    return await GetBatchFromAwsAsync(count);
}, async message =>
{
    await ProcessMessageAsync(message);
}, fetchCount: 10, maxDegreeOfParallelism: 100, cts.Token);
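For completeness, here is a hypothetical sketch of how the two delegates could be wired to the AWS SDK for .NET (the `AWSSDK.SQS` package). The queue URL and the body of `ProcessMessageAsync` are placeholders, not part of the original answer:

```csharp
using System.Linq;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

var sqs = new AmazonSQSClient();
const string queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"; // placeholder

async Task<Message[]> GetBatchFromAwsAsync(int count)
{
    var response = await sqs.ReceiveMessageAsync(new ReceiveMessageRequest
    {
        QueueUrl = queueUrl,
        MaxNumberOfMessages = count, // SQS caps this at 10 per request
        WaitTimeSeconds = 20         // long polling, to avoid busy-looping on an empty queue
    });
    return response.Messages.ToArray();
}

async Task ProcessMessageAsync(Message message)
{
    // ... call the APIs, send the email, etc. ...

    // Delete only after successful processing, so that a failed message
    // becomes visible again after the visibility timeout and is retried.
    await sqs.DeleteMessageAsync(queueUrl, message.ReceiptHandle);
}
```

Because the semaphore guarantees that `fetchCount` workers are already free before `ReceiveMessageAsync` is invoked, every received message starts processing immediately, which is what addresses the visibility-timeout concern in the question.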
