Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
308 views
in Technique[技术] by (71.8m points)

c# - How to work threading with ConcurrentQueue<T>

I am trying to figure out what the best way of working with a queue will be. I have a process that returns a DataTable. Each DataTable, in turn, is merged with the previous DataTable. There is one problem, too many records to hold until the final BulkCopy (OutOfMemory).

So, I have determined that I should process each incoming DataTable immediately. Thinking about the ConcurrentQueue<T>...but I don't see how the WriteQueuedData() method would know to dequeue a table and write it to the database.

For instance:

public class TableTransporter
{
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();

    public TableTransporter()
    {
        tableQueue.OnItemQueued += new EventHandler(WriteQueuedData);   // no events available
    }

    public void ExtractData()
    {
        DataTable table;

        // perform data extraction
        tableQueue.Enqueue(table);
    }

    private void WriteQueuedData(object sender, EventArgs e)
    {
        BulkCopy(e.Table);
    }
}

My first question is, aside from the fact that I don't actually have any events to subscribe to, if I call ExtractData() asynchronously will this be all that I need? Second, is there something I'm missing about the way ConcurrentQueue<T> functions and needing some form of trigger to work asynchronously with the queued objects?

Update I have just derived a class from ConcurrentQueue<T> that has an OnItemQueued event handler. Then:

new public void Enqueue (DataTable Table)
{
    base.Enqueue(Table);
    OnTableQueued(new TableQueuedEventArgs(Table));
}

public void OnTableQueued(TableQueuedEventArgs table)
{
    EventHandler<TableQueuedEventArgs> handler = TableQueued;

    if (handler != null)
    {
        handler(this, table);
    }
}

Any concerns about this implementation?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

From my understanding of the problem, you are missing a few things.

The concurrent queue is a data structure designed to accept multiple threads reading and writing to the queue without you needing to explicitly lock the data structure. (All that jazz is taken care of behind the scenes, or the collection is implemented in such a way that it doesn't need to take a lock.)

With that in mind, it looks like the pattern you are trying to use is the "Produce/Consumer". First, you have some tasks producing work (and adding items to the queue). And second you have a second task Consuming things from the queue (and dequeing items).

So really you want two threads: one adding items and a second removing items. Because you are using a concurrent collection, you can have multiple threads adding items and multiple threads removing items. But obviously the more contention you have on the concurrent queue the quicker that will become the bottleneck.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...