Stack overflow caused by asynchronous cyclic collection of. NET TCP/IP Socket

We know NET TCP/IP Socket asynchronous circular charging official and most examples are circular charging, but in fact, this method has the problem of stack overflow.

MD, after receiving the Stack Overflow program, it will collapse directly. At present, there are several ways to solve it. Stack Overflow is very wild compared with the several ways proposed by foreigners. Let me first talk about the advantages of one way proposed by foreigners. It has indeed been solved NET TCP/IP Socket has been collecting asynchronously and circularly, which will lead to the problem of Stack Overflow, but the IO throughput is not much up, and the burden on the CPU becomes very large.

Their proposed method is: when this collection is completed, the delivery pull takes the next collection and is initiated by ThreadPool. Although we know that ThreadPool is a thread implemented based on the IoCP completion port model, and its performance is very high, can it have high performance in front-line process delivery and execution? CPU thread context switching does not need hardware cost!

Especially in the case of large-scale concurrency, the burden on the CPU is a big one, so that's why I'm talking about this way. It's a little wild. If a single core garbage VPS server runs like this, we want to give full play to the program performance and maximize the IO throughput. So this method is absolutely, absolutely not feasible.

The reason for this problem is that NET TCP/IP Socket charging is not implemented by the current thread executing the BeginReceive function of the Socket, so the AsyncCallback must be triggered after the current thread calls and executes the BeginReceive function and returns. You know, don't you?

When the current thread is still executing BeginReceive, it will receive the callback, and the callback thread is still the current thread. However, we still need to continue the collection in the callback function, which leads to the problem of recursion. It's ok if it recurses several layers and then other threads complete the execution of EndReceive, The terrible thing is that in the case of a large amount of incoming data in a TCP/IP connection, the collection function will be executed recursively, and there is only one result: stack explosion and program crash.

The. NET Socket::BeginReceive function will first execute the WSARecv function to try to collect the BUFF to the application. If there is no error from the system PULL to BUFF, it will call back directly. Otherwise, wait for the system to notify the IOCP SOCKET RECV to complete, and the IOCP queue worker thread is calling back. That's the problem NET Core cross platform Linux also has the same problem. OK, what should we do at this time?

The method proposed by foreigners is mentioned above. Another method that is not very reliable is to judge the value of the IAsyncResult object passed in when the AsyncCallback callback is called CompletedSynchronously Attribute, and then decide whether to throw the next collection into the ThreadPool queue. In essence, it doesn't make much difference. There are still similar problems, but it will be better.

Then, I propose a method that is suitable for me. This method not only takes into account the performance, but also solves the problem of stack overflow of circular collection, that is, we still let the Socket collect circularly and recursively, but this is limited, not unlimited recursion. Otherwise, the stack will explode. If the recursion reaches a certain number of layers, it needs to be delegated to ThreadPool and other threads will perform the next asynchronous collection.

According to our current test definition, the number of recursive stack layers is better in layer 100. Windows/Linux can be used in general. Layer 100 recursion can basically do the same as the throughput generated by recursion all the time, and the burden on the CPU is not as huge as throwing it into the thread pool queue every time.

However, how many layers can the management stack enter? We can't manage each class / object separately, but need to manage on a unified class / object, because considering the generality, if some programs make a thread overlap many times, the collection and call stack sequence of TCP/IP Socket may still lead to the problem of stack overflow.

example:

        private readonly ThreadProtection server_thread_protection = new ThreadProtection();

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        private void PullServerListener(IAsyncResult ar) => this.server_thread_protection.Execute((_) =>
        {
            byte[] buffer = this.server_buffer;
            if (ar == null)
            {
                if (!SocketExtension.BeginReceive(this.server, buffer, 0, MSS, this.PullServerListener))
                {
                    this.Dispose();
                }
            }
            else
            {
                bool disposing = true;
                int count = SocketExtension.EndReceive(this.server, ar);
                if (count > 0)
                {
                    try
                    {
                        if (this.ProcessServerInput(buffer, 0, count))
                        {
                            disposing = false;
                        }
                    }
                    catch (Exception) { }
                }
                if (disposing)
                {
                    this.Dispose();
                }
            }
        });

ThreadProtection. Implementation of CS class

namespace My.Threading
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
#if NETCOREAPP
    using System.Runtime.CompilerServices;
#endif
    using System.Security;
    using System.Threading;

    public sealed class ThreadProtection : IDisposable
    {
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly ConcurrentDictionary<Thread, Context> _into_rrc = new ConcurrentDictionary<Thread, Context>();
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly Timer _into_rrc_timer = null;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly object _globalsync = new object();

        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly object _syncobj = new object();
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private Thread _into_thread = null;

        public const int MaxRecursiveLayers = 100;

        private sealed class Context
        {
            public int rrc = 0;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        static ThreadProtection()
        {
            _into_rrc_timer = new Timer();
            _into_rrc_timer.Interval = 1000;
            _into_rrc_timer.Tick += (sender, e) =>
            {
                foreach (var kv in _into_rrc)
                {
                    Thread thread = kv.Key;
                    if (!thread.IsAlive)
                    {
                        _into_rrc.TryRemove(thread, out Context context);
                    }
                }
            };
            _into_rrc_timer.Start();
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        public ThreadProtection() : this(MaxRecursiveLayers)
        {

        }

        ~ThreadProtection() => this.Dispose();

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        public ThreadProtection(int maxInto)
        {
            if (maxInto < MaxRecursiveLayers)
            {
                maxInto = MaxRecursiveLayers;
            }
            this.MaximumInto = maxInto;
        }

        public event EventHandler<ThreadExceptionEventArgs> UnhandledException;

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private static Context GetContext()
        {
            lock (_globalsync)
            {
                Thread thread = Thread.CurrentThread;
                _into_rrc.TryGetValue(thread, out Context context);
                if (context == null)
                {
                    context = new Context();
                    _into_rrc[thread] = context;
                }
                return context;
            }
        }

        public int CurrentInto
        {
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => Interlocked.CompareExchange(ref GetContext().rrc, 0, 0);
        }

        public int MaximumInto
        {
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get;
        }

        public Thread IntoThread
        {
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => this._into_thread;
        }

        public Thread CurrentThread
        {
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => Thread.CurrentThread;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Execute(WaitCallback critical) => this.Execute(critical, null);

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        public void Execute(WaitCallback critical, object state)
        {
            if (critical == null)
            {
                throw new ArgumentNullException(nameof(critical));
            }
            bool can_into = false;
            Thread current_thread = Thread.CurrentThread;
            Context current_context = GetContext();
            lock (this._syncobj)
            {
                Thread into_thread = Interlocked.CompareExchange(ref this._into_thread, null, current_thread);
                if (into_thread != current_thread)
                {
                    Interlocked.Exchange(ref current_context.rrc, 0);
                }
                can_into = this.MaximumInto >= Interlocked.Increment(ref current_context.rrc);
                if (!can_into)
                {
                    Interlocked.Exchange(ref current_context.rrc, 0);
                }
            }
            if (can_into)
            {
                try
                {
                    critical(state);
                }
                catch (Exception e)
                {
                    this.OnUnhandledException(e);
                }
            }
            else
            {
                WaitCallback into_callback = (input_state) =>
                {
                    try
                    {
                        critical(input_state);
                    }
                    catch (Exception e)
                    {
                        this.OnUnhandledException(e);
                    }
                };
                ThreadPool.QueueUserWorkItem(into_callback, state);
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private void OnUnhandledException(Exception e)
        {
            if (e == null)
            {
                return;
            }
            ThreadExceptionEventArgs p = new ThreadExceptionEventArgs(e);
            try
            {
                this.UnhandledException?.Invoke(this, p);
            }
            catch (Exception) { }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Dispose()
        {
            Interlocked.Exchange(ref this.UnhandledException, null);
            GC.SuppressFinalize(this);
        }
    }
}

Keywords: .NET microsoft TCP/IP

Added by kanenas.net on Tue, 01 Feb 2022 21:22:05 +0200