[SuperSocket 2.0] SuperSocket 2.0 from beginning to end

SuperSocket 2.0 from entry to ignorance

With implementation, explanation, and partial source code interpretation

1 use SuperSocket 2.0 to build a Socket server in the AspNetCore project

1.1 introduction of SuperSocket 2.0

SuperSocket 2.0 is still in Beta, but its function is basically reliable and can be used in production environment

Can from Github source address Make other modifications to the fork code by yourself. If you need direct reference, it is not available in the default Nuget server. You need to add the author's own Nuget server https://www.myget.org/F/supersocket/api/v3/index.json , get preview version

1.2 build a Socket server in AspNetCore

Using SuperSocket 2.0 to quickly build a Socket server is very simple. In the framework, the author implements a supersocket host builder for construction. All we need to do is simply configure some parameters and business logic

In addition, supersocket hostbuilder can be directly embedded into the CreateHostBuilder of AspNetCore project, but it is not recommended to do so

  1. Configure packets and filters / selectors
    • Data packet, that is, the data packet during Socket communication, can be either a packet object or a byte []. If it is a packet object, the decoder needs to be configured in the filter
    • The function of filter is to filter the data packet, then intercept a packet of data, mount the decoder, and directly map the binary data into objects
    • Selector, we can use selector to realize that a port serves multiple protocols. At this time, a selector will be used with multiple filters
  2. Configure IP and ports
    • There are two ways to configure IP and port. One is to write from the program and the other is to write from the configuration file
    • In the introduction of this series, we mostly use the method of program writing and configuration file writing. Later, we will use other methods to expand the business
  3. Configure Session
    • In the program, the author has built-in IAppSession and AppSession for us to use. If we need to customize the Session, we need to inherit AppSession and reference it in the program to switch to the Session defined by us

    • When customizing a Session, since most of the parameters provided in the program are IAppSession, more other interfaces of SuperSocket need to be rewritten to maintain the operation of the program

    • Custom Session is mostly used to add more custom attributes. The author provides us with other options in the design:

      // AppSession source code
      public class AppSession : IAppSession, ILogger, ILoggerAccessor
      {
          //......
      
          private Dictionary<object, object> _items;
      
          public object this[object name]
          {
              get
              {
                  var items = _items;
      
                  if (items == null)
                      return null;
      
                  object value;
                  
                  if (items.TryGetValue(name, out value))
                      return value;
      
                  return null;
              }
      
              set
              {
                  lock (this)
                  {
                      var items = _items;
      
                      if (items == null)
                          items = _items = new Dictionary<object, object>();
      
                      items[name] = value;
                  }
              }
          }
      
          //......
      }
      

      You can see that there is a built-in dictionary. We can directly store the key value value of the attribute in the dictionary, that is, session["prop"] = value; The form of

  4. Configure SessionHandler
    • The SessionHandler will be triggered when the Socket connection is established and disconnected. It is used to handle the business during connection and disconnection. It can be directly configured in supersocket hostbuilder or implemented by rewriting the corresponding interface
    • When connecting, the created Session will be passed into this method
    • When disconnecting, the disconnected Session and the event triggering the disconnection will be passed into the method
  5. Configure PackageHandler
    • PackageHandler is triggered when a packet is received
    • When this method is triggered automatically, we will get two parameters, one is Session and the other is Package However, it should be noted that the Session here is IAppSession type, not our customized Session
      • Session refers to the current Socket connection with various connection information and status
      • Package is the data package obtained through the filter, but it should be noted that, for example, if the data package is a data package with header and footer identifiers, if it is in the form of byte [], it may not be the original package, but the data body after removing the header and footer identifiers That is: 7E 01 02 03 7E. Remove the 7E mark at the head and tail to get 01 02 03
  6. Build & Run
    • There are several ways to build. It is recommended to use buildaserver () and then enable it through StartAsync()

At this point, a Socket server is set up Specific implementation:

// SampleSession
public class SampleSession: AppSession
{
    
}
// Socket Server code
var tcpHost = SuperSocketHostBuilder.Create<byte[], PipelineFilter>()
    .ConfigureSuperSocket(options =>
    {
        options.AddListener(new ListenOptions
        {
            Ip = "Any",
            Port = 4040
        })
        .AddListener(new ListenOptions()
        {
            Ip = "Any",
            Port = 8888
        });
    })
    .UseSession<JT808TcpSession>()
    .UseClearIdleSession()
    .UseSessionHandler(s =>
    {
    })
    .UsePackageHandler(async (s, p) =>
    {
        //Unpacking / answering / forwarding
    })
    .ConfigureErrorHandler((s, v) =>
    {
    })
    .UseMiddleware<InProcSessionContainerMiddleware>()
    .UseInProcSessionContainer()
    .BuildAsServer();
    
await tcpHost.RunAsync();

. UseClearIdleSession() must be called. When using various Socket frameworks, it is inevitable that our application will maintain a large number of zombie connections. UseClearIdleSession() is provided in SuperSocket to automatically reuse idle or disconnected resources

2 basic protocol concepts

2.1 types of basic agreements

2.1.1 fixed header format protocol

As the name suggests, the Header of this kind of protocol is fixed, and the length of the Header is generally fixed, but there are exceptions. In addition, the Header will also contain information such as the length of the data body Then a packet of data can be intercepted according to the length

2.1.2 fixed head and tail identification protocol

The first few bytes and the last few bytes of data packets of this kind of protocol are fixed, so a packet of data can be intercepted through the identification of the head and tail

Generally, in order to avoid the conflict between the data content and the identification of the protocol header and tail, the data packet of this kind of protocol usually sets escape, that is, the place where the header and tail identification appears in the data packet is escaped to other data, so as to avoid errors in identification

2.1.3 fixed packet size protocol

The size of each packet of this kind of protocol is fixed, so it can be read directly according to the length

2.1.4 command line protocol

This type of protocol usually ends with \ r\n and uses string to binary stream for transmission

2.1.5 some other protocols

PS: the private agreements of some hardware manufacturers about the agreement are quite wonderful. Their agreements are diverse... But we won't elaborate here. I'll talk about it again when I have time

3 several basic concepts in supersocket

3.1 Package Type

Package Type refers to the Package Type. What is described here is the structure of the data package. For example, SuperSocket provides some basic package types, such as TextPackageInfo

public class TextPackageInfo
{
    public string Text{get; set;}
}

TextPackageInfo here indicates that this type of packet contains only one string. Of course, we usually have a more complex network packet structure For example, I will show a communication packet containing header and trailer identifiers in the following, which includes header and trailer identifiers, message numbers, terminal IDs, and message bodies:

public class SamplePackage
{
    public byte Begin{get; set;}

    public MessageId MessageId{get; set;}

    public string TerminalId{get; set;}

    public SampleBody Body{get; set;}

    public byte End{get; set;}
}

Of course, some interfaces are also provided in SuperSocket for us to implement some packages with similar formats, but I don't like this method very much. The official documents also give some examples. For example, some packages will have a special field to represent the content type of this package We name this field "Key" This field also tells us what logic to use to handle this type of package This is a very common design in network applications For example, your Key field is of integer type, and your package type needs to implement the interface IKeyedPackageInfo:

public class MyPackage : IKeyedPackageInfo<int>
{
    public int Key { get; set; }

    public short Sequence { get; set; }

    public string Body { get; set; }
}

3.2 PipelineFilter Type

This type plays an important role in network protocol parsing It defines how to decode the IO data stream into packets that can be understood by the application In other words, your binary stream data can be identified Package by Package, and can be parsed into the Package object you build Of course, you can also choose not to build and then return the source data directly

These are the basic interfaces of PipelineFilter Your system needs at least one PipelineFilter type that implements this interface

public interface IPipelineFilter
{
    void Reset();

    object Context { get; set; }        
}

public interface IPipelineFilter<TPackageInfo> : IPipelineFilter
    where TPackageInfo : class
{

    IPackageDecoder<TPackageInfo> Decoder { get; set; }

    TPackageInfo Filter(ref SequenceReader<byte> reader);

    IPipelineFilter<TPackageInfo> NextFilter { get; }

}

In fact, SuperSocket has provided some built-in PipelineFilter templates, which can cover almost 90% of the scenes, greatly simplifying your development work So you don't need to implement PipelineFilter completely from scratch Even if these built-in templates can't meet your needs, it's not difficult to implement PipelineFilter completely

3.3 create supersockets using PackageType and PipelineFilter Type

After you define the Package type and PipelineFilter type, you can use SuperSocket hostbuilder to create SuperSocket host.

var host = SuperSocketHostBuilder.Create<StringPackageInfo, CommandLinePipelineFilter>();

In some cases, you may need to implement the interface IPipelineFilterFactory to fully control the creation of PipelineFilter.

public class MyFilterFactory : PipelineFilterFactoryBase<TextPackageInfo>
{
    protected override IPipelineFilter<TPackageInfo> CreateCore(object client)
    {
        return new FixedSizePipelineFilter(10);
    }
}

Then enable the PipelineFilterFactory after the SuperSocket host is created:

var host = SuperSocketHostBuilder.Create<StringPackageInfo>();
host.UsePipelineFilterFactory<MyFilterFactory>();

4 PipelineFilter in supersocket to implement its own PipelineFilter

4.1 built in PipelineFilter template

Some PipelineFilter templates are built in SuperSocket, which can cover almost 90% of the application scenarios, greatly simplifying the development work, so it is not necessary to implement PipelineFilter completely from scratch Even if these built-in templates can't meet your needs, completely implement PipelineFilter

SuperSocket provides these PipelineFilter templates:

  • TerminatorPipelineFilter (SuperSocket.ProtoBase.TerminatorPipelineFilter, SuperSocket.ProtoBase)
  • TerminatorTextPipelineFilter (SuperSocket.ProtoBase.TerminatorTextPipelineFilter, SuperSocket.ProtoBase)
  • LinePipelineFilter (SuperSocket.ProtoBase.LinePipelineFilter, SuperSocket.ProtoBase)
  • CommandLinePipelineFilter (SuperSocket.ProtoBase.CommandLinePipelineFilter, SuperSocket.ProtoBase)
  • BeginEndMarkPipelineFilter (SuperSocket.ProtoBase.BeginEndMarkPipelineFilter, SuperSocket.ProtoBase)
  • FixedSizePipelineFilter (SuperSocket.ProtoBase.FixedSizePipelineFilter, SuperSocket.ProtoBase)
  • FixedHeaderPipelineFilter (SuperSocket.ProtoBase.FixedHeaderPipelineFilter, SuperSocket.ProtoBase)

4.2 PipelineFilter based on built-in template

4.2.1 FixedHeaderPipelineFilter - Protocol with fixed header format and content length

The first part defines the basic information including the length of the second part. We usually call the first part as the header

For example, we have a protocol that the header contains three bytes. The first byte is used to store the type of request, and the last two bytes are used to represent the length of the request body:

/// +-------+---+-------------------------------+
/// |request| l |                               |
/// | type  | e |    request body               |
/// |  (1)  | n |                               |
/// |       |(2)|                               |
/// +-------+---+-------------------------------+

According to the specification of this agreement, we can use the following code to define the type of package:

public class MyPackage
{
    public byte Key { get; set; }

    public string Body { get; set; }
}

The next step is to design PipelineFilter:

public class MyPipelineFilter : FixedHeaderPipelineFilter<MyPackage>
{
    public MyPipelineFilter()
        : base(3) // The size of the header is 3 bytes, so 3 is passed to the constructor of the base class
    {

    }

    // Returns the size of the packet body from the header of the packet
    protected override int GetBodyLengthFromHeader(ref ReadOnlySequence<byte> buffer)
    {
        var reader = new SequenceReader<byte>(buffer);
        reader.Advance(1); // skip the first byte
        reader.TryReadBigEndian(out short len);
        return len;
    }

    // Parse the packet into an instance of MyPackage
    protected override MyPackage DecodePackage(ref ReadOnlySequence<byte> buffer)
    {
        var package = new MyPackage();

        var reader = new SequenceReader<byte>(buffer);

        reader.TryRead(out byte packageKey);
        package.Key = packageKey;            
        reader.Advance(2); // skip the length             
        package.Body = reader.ReadString();

        return package;
    }
}

Finally, you can create a host by the type of packet and the type of PipelineFilter:

var host = SuperSocketHostBuilder.Create<MyPackage, MyPipelineFilter>()
    .UsePackageHandler(async (s, p) =>
    {
        // handle your package over here
    }).Build();

You can also get more flexibility by moving the code that parses the package from PipelineFilter to your package decoder:

public class MyPackageDecoder : IPackageDecoder<MyPackage>
{
    public MyPackage Decode(ref ReadOnlySequence<byte> buffer, object context)
    {
        var package = new MyPackage();

        var reader = new SequenceReader<byte>(buffer);

        reader.TryRead(out byte packageKey);
        package.Key = packageKey;            
        reader.Advance(2); // skip the length             
        package.Body = reader.ReadString();

        return package;
    }
}

Enable it in SuperSocket through the UsePackageDecoder method of host builder:

builder.UsePackageDecoder<MyPackageDecoder>();

4.2.3 another way to mount the parser

In ASP Net core application we can inject new() directly or adopt factory mode to inject the protocol parser into the Host, and then use it in the filter

public class MyPipelineFilter : FixedHeaderPipelineFilter<MyPackage>
{
    public readonly PacketConvert _packageConvert;
    public MyPipelineFilter()
        : base(3) // The size of the header is 3 bytes, so 3 is passed to the constructor of the base class
    {
        _packageConvert = new PackageConvert();
    }

    // Returns the size of the packet body from the header of the packet
    protected override int GetBodyLengthFromHeader(ref ReadOnlySequence<byte> buffer)
    {
        var reader = new SequenceReader<byte>(buffer);
        reader.Advance(1); // skip the first byte
        reader.TryReadBigEndian(out short len);
        return len;
    }

    // Parse the packet into an instance of MyPackage
    protected override MyPackage DecodePackage(ref ReadOnlySequence<byte> buffer)
    {
        var package = _packageConvert.Deserialize<Package>(buffer);

        return package;
    }
}

PS: the buffer returned by the DecodePackage in the filter may not be a complete package. For example, in a package with a fixed header and tail structure, the returned buffer may be in the format of removing the header and tail

For example, for a fixed header and tail packet 0x7E 0x7E XXXXXX 0x7E 0x7E, the header and tail 0x7E 0x7E in the returned buffer will be removed, leaving only the middle XXXXXX part, so attention should be paid to when implementing the decoder part

7. Extend AppSession and SuperSocketService

7.1 extending AppSession

In SuperSocket's management of Socket, SessionContainer provides a Session instance for everyone to get in the program, and it only needs to be called in the build. Usemiddleware < inprocsessioncontainermiddleware > () and UseInProcSessionContainer() can be accessed through appsession Server. Sessioncontainer() get

However, in order to facilitate management, it is better to implement another SessionManager for personal roles, which can be more easily integrated into our ASP Net core application Using concurrent dictionary Atomic Dictionary to store can avoid some deadlock problems in reading and writing

public class SessionManager<TSession> where TSession : IAppSession
{
    /// <summary>
    ///Stored Session
    /// </summary>
    public ConcurrentDictionary<string, TSession> Sessions { get; private set; } = new();

    /// <summary>
    ///Number of sessions
    /// </summary>
    public int Count => Sessions.Count;

    /// <summary>
    /// </summary>
    public SessionManager()
    {
    }

    public ConcurrentDictionary<string, TSession> GetAllSessions()
    {
        return Sessions;
    }

    /// <summary>
    ///Get a Session
    /// </summary>
    /// <param name="key"> </param>
    /// <returns> </returns>
    public virtual async Task<TSession> TryGet(string key)
    {
        return await Task.Run(() =>
        {
            Sessions.TryGetValue(key, out var session);
            return session;
        });
    }

    /// <summary>
    ///Add or update a Session
    /// </summary>
    /// <param name="key">     </param>
    /// <param name="session"> </param>
    /// <returns> </returns>
    public virtual async Task TryAddOrUpdate(string key, TSession session)
    {
        await Task.Run(() =>
        {
            if (Sessions.TryGetValue(key, out var oldSession))
            {
                Sessions.TryUpdate(key, session, oldSession);
            }
            else
            {
                Sessions.TryAdd(key, session);
            }
        });
    }

    /// <summary>
    ///Remove a Session
    /// </summary>
    /// <param name="key"> </param>
    /// <returns> </returns>
    public virtual async Task TryRemove(string key)
    {
        await Task.Run(() =>
        {
            if (Sessions.TryRemove(key, out var session))
            {
            }
            else
            {
            }
        });
    }

    /// <summary>
    ///Remove Session through Session
    /// </summary>
    /// <param name="sessionId"> </param>
    /// <returns> </returns>
    public virtual async Task TryRemoveBySessionId(string sessionId)
    {
        await Task.Run(() =>
        {
            foreach (var session in Sessions)
            {
                if (session.Value.SessionID == sessionId)
                {
                    Sessions.TryRemove(session);
                    return;
                }
            }
        });
    }

    /// <summary>
    ///Delete zombie link
    /// </summary>
    /// <returns> </returns>
    [Obsolete("This method is discarded", true)]
    public virtual async Task TryRemoveZombieSessions()
    {
        await Task.Run(() =>
        {
        });
    }

    /// <summary>
    ///Remove all sessions
    /// </summary>
    /// <returns> </returns>
    public virtual async Task TryRemoveAll()
    {
        await Task.Run(() =>
        {
            Sessions.Clear();
        });
    }

    /// <summary>
    /// </summary>
    /// <param name="session"> </param>
    /// <param name="buffer">  </param>
    /// <returns> </returns>
    public virtual async Task SendAsync(TSession session, ReadOnlyMemory<byte> buffer)
    {
        if (session == null)
        {
            throw new ArgumentNullException(nameof(session));
        }
        await session.SendAsync(buffer);
    }

    /// <summary>
    /// </summary>
    /// <param name="session"> </param>
    /// <param name="message"> </param>
    /// <returns> </returns>
    public virtual async Task SendAsync(ClientSession session, string message)
    {
        if (session == null)
        {
            throw new ArgumentNullException(nameof(session));
        }
        // ReSharper disable once PossibleNullReferenceException
        await session.SendAsync(message);
    }

    /// <summary>
    /// </summary>
    /// <param name="session"> </param>
    /// <returns> </returns>
    public virtual async Task<Guid> FindIdBySession(TSession session)
    {
        return await Task.Run(() =>
        {
            return Guid.Parse(Sessions.First(x => x.Value.SessionID.Equals(session.SessionID)).Key);
        });
    }
}

7.2 how to implement SuperSocketService by yourself?

We need to use SuperSocket in program CS, which will lead to a problem. In this way, our SuperSocket service will become difficult to control. Is there a way to extract this part of the code?

The answer is yes, we can use it The BackgroundService or IHostedService in Net Core implements the background services, and even manages these services, which can be created, started and stopped at any time as needed In addition, we can obtain dependency injection services at any time to do more operations, such as reading configuration, managing Session, configuring codec, log, transponder, MQ, etc

public class TcpSocketServerHostedService : IHostedService
{
    private readonly IOptions<ServerOption> _serverOptions;
    private readonly IOptions<KafkaOption> _kafkaOptions;
    private readonly ClientSessionManagers _clientSessionManager;
    private readonly TerminalSessionManager _gpsTrackerSessionManager;
    private readonly ILogger<TcpSocketServerHostedService> _logger;
    private readonly IGeneralRepository _generalRepository;
    private readonly NbazhGpsSerializer _nbazhGpsSerializer = new NbazhGpsSerializer();

    private static EV26MsgIdProducer _provider = null;

    /// <summary>
    ///Tcp Server service
    /// </summary>
    /// <param name="serverOptions">            </param>
    /// <param name="kafkaOptions">             </param>
    /// <param name="clientSessionManager">     </param>
    /// <param name="gpsTrackerSessionManager"> </param>
    /// <param name="logger">                   </param>
    /// <param name="factory">                  </param>
    public TcpSocketServerHostedService(
        IOptions<ServerOption> serverOptions,
        IOptions<KafkaOption> kafkaOptions,
        ClientSessionManagers clientSessionManager,
        TerminalSessionManager gpsTrackerSessionManager,
        ILogger<TcpSocketServerHostedService> logger,
        IServiceScopeFactory factory)
    {
        _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
        _kafkaOptions = kafkaOptions;
        _clientSessionManager = clientSessionManager ?? throw new ArgumentNullException(nameof(clientSessionManager));
        _gpsTrackerSessionManager = gpsTrackerSessionManager ?? throw new ArgumentNullException(nameof(gpsTrackerSessionManager));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _generalRepository = factory.CreateScope().ServiceProvider.GetRequiredService<IGeneralRepository>();
    }

    /// <summary>
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var host = SuperSocketHostBuilder.Create<NbazhGpsPackage, ProtocolPipelineSwitcher>()
            .ConfigureSuperSocket(opts =>
            {
                foreach (var listener in _serverOptions.Value.TcpListeners)
                {
                    opts.AddListener(new ListenOptions()
                    {
                        Ip = listener.Ip,
                        Port = listener.Port
                    });
                }
            })
            .UseSession<GpsTrackerSession>()
            .UseClearIdleSession()
            .UseSessionHandler(onClosed: async (s, v) =>
                {
                    try
                    {
                        // Manage Session
                        await _gpsTrackerSessionManager.TryRemoveBySessionId(s.SessionID);
                    }
                    catch
                    {
                        // ignored
                    }
                })
            .UsePackageHandler(async (s, packet) =>
            {
                // Processing package
            })
            .UseInProcSessionContainer()
            .BuildAsServer();

        await host.StartAsync();

        await Task.CompletedTask;
    }

    /// <summary>
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _gpsTrackerSessionManager.TryRemoveAll();
        }
        catch
        {
            // ignored
        }

        await Task.CompletedTask;
    }
}

After implementing the service, we can write extension methods to inject the service

/// <summary>
///Server extension
/// </summary>
public static class ServerBuilderExtensions
{
    /// <summary>
    ///Add Tcp server
    /// </summary>
    /// <param name="services"> </param>
    /// <returns> </returns>
    public static IServiceCollection AddTcpServer(
        this IServiceCollection services)
    {
        services.AddSingleton<TerminalSessionManager>();
        services.AddHostedService<TcpSocketServerHostedService>();
        return services;
    }

    /// <summary>
    ///Add Ws server
    /// </summary>
    /// <param name="services"> </param>
    /// <returns> </returns>
    public static IServiceCollection AddWsServer(
        this IServiceCollection services)
    {
        services.AddSingleton<ClientSessionManagers>();
        services.AddHostedService<WebSocketServerHostedService>();
        return services;
    }
}

8. Expand the functions of SuperSocket

8.1 multi protocol switching

Sometimes we are faced with a requirement that the same interface needs to receive protocol packets from different terminals. In this way, we usually distinguish protocols according to different points of the protocol PS: the protocol should be of the same type and have obviously different characteristics!

The specific implementation method is to implement a special PipelineFilter. In the following code, we will read the first byte of the packet data to distinguish whether the protocol starts with 0x78 0x78 or 0x79 0x79, then move the tag back to the beginning of the packet, and then give the packet data to the corresponding filter for parsing:

// NbazhGpsPackage: package codec
public class ProtocolPipelineSwitcher : PipelineFilterBase<NbazhGpsPackage>
{
    private IPipelineFilter<NbazhGpsPackage> _filter7878;
    private byte _beginMarkA = 0x78;

    private IPipelineFilter<NbazhGpsPackage> _filter7979;
    private byte _beginMarkB = 0x79;

    public ProtocolPipelineSwitcher()
    {
        _filter7878 = new EV26PipelineFilter7878(this);
        _filter7979 = new EV26PipelineFilter7979(this);
    }

    public override NbazhGpsPackage Filter(ref SequenceReader<byte> reader)
    {
        if (!reader.TryRead(out byte flag))
        {
            throw new ProtocolException(@"flag byte cannot be read");
        }

        if (flag == _beginMarkA)
        {
            NextFilter = _filter7878;
        }
        else if (flag == _beginMarkB)
        {
            NextFilter = _filter7979;
        }
        else
        {
            return null;
            //throw new ProtocolException($"unknown first byte {flag}");
        }

        // Move tag back to beginning
        reader.Rewind(1);
        return null;
    }
}

9 build WebSocket server

The implementation method of WebSocket Server is roughly the same as that of the previous Socket Server. The main differences are: WebSocket Server does not need to configure codec, and uses String as message format

/// <summary>
/// </summary>
public class WebSocketServerHostedService : IHostedService
{
    private readonly IOptions<ServerOption> _serverOptions;
    private readonly ClientSessionManagers _clientSessionManager;
    private readonly TerminalSessionManager _gpsTrackerSessionManager;
    private readonly IGeneralRepository _generalRepository;

    /// <summary>
    /// </summary>
    /// <param name="serverOptions">            </param>
    /// <param name="clientSessionManager">     </param>
    /// <param name="gpsTrackerSessionManager"> </param>
    /// <param name="factory">                  </param>
    public WebSocketServerHostedService(
        IOptions<ServerOption> serverOptions,
        ClientSessionManagers clientSessionManager,
        TerminalSessionManager gpsTrackerSessionManager,
        IServiceScopeFactory factory)
    {
        _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(_serverOptions));
        _clientSessionManager = clientSessionManager ?? throw new ArgumentNullException(nameof(clientSessionManager));
        _gpsTrackerSessionManager = gpsTrackerSessionManager ?? throw new ArgumentNullException(nameof(gpsTrackerSessionManager));
        _generalRepository = factory.CreateScope().ServiceProvider.GetRequiredService<IGeneralRepository>();
    }

    /// <summary>
    /// WebSocketServer
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var host = WebSocketHostBuilder.Create()
            .ConfigureSuperSocket(opts =>
            {
                foreach (var listener in _serverOptions.Value.WsListeners)
                {
                    opts.AddListener(new ListenOptions()
                    {
                        Ip = listener.Ip,
                        Port = listener.Port
                    });
                }
            })
            .UseSession<ClientSession>()
            .UseClearIdleSession()
            .UseSessionHandler(onClosed: async (s, v) =>
            {
                await _clientSessionManager.TryRemoveBySessionId(s.SessionID);
            })
            .UseWebSocketMessageHandler(async (s, p) =>
            {
                var package = p.Message.ToObject<ClientPackage>();

                if (package.PackageType == PackageType.Heart)
                {
                    
                    return;
                }

                if (package.PackageType == PackageType.Login)
                {
                    var client = _generalRepository.FindAsync<User>(x => x.Id.Equals(Guid.Parse(package.ClientId)));

                    if (client is null)
                    {
                        await s.CloseAsync(CloseReason.ProtocolError, "ClientId non-existent");
                    }

                    var verifyCode = Guid.NewGuid().ToString();
                    var loginPacket = new ClientPackage()
                    {
                        PackageType = PackageType.Login,
                        ClientId = package.ClientId,
                        VerifyCode = verifyCode,
                    };
                    s["VerifyCode"] = verifyCode;

                    var msg = loginPacket.ToJson();
                    await s.SendAsync(msg);
                }

                // track
                if (package.PackageType == PackageType.Trace)
                {
                    return;
                }
            })
            .UseInProcSessionContainer()
            .BuildAsServer();

        await host.StartAsync();

        await Task.CompletedTask;
    }

    /// <summary>
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await Task.CompletedTask;
    }
}

9.1 external: WebSocket parameter transmission

The first request of WebSocket is to establish a link based on Http, so WebSocket can carry parameters such as token in the URL or request body Of course, the back end is not as simple as when writing Api. It needs to intercept the URL or information of the current request, read it, and then verify it This part of the code will be added later

//. Net Core reads parameters from Url

10 multi server and collaboration between different services

Thanks to the SessionManager we have implemented, we will not inject DI into the SessionManger of the Server. After that, we can do cross Service messaging, verification and other operations in the services of any Server

Codec development preview of More 1 protocol

Example of protocol codec:

EV26 Gps communication protocol (use method in xUnit test):

  1. Github
  2. Gitee

Simple example:

public class Nbazh0X01Test
{
    private readonly ITestOutputHelper _testOutputHelper;
    private NbazhGpsSerializer NbazhGpsSerializer = new NbazhGpsSerializer();

    public Nbazh0X01Test(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    public void Test1()
    {
        //78 78 11 01 07 52 53 36 78 90 02 42 70 00 32 01 00 05 12 79 0D 0A

        var hex = "7878 11 01 07 52 53 36 78 90 02 42 7000 3201 0005 1279 0D0A".ToHexBytes();

        // ----Protocol analysis part----//
        var packet = NbazhGpsSerializer.Deserialize(hex);
        Nbazh0X01 body = (Nbazh0X01)packet.Bodies;
        // ----Protocol analysis part----//

        Assert.Equal(0x11, packet.Header.Length);
        Assert.Equal(0x01, packet.Header.MsgId);

        Assert.Equal("7 52 53 36 78 90 02 42".Replace(" ", ""), body.TerminalId);
        Assert.Equal(0x7000, body.TerminalType);
        //Assert.Equal(0x3201, body.TimeZoneLanguage.Serialize());

        Assert.Equal(0x0005, packet.Header.MsgNum);
        Assert.Equal(0x1279, packet.Header.Crc);

        // Time zone 0011 001000000001
    }
}

More 2 DotNetty

Later, we will explore the similarities and differences between DotNetty and SuperSocket

Keywords: C# socket TCPIP netcore

Added by friedemann_bach on Sat, 26 Feb 2022 04:07:53 +0200