One problem with your existing solution is that you call your RunWorker
in a fire-and-forget manner, albeit on a new thread (i.e., new Thread(RunWorker).Start()
).
RunWorker
is an async
method, it will return to the caller when the execution point hits the first await
(i.e. await PerformWebRequestAsync(message)
). If PerformWebRequestAsync
returns a pending task, RunWorker
returns and the new thread you just started terminates.
I don't think you need a new thread here at all, just use AmazonSQSClient.ReceiveMessageAsync
and await
its result. Another thing is that you shouldn't be using async void
methods unless you really don't care about tracking the state of the asynchronous task. Use async Task
instead.
Your code might look like this:
List<Task> _workers = new List<Task>();
CancellationTokenSource _cts = new CancellationTokenSource();
protected override void OnStart(string[] args)
{
for (int i = 0; i < _MAX_WORKERS; i++)
{
_workers.Add(RunWorkerAsync(_cts.Token));
}
}
public async Task RunWorkerAsync(CancellationToken token)
{
while(true)
{
token.ThrowIfCancellationRequested();
// .. get message from amazon sqs sync.. about 20ms
var message = await sqsClient.ReceiveMessageAsync().ConfigureAwait(false);
try
{
await PerformWebRequestAsync(message);
await InsertIntoDbAsync(message);
}
catch(SomeExeception)
{
// ... log
//continue to retry
continue;
}
sqsClient.DeleteMessage();
}
}
Now, to stop all pending workers, you could simple do this (from the main "request dispatcher" thread):
_cts.Cancel();
try
{
Task.WaitAll(_workers.ToArray());
}
catch (AggregateException ex)
{
ex.Handle(inner => inner is OperationCanceledException);
}
Note, ConfigureAwait(false)
is optional for Windows Service, because there's no synchronization context on the initial thread, by default. However, I'd keep it that way to make the code independent of the execution environment (for cases where there is synchronization context).
Finally, if for some reason you cannot use ReceiveMessageAsync
, or you need to call another blocking API, or simply do a piece of CPU intensive work at the beginning of RunWorkerAsync
, just wrap it with Task.Run
(as opposed to wrapping the whole RunWorkerAsync
):
var message = await Task.Run(
() => sqsClient.ReceiveMessage()).ConfigureAwait(false);
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…