Event message production and consumption middleware - OSS.DataFlow

Refactoring and decoupling a system usually means splitting services along domain boundaries, or splitting a single service into a part that must respond in real time and a part that need not. The separated parts then complete the overall business logic by passing asynchronous messages between them. In my experience, however, business code often calls the SDK of a particular message queue directly, which is not very clean. I recently open-sourced a middleware library, OSS.DataFlow, to address this, and I hope it helps readers who come across it.

OSS.DataFlow abstracts the process of asynchronous message delivery. At the business level it provides a unified interface for publishing and subscribing to messages, so that business logic branches can hand messages to each other with simple calls, independent of how the messages are actually stored and triggered. At the storage and trigger level it also extracts interfaces, so that a specific message infrastructure can be plugged in globally. (An event processor is built on top of these interfaces as well; it provides a fault-tolerant compensation mechanism for event execution by redelivering messages. A later article will introduce it; in the meantime, the unit tests in the source code contain examples.)

I. Using the message service in business code

  The OSS.DataFlow source code is available on Gitee and GitHub. The package can be installed through the NuGet package manager, or from the command line: Install-Package OSS.DataFlow

Using the component is simple. You only need to deal with two things:

  1. The message publisher interface, which is returned when the component is registered. Business methods call it to pass in the message body.
  2. The message subscriber (consumer), supplied as an interface implementation or a delegate when the component is registered.

Specific examples:

  1. Publishing and subscribing through separate calls
    // Global initialization: inject the subscriber implementation
    const string msgPSKey = "Publisher-Subscriber-MsgKey";
    DataFlowFactory.RegisterSubscriber<MsgData>(msgPSKey, async (data) =>
            {
                // A delegate is injected here; the consumer can also be supplied as an interface implementation
                // DoSomething(data);
                return true;
            });

    // Get the publisher interface
    private static readonly IDataPublisher publisher = DataFlowFactory.CreatePublisher();

    // Publish messages in business methods
    await publisher.Publish(msgPSKey, new MsgData() {name = "test"});

  2. Registering a flow (registering the consumer returns the publisher)

    // Register the consumer implementation directly and obtain the message publishing interface
    private static readonly IDataPublisher _delegateFlowpusher =
        DataFlowFactory.RegisterFlow<MsgData>("delegate_flow", async (data) =>
            {
                // A delegate is injected here; the consumer can also be supplied as an interface implementation
                // DoSomething(data);
                return true;
            });

    // Publish messages in business methods, using the key the flow was registered with
    await _delegateFlowpusher.Publish("delegate_flow", new MsgData() {name = "test"});

As shown above, obtaining a publisher and injecting a consumer implementation is all it takes to process a message asynchronously. Multiple consumer implementations can be registered for the same message key; when a message is consumed, they process it concurrently.
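The point about several consumers sharing one key can be sketched as follows. This is a minimal illustration rather than code from the library's own tests: it assumes the API exactly as shown in the examples above, that the package's namespace is OSS.DataFlow, and it introduces a hypothetical MsgData class and message key purely for the demo.

```csharp
using System;
using System.Threading.Tasks;
using OSS.DataFlow; // assumption: root namespace of the NuGet package

// Hypothetical message type; the article uses MsgData without showing its definition.
public class MsgData
{
    public string name { get; set; }
}

public static class MultiSubscriberDemo
{
    private const string OrderKey = "order-created"; // hypothetical message key

    public static async Task RunAsync()
    {
        // Completion sources let the demo wait until both subscribers have run.
        var audited = new TaskCompletionSource<string>();
        var mailed = new TaskCompletionSource<string>();

        // First subscriber registered for the key.
        DataFlowFactory.RegisterSubscriber<MsgData>(OrderKey, async (data) =>
        {
            audited.TrySetResult(data.name);
            return true;
        });

        // Second subscriber registered for the same key.
        DataFlowFactory.RegisterSubscriber<MsgData>(OrderKey, async (data) =>
        {
            mailed.TrySetResult(data.name);
            return true;
        });

        IDataPublisher publisher = DataFlowFactory.CreatePublisher();
        await publisher.Publish(OrderKey, new MsgData { name = "test" });

        // Wait for both handlers to report the single published message.
        var names = await Task.WhenAll(audited.Task, mailed.Task);
        Console.WriteLine(string.Join(",", names));
    }
}
```

Because the handlers run concurrently, they should not rely on running in registration order or share mutable state without synchronization.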

II. Adapting and extending the underlying message storage

The business interfaces introduced above are isolated from any specific message queue or database; they are what business code works against. Because business scenarios and projects differ in how quickly messages must be handled and in how they are processed, OSS.DataFlow also provides extension interfaces for connecting to message products, so that users can adapt the library to their existing message infrastructure.

  1. Message storage adapter interface

Event message processing comes down to two things: storing received messages and triggering consumption. The class library provides the DataFlowManager class for managing message flow. Users supply the actual storage by implementing the IDataPublisherProvider interface.

When a message product triggers consumption (for example, a scheduled database task or a RabbitMQ consumer), call the notification method (NotifySubscriber) to invoke the business subscribers registered through the class library.

    // Message flow core component manager
    public static class DataFlowManager
    {
        /// <summary>
        /// Provider of custom data flow publishing (storage) implementation
        /// </summary>
        public static IDataPublisherProvider PublisherProvider { get; set; }

        /// <summary>
        /// Notify subscribers through a custom message trigger mechanism.
        /// Catch exceptions when calling this method, in case dirty data causes a type error on msgData.
        /// </summary>
        /// <param name="msgDataKey"></param>
        /// <param name="msgData">Message content; when triggering with custom content, make sure it can be safely converted to the data type of the registered subscribers</param>
        /// <returns></returns>
        public static Task<bool> NotifySubscriber(string msgDataKey, object msgData)
        {
            ....
        }
    }
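The advice in the comment above, to catch exceptions around NotifySubscriber, could look like the following wrapper. It is a sketch only: SafeTrigger and TryNotify are hypothetical names, the OSS.DataFlow namespace is assumed, and the only library call used is NotifySubscriber with the signature printed above.

```csharp
using System;
using System.Threading.Tasks;
using OSS.DataFlow; // assumption: root namespace of the NuGet package

public static class SafeTrigger
{
    // Hypothetical helper: call it from a timer job or a broker callback
    // once a stored message is due for consumption.
    public static async Task<bool> TryNotify(string msgDataKey, object msgData)
    {
        try
        {
            return await DataFlowManager.NotifySubscriber(msgDataKey, msgData);
        }
        catch (Exception ex)
        {
            // For example, the stored payload cannot be converted to the
            // data type the subscriber was registered with.
            Console.Error.WriteLine($"Notify failed for key '{msgDataKey}': {ex.Message}");
            return false;
        }
    }
}
```

Returning false instead of letting the exception escape gives the trigger a chance to keep the message for a later retry or set it aside, depending on what the underlying storage supports.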

About IDataPublisherProvider

    public interface IDataPublisherProvider
    {
        /// <summary>
        /// Data publisher
        /// </summary>
        /// <param name="option"></param>
        /// <returns>An implementation of the message publishing interface</returns>
        IDataPublisher CreatePublisher(DataPublisherOption option);
    }

    /// <summary>
    /// Publisher of data
    /// </summary>
    public interface IDataPublisher
    {
        /// <summary>
        /// Push data (store it in a specific message queue or database implementation)
        /// </summary>
        /// <param name="dataKey"></param>
        /// <param name="data"></param>
        /// <returns>Whether the push succeeded</returns>
        Task<bool> Publish<TData>(string dataKey, TData data);
    }

As you can see, the IDataPublisher interface is responsible for the actual storage implementation. An IDataPublisherProvider can inspect the source_name business property of DataPublisherOption and return a different implementation for each business requirement.
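As a rough sketch of such a provider, the classes below hand out one publisher per source_name. Everything here is illustrative: QueuePublisher and RoutingPublisherProvider are hypothetical names, the in-process queue merely stands in for a real broker or database table, the OSS.DataFlow namespace is assumed, and source_name is assumed to be a string property on DataPublisherOption.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using OSS.DataFlow; // assumption: root namespace of the NuGet package

// Hypothetical publisher: a process-local queue stands in for real storage.
public class QueuePublisher : IDataPublisher
{
    public ConcurrentQueue<(string Key, object Data)> Pending { get; } =
        new ConcurrentQueue<(string Key, object Data)>();

    public Task<bool> Publish<TData>(string dataKey, TData data)
    {
        // "Storing" the message here is just an enqueue.
        Pending.Enqueue((dataKey, (object)data));
        return Task.FromResult(true);
    }
}

// Hypothetical provider: returns one publisher per source_name.
public class RoutingPublisherProvider : IDataPublisherProvider
{
    private readonly ConcurrentDictionary<string, QueuePublisher> _bySource =
        new ConcurrentDictionary<string, QueuePublisher>();

    public IDataPublisher CreatePublisher(DataPublisherOption option)
    {
        // Assumption: source_name is a string property on DataPublisherOption.
        var source = option?.source_name ?? "default";
        return _bySource.GetOrAdd(source, _ => new QueuePublisher());
    }
}

// At application startup (PublisherProvider is the static property shown earlier):
// DataFlowManager.PublisherProvider = new RoutingPublisherProvider();
```

This covers only the storage half. Something still has to dequeue the pending items and pass each one to DataFlowManager.NotifySubscriber, which is the trigger half described earlier.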

  2. Introduction to default implementation

Using .NET's built-in in-memory message queue, the class library provides a default internal implementation of message storage and forwarding (in memory only). Users can implement the relevant interfaces themselves and configure them globally.

The built-in .NET Core message queue uses a single queue by default, with a maximum concurrency of 32 threads. If necessary, set source_name on DataPublisherOption, and the class library will create a separate in-memory queue for each source_name.

 


 

Keywords: .NET

Added by MikeTyler on Thu, 11 Nov 2021 06:08:58 +0200