Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

c# - Turning async socket Parallel and not only Concurrent in very intensive application using TPL

I'm writing an application that uses Socket, and it will be very intensive, so I really need to use every core we have in our big server. I saw the question "how to using ThreadPool to run socket thread parallel?" here on Stack Overflow; its only answer points to this MSDN sample.

But I think that sample only shows how to make it concurrent, not parallel. In "How cpu intensive is opening a socket" someone asks about the cost, and it looks to be very intensive. In "TPL TaskFactory.FromAsync vs Tasks with blocking methods" someone says it doesn't help, and in "Is there a pattern for wrapping existing BeginXXX/EndXXX async methods into async tasks?" someone shows how to do it with TaskFactory.FromAsync.

How can I keep socket operations parallel and performant? Socket problems such as disconnections, half-connected sockets, and message boundaries are already a headache in the normal async way. How should they be handled when TPL and Task are combined with sockets?

1 Reply

See this example, which chains accept, receive, and send with Task.Factory.FromAsync and ContinueWith:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace skttool
{
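    // State for one connection. Every accepted socket gets its own instance.
    public class StateObject
    {
        public Socket workSocket = null;
        public const int BufferSize = 1024;
        public byte[] buffer = new byte[BufferSize];
        public int bytesRead = 0;
        public StringBuilder sb = new StringBuilder();
    }

    public class tool
    {
        //-------------------------------------------------
        private ManualResetEvent evtConnectionDone = new ManualResetEvent(false);
        private Socket skttool = null;
        private volatile bool running = false;
        //-------------------------------------------------
        // toolConfig is not shown in this answer; it is assumed to expose
        // int addressPort and int maxQtdSockets.
        toolConfig _cfg;
        public tool(toolConfig cfg)
        {
            _cfg = cfg;
        }
        //-------------------------------------------------
        public void socketListeningSet()
        {
            // Assign the field directly: a local named skttool would shadow it
            // and leave the field null when start() uses it.
            IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, _cfg.addressPort);
            skttool = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            skttool.Bind(localEndPoint);
            skttool.Listen(_cfg.maxQtdSockets);
        }
        //-------------------------------------------------
        public void start()
        {
            running = true;
            Task T1 = Task.Factory.StartNew(socketListeningSet);
            // The accept loop blocks on the event, so ask for a dedicated thread
            // and run it only if the listener was set up successfully.
            T1.ContinueWith(prev =>
            {
                while (running)
                {
                    evtConnectionDone.Reset();
                    // BeginAccept(callback, state) matches the method-group overload.
                    Task<Socket> acceptChunk = Task<Socket>.Factory.FromAsync(
                        skttool.BeginAccept,
                        skttool.EndAccept,
                        null);
                    acceptChunk.ContinueWith(t => accept(t));
                    evtConnectionDone.WaitOne();
                }
            }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.LongRunning);
        }
        //-------------------------------------------------
        void accept(Task<Socket> acceptChunk)
        {
            // Release the accept loop first, even on failure, so the next
            // connection can be accepted while this one is being served.
            evtConnectionDone.Set();
            if (acceptChunk.IsFaulted)
            {
                // Reading Exception marks it as observed.
                Console.Error.WriteLine(acceptChunk.Exception.GetBaseException().Message);
                return;
            }
            StateObject state = new StateObject();
            state.workSocket = acceptChunk.Result;
            receive(state);
        }
        //-------------------------------------------------
        void receive(StateObject state)
        {
            try
            {
                // BeginReceive takes four arguments before the callback, one more
                // than the FromAsync method-group overloads allow, so start the
                // operation here and pass the IAsyncResult to FromAsync.
                Task<int> readChunk = Task<int>.Factory.FromAsync(
                    state.workSocket.BeginReceive(
                        state.buffer,
                        state.bytesRead,
                        state.buffer.Length - state.bytesRead,
                        SocketFlags.None,
                        null,
                        null),
                    state.workSocket.EndReceive);
                readChunk.ContinueWith(t => read(t, state));
            }
            catch (SocketException) { close(state); }
        }
        //-------------------------------------------------
        void read(Task<int> readChunk, StateObject state)
        {
            if (readChunk.IsFaulted)
            {
                // Typically the peer disconnected in the middle of the exchange.
                Console.Error.WriteLine(readChunk.Exception.GetBaseException().Message);
                close(state);
                return;
            }
            state.bytesRead += readChunk.Result;
            // A result of 0 means the peer shut down its sending side. Otherwise
            // keep reading until the buffer is full.
            if (readChunk.Result > 0 && state.bytesRead < state.buffer.Length)
            {
                receive(state);
                return;
            }
            byte[] data = new byte[state.bytesRead];
            Buffer.BlockCopy(state.buffer, 0, data, 0, state.bytesRead);
            send(state, doTask(data), 0);
        }
        //-------------------------------------------------
        void send(StateObject state, byte[] data, int offset)
        {
            if (offset >= data.Length)
            {
                close(state); // everything was sent
                return;
            }
            try
            {
                // BeginSend has the same argument count problem as BeginReceive.
                Task<int> sendChunk = Task<int>.Factory.FromAsync(
                    state.workSocket.BeginSend(
                        data,
                        offset,
                        data.Length - offset,
                        SocketFlags.None,
                        null,
                        null),
                    state.workSocket.EndSend);
                sendChunk.ContinueWith(t => sent(t, state, data, offset));
            }
            catch (SocketException) { close(state); }
        }
        //-------------------------------------------------
        void sent(Task<int> sendChunk, StateObject state, byte[] data, int offset)
        {
            if (sendChunk.IsFaulted)
            {
                Console.Error.WriteLine(sendChunk.Exception.GetBaseException().Message);
                close(state);
                return;
            }
            // EndSend may report fewer bytes than requested; send the rest.
            send(state, data, offset + sendChunk.Result);
        }
        //-------------------------------------------------
        void close(StateObject state)
        {
            try { state.workSocket.Shutdown(SocketShutdown.Both); }
            catch (SocketException) { }         // peer is already gone
            catch (ObjectDisposedException) { } // socket was closed elsewhere
            state.workSocket.Close();
        }
        //-------------------------------------------------
        byte[] doTask(byte[] data)
        {
            // Array.Reverse works in place and returns void.
            Array.Reverse(data);
            return data;
        }
        //-------------------------------------------------
    }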
}
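
The question also asks about message boundaries, which the code above does not address: TCP delivers a byte stream, so one receive may return part of a message or several messages at once. One common approach is to prefix each message with its length. The following is only a minimal sketch under assumptions that are not in the original post: the 4-byte big-endian prefix, the `Framing` class, and the names `Frame`, `ReadMessage`, and `maxLength` are all hypothetical. It is written synchronously against a `Stream` to stay short; the same "loop until the expected count has arrived" logic would apply inside the receive continuations.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of the original answer.
public static class Framing
{
    // Prefix the payload with its length as a 4-byte big-endian integer
    // (an assumed wire format).
    public static byte[] Frame(byte[] payload)
    {
        byte[] framed = new byte[4 + payload.Length];
        framed[0] = (byte)(payload.Length >> 24);
        framed[1] = (byte)(payload.Length >> 16);
        framed[2] = (byte)(payload.Length >> 8);
        framed[3] = (byte)payload.Length;
        Buffer.BlockCopy(payload, 0, framed, 4, payload.Length);
        return framed;
    }

    // Read exactly count bytes, or return null if the stream ends first.
    static byte[] ReadExactly(Stream stream, int count)
    {
        byte[] buffer = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            int n = stream.Read(buffer, offset, count - offset);
            if (n == 0) return null; // peer closed before the message was complete
            offset += n;
        }
        return buffer;
    }

    // Read one framed message. Returns null when the stream ends, and rejects
    // length prefixes above maxLength so a bad peer cannot force a huge allocation.
    public static byte[] ReadMessage(Stream stream, int maxLength)
    {
        byte[] header = ReadExactly(stream, 4);
        if (header == null) return null;
        int length = (header[0] << 24) | (header[1] << 16) | (header[2] << 8) | header[3];
        if (length < 0 || length > maxLength)
            throw new InvalidDataException("Frame length out of range: " + length);
        return ReadExactly(stream, length);
    }
}
```

With a connected socket, the stream could be a `NetworkStream` created from it, and `ReadMessage` would be called in a loop until it returns null. A null result in the middle of a message is also one way to notice a disconnected peer.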
