Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
247 views
in Technique[技术] by (71.8m points)

c# - Generic class for performing mass-parallel queries. Feedback?

I don't understand why, but there appears to be no mechanism in the client library for performing many queries in parallel for Windows Azure Table Storage. I've created a template class that can be used to save considerable time, and you're welcome to use it however you wish. I would appreciate however, if you could pick it apart, and provide feedback on how to improve this class.

public class AsyncDataQuery<T> where T: new()
{
    public AsyncDataQuery(bool preserve_order)
    {
        m_preserve_order = preserve_order;
        this.Queries = new List<CloudTableQuery<T>>(1000);
    }

    public void AddQuery(IQueryable<T> query)
    {
        var data_query = (DataServiceQuery<T>)query;
        var uri = data_query.RequestUri; // required

        this.Queries.Add(new CloudTableQuery<T>(data_query));
    }

    /// <summary>
    /// Blocking but still optimized.
    /// </summary>
    public List<T> Execute()
    {
        this.BeginAsync();
        return this.EndAsync();
    }

    public void BeginAsync()
    {
        if (m_preserve_order == true)
        {
            this.Items = new List<T>(Queries.Count);
            for (var i = 0; i < Queries.Count; i++)
            {
                this.Items.Add(new T());
            }
        }
        else
        {
            this.Items = new List<T>(Queries.Count * 2);
        }

        m_wait = new ManualResetEvent(false);

        for (var i = 0; i < Queries.Count; i++)
        {
            var query = Queries[i];
            query.BeginExecuteSegmented(callback, i);
        }
    }

    public List<T> EndAsync()
    {
        m_wait.WaitOne();
        m_wait.Dispose();

        return this.Items;
    }

    private List<T> Items { get; set; }
    private List<CloudTableQuery<T>> Queries { get; set; }

    private bool m_preserve_order;
    private ManualResetEvent m_wait;
    private int m_completed = 0;
    private object m_lock = new object();

    private void callback(IAsyncResult ar)
    {
        int i = (int)ar.AsyncState;
        CloudTableQuery<T> query = Queries[i];
        var response = query.EndExecuteSegmented(ar);
        if (m_preserve_order == true)
        { // preserve ordering only supports one result per query
            lock (m_lock)
            {
                this.Items[i] = response.Results.Single();
            }
        }
        else
        { // add any number of items
            lock (m_lock)
            {
                this.Items.AddRange(response.Results);
            }
        }
        if (response.HasMoreResults == true)
        { // more data to pull
            query.BeginExecuteSegmented(response.ContinuationToken, callback, i);
            return;
        }
        m_completed = Interlocked.Increment(ref m_completed);
        if (m_completed == Queries.Count)
        {
            m_wait.Set();
        }
    }
}
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Guess I'm late to the party. I would add two things:

  1. ManualResetEvent is IDisposable. So you need to make sure it gets disposed somewhere.
  2. Error handling - if one of the queries fails it'll probably fail the whole thing. You should probably retry failed requests. Alternatively you could return the values you did get back with some indication of which queries failed, so that the caller could retry the queries.
  3. Client side timeouts - there are none. This isn't a problem if the server side times out for you, but if that ever fails (eg, network issues) the client will hang forever.

Also, I think this is actually a better approach that the Task Parallel Library. I tried the Task-per-query approach before this. The code was actually more awkward, and it tended to result in having a lot of active threads. I still haven't tested extensively with your code, but it seems to work better on first blush.

Update

I've put some work into a more-or-less rewrite of the code above. My rewrite removes all locking, supports client-side timeouts of hung transactions (rare, but it does happen, and can really ruin your day), and some exception handling logic. There is a full solution with tests up on Bitbucket. The most relevant code lives in one file, though it does require some helpers that are in other parts of the project.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...