Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
800 views
in Technique[技术] by (71.8m points)

wpf - Is there a Threadsafe Observable collection in .NET 4?

Platform: WPF, .NET 4.0, C# 4.0

Problem: In the Mainwindow.xaml i have a ListBox bound to a Customer collection which is currently an ObservableCollection< Customer >.

ObservableCollection<Customer> c = new ObservableCollection<Customer>();

This collection can be updated via multiple sources, like FileSystem, WebService etc.

To allow parallel loading of Customers I have created a helper class

public class CustomerManager(ref ObsevableCollection<Customer> cust)

that internally spawns a new Task (from Parallel extensions library) for each customer Source and adds a new Customer instance to the customer collection object (passed by ref to its ctor).

The problem is that ObservableCollection< T> (or any collection for that matter) cannot be used from calls other than the UI thread and an exception is encountered:

"NotSupportedException – This type of CollectionView does not support changes to its SourceCollection from a thread different from the Dispatcher thread."

I tried using the

System.Collections.Concurrent.ConcurrentBag<Customer>

collection but it doesnot implement INotifyCollectionChanged interface. Hence my WPF UI won't get updated automatically.

So, is there a collection class that implements both property/collection change notifications and also allows calls from other non-UI threads?

By my initial bing/googling, there is none provided out of the box.

Edit: I created my own collection that inherits from ConcurrentBag< Customer > and also implements the INotifyCollectionChanged interface. But to my surprise even after invoking it in separate tasks, the WPF UI hangs until the task is completed. Aren't the tasks supposed to be executed in parallel and not block the UI thread?

Thanks for any suggestions, in advance.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

There are two possible approaches. The first would be to inherit from a concurrent collection and add INotifyCollectionChanged functionality, and the second would be to inherit from a collection that implements INotifyCollectionChanged and add concurrency support. I think it is far easier and safer to add INotifyCollectionChanged support to a concurrent collection. My suggestion is below.

It looks long but most of the methods just call the internal concurrent collection as if the caller were using it directly. The handful of methods that add or remove from the collection inject a call to a private method that raises the notification event on the dispatcher provided at construction, thus allowing the class to be thread safe but ensuring the notifications are raised on the same thread all the time.

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Runtime.InteropServices;
using System.Security.Permissions;
using System.Windows.Threading;

namespace Collections
{
    /// <summary>
    /// Concurrent collection that emits change notifications on a dispatcher thread
    /// </summary>
    /// <typeparam name="T">The type of objects in the collection</typeparam>
    [Serializable]
    [ComVisible(false)]
    [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
    public class ObservableConcurrentBag<T> : IProducerConsumerCollection<T>,
        IEnumerable<T>, ICollection, IEnumerable
    {
        /// <summary>
        /// The dispatcher on which event notifications will be raised
        /// </summary>
        private readonly Dispatcher dispatcher;

        /// <summary>
        /// The internal concurrent bag used for the 'heavy lifting' of the collection implementation
        /// </summary>
        private readonly ConcurrentBag<T> internalBag;

        /// <summary>
        /// Initializes a new instance of the ConcurrentBag<T> class that will raise <see cref="INotifyCollectionChanged"/> events
        /// on the specified dispatcher
        /// </summary>
        public ObservableConcurrentBag(Dispatcher dispatcher)
        {
            this.dispatcher = dispatcher;
            this.internalBag = new ConcurrentBag<T>();
        }

        /// <summary>
        /// Initializes a new instance of the ConcurrentBag<T> class that contains elements copied from the specified collection 
        /// that will raise <see cref="INotifyCollectionChanged"/> events on the specified dispatcher
        /// </summary>
        public ObservableConcurrentBag(Dispatcher dispatcher, IEnumerable<T> collection)
        {
            this.dispatcher = dispatcher;
            this.internalBag = new ConcurrentBag<T>(collection);
        }

        /// <summary>
        /// Occurs when the collection changes
        /// </summary>
        public event NotifyCollectionChangedEventHandler CollectionChanged;

        /// <summary>
        /// Raises the <see cref="CollectionChanged"/> event on the <see cref="dispatcher"/>
        /// </summary>
        private void RaiseCollectionChangedEventOnDispatcher(NotifyCollectionChangedEventArgs e)
        {
            this.dispatcher.BeginInvoke(new Action<NotifyCollectionChangedEventArgs>(this.RaiseCollectionChangedEvent), e);
        }

        /// <summary>
        /// Raises the <see cref="CollectionChanged"/> event
        /// </summary>
        /// <remarks>
        /// This method must only be raised on the dispatcher - use <see cref="RaiseCollectionChangedEventOnDispatcher" />
        /// to do this.
        /// </remarks>
        private void RaiseCollectionChangedEvent(NotifyCollectionChangedEventArgs e)
        {
            this.CollectionChanged(this, e);
        }

        #region Members that pass through to the internal concurrent bag but also raise change notifications

        bool IProducerConsumerCollection<T>.TryAdd(T item)
        {
            bool result = ((IProducerConsumerCollection<T>)this.internalBag).TryAdd(item);
            if (result)
            {
                this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
            }
            return result;
        }

        public void Add(T item)
        {
            this.internalBag.Add(item);
            this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
        }

        public bool TryTake(out T item)
        {
            bool result = this.TryTake(out item);
            if (result)
            {
                this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove, item));
            }
            return result;
        }

        #endregion

        #region Members that pass through directly to the internal concurrent bag

        public int Count
        {
            get
            {
                return this.internalBag.Count;
            }
        }

        public bool IsEmpty
        {
            get
            {
                return this.internalBag.IsEmpty;
            }
        }

        bool ICollection.IsSynchronized
        {
            get
            {
                return ((ICollection)this.internalBag).IsSynchronized;
            }
        }

        object ICollection.SyncRoot
        {
            get
            {
                return ((ICollection)this.internalBag).SyncRoot;
            }
        }

        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            return ((IEnumerable<T>)this.internalBag).GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable)this.internalBag).GetEnumerator();
        }

        public T[] ToArray()
        {
            return this.internalBag.ToArray();
        }

        void IProducerConsumerCollection<T>.CopyTo(T[] array, int index)
        {
            ((IProducerConsumerCollection<T>)this.internalBag).CopyTo(array, index);
        }

        void ICollection.CopyTo(Array array, int index)
        {
            ((ICollection)this.internalBag).CopyTo(array, index);
        }

        #endregion
    }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...