Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


asynchronous - How to make a "fire & forget" async FIFO queue in C#?

I'm trying to process documents asynchronously. The idea is that the user sends documents to a service, which takes time (about 20-90 seconds per document), and looks at the results later.

Ideally, I would like to just fill some kind of observable collection that would be emptied by the system as fast as it can. When there is an item, process it and produce the expected output in another object, and when there is no item just do nothing. When the user checks the output collection, he will find the items that are already processed.

Ideally all items would be visible from the start and would have a state (completed, ongoing or in queue), but once I know how to do the first, I should be able to handle the states.

I'm not sure which object to use for that. Right now I'm looking at BlockingCollection, but I don't think it's suited to the job, as I can't fill it while it's being emptied from the other end.

       private BlockingCollection<IDocument> _jobs = new BlockingCollection<IDocument>();
       public ObservableCollection<IExtractedDocument> ExtractedDocuments { get; }

       public QueueService()
       {
           ExtractedDocuments = new ObservableCollection<IExtractedDocument>();
       }
       
       public async Task Add(string filePath, List<Extra> extras)
       {
           if (_jobs.IsAddingCompleted || _jobs.IsCompleted)
               _jobs = new BlockingCollection<IDocument>();
     
           var doc = new Document(filePath, extras);
           _jobs.Add(doc);
           _jobs.CompleteAdding();
           
           await ProcessQueue();
       }

       private async Task ProcessQueue()
       {
           foreach (var document in _jobs.GetConsumingEnumerable(CancellationToken.None))
           {
               var resultDocument = await service.ProcessDocument(document);
               ExtractedDocuments.Add(resultDocument );
               Debug.WriteLine("Job completed");
           }
       }

This is how I'm handling it right now. If I remove the CompleteAdding call, it hangs on the second attempt. If I keep that call, I can't just fill the queue; I have to empty it first, which defeats the purpose.

Is there a way of having what I'm trying to achieve? A collection that I would fill and the system would process asynchronously and autonomously?

To summarize, I need:

  • A collection that I can fill, that would be processed gradually and asynchronously. A document or a series of documents can be added while others are being processed.
  • An output collection that would be filled as each document finishes processing
  • The UI thread and app to still be responsive while everything is running
  • I have no preference between processing several documents in parallel and processing one document at a time. Whichever is easiest to put in place and maintain will do (small scale application). I'm assuming one at a time is simpler.


1 Reply


A common pattern here is to have a callback method that executes whenever a document's state changes. With a background task running, the processor will chew through documents as fast as it can. Call Dispose to shut down the processor.

If you need to process the callback on a GUI thread, you'll need to synchronize the callback to your main thread somehow. Windows Forms has methods for this (Control.Invoke and Control.BeginInvoke) if that's what you are using.
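
One framework-neutral way to do that synchronization, sketched under the assumption that the helper is created on the UI thread, is to capture `SynchronizationContext.Current` at construction time and post each callback through it. `UiCallbackMarshaller` is a hypothetical helper name used only for illustration; in a console app there is no context to capture, so the sketch falls back to a direct call.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of the original answer): forwards actions to the
// thread that created it, when that thread has a SynchronizationContext.
public sealed class UiCallbackMarshaller
{
    private readonly SynchronizationContext context;

    public UiCallbackMarshaller()
    {
        // On a Windows Forms or WPF UI thread this is the UI context;
        // in a plain console app it is null.
        context = SynchronizationContext.Current;
    }

    public void Invoke(Action action)
    {
        if (action is null) throw new ArgumentNullException(nameof(action));

        if (context is null)
        {
            action(); // no context was captured: run on the calling thread
        }
        else
        {
            context.Post(_ => action(), null); // queue onto the captured context
        }
    }
}
```

Inside the callback you could then wrap the UI update, for example `marshaller.Invoke(() => ExtractedDocuments.Add(doc));`, so that the `ObservableCollection` from the question is only touched on the UI thread.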

The example program below implements all the necessary classes and interfaces; fine-tune and tweak it as you need.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    class Program
    {
        private static Task Callback(IExtractedDocument doc, DocumentProcessor.DocState docState)
        {
            Console.WriteLine("Processing doc {0}, state: {1}", doc, docState);
            return Task.CompletedTask;
        }

        public static void Main()
        {
            using DocumentProcessor docProcessor = new DocumentProcessor(Callback);
            Console.WriteLine("Processor started, press any key to end processing");
            for (int i = 0; i < 100; i++)
            {
                if (Console.KeyAvailable)
                {
                    break;
                }
                else if (i == 5)
                {
                    // make an error
                    docProcessor.Add(null);
                }
                else
                {
                    docProcessor.Add(new Document { Text = "Test text " + Guid.NewGuid().ToString() });
                }
                Thread.Sleep(500);
            }
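            // Shut down explicitly so the message below is accurate; the later Dispose from the using declaration is a no-op.
            docProcessor.Dispose();
            Console.WriteLine("Doc processor shut down, press ENTER to quit");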
            Console.ReadLine();
        }

        public interface IDocument
        {
            public string Text { get; }
        }

        public class Document : IDocument
        {
            public string Text { get; set; }
        }

        public interface IExtractedDocument : IDocument
        {
            public IDocument OriginalDocument { get; }
            public Exception Error { get; }
        }

        public class ExtractedDocument : IExtractedDocument
        {
            public override string ToString()
            {
                return $"Orig text: {OriginalDocument?.Text}, Extracted Text: {Text}, Error: {Error}";
            }

            public IDocument OriginalDocument { get; set; }

            public string Text { get; set; }

            public Exception Error { get; set; }
        }

        public class DocumentProcessor : IDisposable
        {
            public enum DocState { Processing, Completed, Error }

            private readonly BlockingCollection<IDocument> queue = new BlockingCollection<IDocument>();
            private readonly Func<IExtractedDocument, DocState, Task> callback;
            private CancellationTokenSource cancelToken = new CancellationTokenSource();

            public DocumentProcessor(Func<IExtractedDocument, DocState, Task> callback)
            {
                this.callback = callback;
                // Fire and forget: start the consumer loop on a thread-pool thread.
                _ = Task.Run(() => StartQueueProcessor());
            }

            public void Dispose()
            {
                if (!cancelToken.IsCancellationRequested)
                {
                    cancelToken.Cancel();
                }
            }

            public void Add(IDocument doc)
            {
                if (cancelToken.IsCancellationRequested)
                {
                    throw new InvalidOperationException("Processor is disposed");
                }
                queue.Add(doc);
            }

            private void ProcessDocument(IDocument doc)
            {
                try
                {
                    // do processing
                    DoCallback(new ExtractedDocument { OriginalDocument = doc }, DocState.Processing);
                    if (doc is null)
                    {
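                        // The single-string constructor takes a parameter name, so pass the name and the message separately.
                        throw new ArgumentNullException(nameof(doc), "Document to process was null");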
                    }
                    IExtractedDocument successExtractedDocument = DoSomeDocumentProcessing(doc);
                    DoCallback(successExtractedDocument, DocState.Completed);
                }
                catch (Exception ex)
                {
                    DoCallback(new ExtractedDocument { OriginalDocument = doc, Error = ex }, DocState.Error);
                }
            }

            private IExtractedDocument DoSomeDocumentProcessing(IDocument originalDocument)
            {
                return new ExtractedDocument { OriginalDocument = originalDocument, Text = "Extracted: " + originalDocument.Text };
            }

            private void DoCallback(IExtractedDocument result, DocState docState)
            {
                if (callback != null)
                {
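                    // Fire and forget: the callback starts on this thread and its task is not awaited,
                    // so exceptions it throws after its first await go unobserved.
                    _ = callback(result, docState);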
                }
            }

            private void StartQueueProcessor()
            {
                try
                {
                    while (!cancelToken.Token.IsCancellationRequested)
                    {
                        if (queue.TryTake(out IDocument doc, 1000, cancelToken.Token))
                        {
                            // can change to _ = Task.Run(() => ProcessDocument(doc)) for parallel execution
                            ProcessDocument(doc);
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // ignore, don't need to throw or worry about this
                }
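                // After cancellation, report every document still in the queue as failed.
                while (queue.TryTake(out IDocument doc))
                {
                    DoCallback(new ExtractedDocument { OriginalDocument = doc, Error = new ObjectDisposedException(nameof(DocumentProcessor), "Processor was disposed") }, DocState.Error);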
                }
            }
        }
    }
}
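
The question's `service.ProcessDocument` is awaited, which the synchronous loop above would have to block on. A minimal sketch of the same single-consumer FIFO idea with an asynchronous worker follows. It swaps `BlockingCollection<T>` for `System.Threading.Channels.Channel<T>`, which is a different technique from the one in the program above and assumes .NET Core 3.0 or later. `AsyncJobQueue`, `processAsync` and `onCompleted` are hypothetical names chosen for illustration.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: a FIFO queue with one background worker that awaits each job.
public sealed class AsyncJobQueue<TIn, TOut> : IAsyncDisposable
{
    private readonly Channel<TIn> channel = Channel.CreateUnbounded<TIn>(
        new UnboundedChannelOptions { SingleReader = true });
    private readonly Func<TIn, Task<TOut>> processAsync;        // assumed: the slow per-item work
    private readonly Action<TIn, TOut, Exception> onCompleted;  // assumed: result or error callback
    private readonly Task worker;

    public AsyncJobQueue(Func<TIn, Task<TOut>> processAsync, Action<TIn, TOut, Exception> onCompleted)
    {
        this.processAsync = processAsync ?? throw new ArgumentNullException(nameof(processAsync));
        this.onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted));
        worker = Task.Run(() => ConsumeAsync());
    }

    // Returns immediately; returns false once the queue has been disposed.
    public bool Enqueue(TIn item) => channel.Writer.TryWrite(item);

    private async Task ConsumeAsync()
    {
        // Items are read one at a time, in the order they were written.
        await foreach (TIn item in channel.Reader.ReadAllAsync())
        {
            TOut result = default;
            Exception error = null;
            try
            {
                result = await processAsync(item);
            }
            catch (Exception ex)
            {
                error = ex; // one failed item does not stop the loop
            }
            onCompleted(item, result, error);
        }
    }

    // Refuses new items, then waits for the items already queued to finish.
    public async ValueTask DisposeAsync()
    {
        channel.Writer.TryComplete();
        await worker;
    }
}
```

With the question's types this might be wired up as `new AsyncJobQueue<IDocument, IExtractedDocument>(service.ProcessDocument, (doc, result, error) => { ... })`. The `onCompleted` callback runs on a thread-pool thread, so any UI update inside it still has to be marshalled to the UI thread.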
