Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


c# - TPL Dataflow Speedup?

I wonder whether the following code can be optimized to execute faster. I currently seem to max out at around 1.4 million simple messages per second on a pretty simple dataflow structure. I am aware that this sample passes and transforms messages synchronously; I am testing TPL Dataflow as a possible replacement for my own custom solution based on Tasks and concurrent collections. I know the term "concurrent" suggests that I run things in parallel, but for testing purposes I pushed messages through my own solution synchronously, and there I get about 5.1 million messages per second. What am I missing here? I read that TPL Dataflow is promoted as a high-throughput, low-latency solution, so I must be overlooking some performance tweaks. Could anyone point me in the right direction, please?

using System;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

class TPLDataFlowExperiments
{
    public TPLDataFlowExperiments()
    {
        var buf1 = new BufferBlock<int>();

        var transform = new TransformBlock<int, string>(t =>
            {
                return "";
            });

        var action = new ActionBlock<string>(s =>
            {
                //Thread.Sleep(100);
                //Console.WriteLine(s);
            });

        // Link the blocks and propagate completion down the flow
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        buf1.LinkTo(transform, linkOptions);
        transform.LinkTo(action, linkOptions);

        Stopwatch watch = new Stopwatch();
        watch.Start();

        int cap = 10000000;
        for (int i = 0; i < cap; i++)
        {
            buf1.Post(i);
        }

        //Mark Buffer as Complete
        buf1.Complete();

        action.Completion.ContinueWith(t =>
            {
                watch.Stop();

                Console.WriteLine("All Blocks finished processing");
                // Divide by fractional seconds to avoid integer-division truncation
                Console.WriteLine("Units processed per second: " + cap / watch.Elapsed.TotalSeconds);
            });

        Console.ReadLine();
    }
}

1 Reply


I think this mostly comes down to one thing: your test is pretty much meaningless. All those blocks are supposed to do something, and use multiple cores and asynchronous operations to do that.

Also, in your test, it's likely that a lot of time is spent on synchronization. With more realistic code, each block will take some time to process a message, so there will be less contention and the actual overhead will be smaller than what you measured.
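
To make the point about realistic work more concrete, here is a rough sketch of a block that does some CPU-bound work per message and is allowed to use all cores. The names RealisticWorkloadSketch, SumOfSquares and RunAsync are made up for illustration, and the workload is just a stand-in for whatever your real blocks would do. It assumes a runtime where System.Threading.Tasks.Dataflow is available (on .NET Framework it comes as a NuGet package).

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class RealisticWorkloadSketch
{
    // Hypothetical stand-in for real per-message work (CPU-bound here).
    static long SumOfSquares(int n)
    {
        long sum = 0;
        for (int i = 1; i <= n; i++) sum += (long)i * i;
        return sum;
    }

    // Pushes `count` messages through a parallel TransformBlock and
    // returns how many of them reached the final ActionBlock.
    public static async Task<int> RunAsync(int count)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            // Allow the block to process several messages at once.
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        var transform = new TransformBlock<int, long>(n => SumOfSquares(n % 1000), options);

        int processed = 0;
        // This ActionBlock keeps the default MaxDegreeOfParallelism of 1,
        // so the plain increment is not racing with itself.
        var action = new ActionBlock<long>(_ => processed++);

        transform.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++) transform.Post(i);
        transform.Complete();

        await action.Completion;
        return processed;
    }
}
```

With work like this in the blocks, the per-message cost of passing data between them becomes a smaller share of the total time, which is why a benchmark with empty delegates mostly measures the plumbing.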

But to actually answer your question: yes, you're overlooking some performance tweaks. Specifically, SingleProducerConstrained, a property of ExecutionDataflowBlockOptions that tells a block it will only ever be used by one producer at a time, which means data structures with less locking can be used. If I use this on both blocks (the BufferBlock is completely useless here, you can safely remove it), the rate rises from about 3–4 million items per second to more than 5 million on my computer.
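
A minimal sketch of what that change could look like, assuming the same empty delegates as in the question. The class and method names (SingleProducerSketch, Run) are made up for illustration, and completion is propagated through DataflowLinkOptions rather than the manual ContinueWith chain. The block waits synchronously on completion only to keep the sketch short.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

static class SingleProducerSketch
{
    // Runs the benchmark without the BufferBlock and returns messages per second.
    public static double Run(int cap)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            // Promise that each block is only ever used by one producer at a time.
            SingleProducerConstrained = true
        };

        // Same empty delegates as in the question, with the options applied to both blocks.
        var transform = new TransformBlock<int, string>(t => "", options);
        var action = new ActionBlock<string>(s => { }, options);

        transform.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

        var watch = Stopwatch.StartNew();

        // Only this loop posts to the first block, so the single-producer promise holds.
        for (int i = 0; i < cap; i++) transform.Post(i);

        transform.Complete();
        action.Completion.Wait();
        watch.Stop();

        return cap / watch.Elapsed.TotalSeconds;
    }
}
```

Calling SingleProducerSketch.Run(10000000) would repeat the measurement from the question. The numbers will vary by machine, and the option is only safe as long as no second thread posts to the same block.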

