Implementing cloud side collaboration using NATS message middleware

1. Introduction to NATs

NATS(Message bus): from the general frame composition of cloud foundry, a component called NATS is located in the center of each module. NATS is an open source, lightweight, high-performance distributed message queuing system developed by Derek, the architect of cloud foundry, which supports publish and subscribe mechanisms. Its core is developed based on EventMachine. The amount of code is small. You can download it and study it slowly. Its core principle is based on message publishing and subscription mechanism. Each module on each server will publish multiple message topics to MessageBus according to its own message category; At the same time, it also subscribes to the module that needs to interact with it according to the message subject of the required information content. NATS was originally written in Ruby and can realize 150k messages per second. Later, it was rewritten in Go language and can reach 8-11 million messages per second. The whole program is very small, only 3M Docker image. It does not support persistent messages. If you are offline, you can't get messages.

NATS is suitable for the message communication system of cloud infrastructure, IoT device message communication and micro service architecture. Apcera team is responsible for maintaining NATS server (developed in Golang language) and client (including Go, Python, Ruby, Node.js, Elixir, Java, Nginx, C and c#). The open source community has also contributed some client libraries, including those in Rust, PHP, Lua and other languages. At present, companies that have adopted NATS system include Ericsson, HTC, Baidu, Siemens and VMware.

Message communication systems with similar functions to Nats are common in the market:
ActiveMQ (written in Java), KafKa (written in Scala), RabbitMq (written in Erlang), Nats (previously written in Ruby and now modified to Go), Redis (written in C language), Kestrel (written in Scala is not commonly used), NSQ (written in Go language), Comparison of these message communication systems in terms of Broker throughput: (Note: from the comparison of different versions of message systems by Derek Collinson)

According to its official website, NATS is an open-source, high-performance, concise and flexible central system suitable for modern reliable and flexible cloud and distributed systems. In fact, it is a distributed message queuing system that supports the PubSub/ReqRsp model. It was originally developed under the leadership of Apcera and implemented the Ruby version of server and client. Derek Collinson, the main author, claims to have worked in MQ for more than 20 years and experienced TIBOC, Rendezvous and EMC. Here is his own reddit answer.
According to the log of Ruby NATS in github, Derek implemented the Ruby version of NATS server and corresponding client in 2011. At the end of December, Derek rewritten the server with Golang for 13 years, and finally found that the effect was better. Therefore, the Ruby version of the server is gradually eliminated. Now, only one Golang version of the server is maintained on the official website, that is, gnatsd here.

2.NATS publish / subscribe mechanism

concept

Publish/subscribe (Publish/subscribe or pub/sub) is a message paradigm. The sender (publisher) of a message does not plan to send its message to a specific receiver (subscriber). Instead, published messages are divided into different categories without knowing what kind of subscribers subscribe to. Subscribers express interest in one or more categories, so they only receive messages of interest without knowing what kind of publishers publish messages. This decoupling of publishers and subscribers allows better scalability and more dynamic network topology

Publish / subscribe is a brother of the message queuing paradigm and is usually part of a larger message oriented middleware system. Most messaging systems support both the message queue model and the publish / subscribe model in the application program interface (API), such as Java Message Service (JMS).

In reality, not all requests expect a reply, but do not expect a reply, and naturally there is no state. Have you heard the radio? Have you used the radio? That's what I mean.

The publish / subscribe pattern defines a one to many dependency that allows multiple subscriber objects to listen to a topic object at the same time. When the subject object changes its state, it will notify all subscriber objects so that they can automatically update their state.

Message filtering

In the publish / subscribe model, subscribers typically receive a subset of all published messages. The process of selecting messages to accept and process is called filtering. There are two common forms of filtering: topic based and content-based.

In a topic based system, messages are published to topics or named channels. Subscribers will receive all messages on the topic they subscribe to, and all subscribers subscribing to the same topic will receive the same message. The publisher is responsible for defining the message categories to which subscribers subscribe.

In the content-based system, the subscriber defines the conditions of the message of interest. Only when the properties or content of the message meet the conditions defined by the subscriber, the message will be delivered to the subscriber. Subscribers need to be responsible for classifying messages.

Some systems support a mixture of the two: publishers publish messages to topics, while subscribers register content-based subscriptions to one or more topics.

topology

In many publish / subscribe systems, publishers publish messages to an intermediate message broker, and then subscribers register subscriptions with the message broker, which filters them. Message brokers usually perform store and forward functions to send messages from publishers to subscribers.

Usage scenario

Many projects have message distribution or event notification mechanisms, especially those with high modularity.

For example, in your system, many modules are interested in creating new users. The permission module wants to set default permissions for new users, the report module wants to regenerate the report of the current month, and the e-mail system wants to send activation e-mail to users... Such codes are written behind the business logic of the new user, which will increase the coupling and reduce the maintainability. Moreover, this method is not desirable when each module is an independent system.

For simple cases, The Observer Pattern is sufficient. If there are many places in the system that need to send and receive messages, it is not applicable. Otherwise, it will cause the expansion of the number of classes and increase the complexity of classes. At this time, a more centralized mechanism is needed to deal with these business logic.

characteristic

A subscriber can subscribe to multiple publishers

Messages will reach all subscribers, and subscribers will lose messages they don't need according to the filter (the filter works on the subscriber side)

Each subscriber receives a copy of each message

Based on push, where messages are automatically broadcast to subscribers, they do not need to request or poll topics to obtain new messages. There are many different types of subscribers within the publish / subscribe mode.

Non persistent subscribers are temporary subscription types that receive messages only when actively listening for topics.

Persistent subscribers will receive a copy of each message published, even when they are "offline" when the message is published.

There are also types of dynamic persistent subscribers and managed persistent subscribers.

advantage

Reduce the coupling between modules: publishers and subscribers are loosely coupled, and do not need to know the existence of each other. Related operations are concentrated in Publisher.
Strong scalability: after the system is complex, the message subscription and distribution mechanism can be implemented as a separate module to add new features to meet the requirements

defect

Rather than defects, its design itself has the following characteristics. But in any case, this model is logically unreliable. Mainly reflected in:

The publisher does not know whether the subscriber has received the published message
The subscriber does not know whether he has received all the messages sent by the publisher
The sender cannot know the execution of the subscriber
No one knows when subscribers start receiving messages

Existing cases

A ctiveMQ (written in Java), KafKa (written in Scala), rabbit MQ (written in Ruby), Nats (previously written in ruby, now modified to Go)

3. NATS - message communication model

Message communication model
The message communication of NATS is as follows: the data of the application is encoded into a message and sent out through the publisher; The subscriber receives the message, decodes it, and then processes it. Subscribers can process NATS messages synchronously or asynchronously.

  • Asynchronous processing
    Asynchronous processing uses the callback message handle to process messages. When a message arrives, the registered callback handle receives and controls the processing of messages. In the whole process, the client will not be blocked and can perform other tasks synchronously. Asynchronous processing can adopt the design of multi-threaded scheduling.
  • Synchronous processing
    Synchronization requires the application to display the calling method to process the incoming message. This display call is a blocking call that pauses the task until a message is available. If no messages are available, the period of message processing blocking is set by the client. Synchronization is usually used for the server to wait and process the incoming request message and send the response to the client.

NATS supports the following three message communication models:

1. Publish / subscribe model

The publish / Subscribe Communication Model of NATS is one to many message communication. Publishers send messages on a topic, and any client that registers (subscribes to) this topic can receive messages on this topic. Subscribers can subscribe to topics of interest using topic wildcards.
For subscribers, you can choose to process received messages asynchronously or synchronously. If the message is processed asynchronously, the message handle that the message is delivered to the subscriber. If the client does not have a handle, the message communication is synchronous, and the client may be blocked until it processes the current message.

Quality of service (QoS)
Send at most once (TCP reliability): if the client does not register a topic (or the client is not online), the client will not receive the message when the topic publishes a message. NATS system is a message communication system of "no matter after sending", so if advanced services are required, you can choose "NATS Streaming" or develop corresponding functions on the client
NATs streaming: some usage scenarios require more advanced and stricter transmission guarantees. These applications rely on the underlying message transmission. No matter whether the network is interrupted or the subscriber is online, it is necessary to ensure that the subscriber can receive the message

2. Request / response model

NATS supports two kinds of request response message communication: P2P (peer-to-peer) and O2M (one to many). P2P is the fastest and the first to respond. For O2M, you need to set the limit of the number of responses that the requester can receive (by default, only one response from the subscriber can be received - random)
In the request response mode, the publish request operation publishes a message with the expected response to the Reply topic.
The request creates an inbox, performs the call in the inbox, and responds and returns

Multiple subscribers (reply example) subscribe to the same topic. The requester sends a request to the topic. By default, only one subscriber's response (random) is received

In fact, the "request" or "response" method is not defined in the NATS protocol. It is implemented in a disguised form through SUB/PUB: the requester first creates an inbox through SUB, and then sends a PUB with reply to. After receiving the PUB message, the responder sends a response message to reply to realize the request / response. Both reply to and inbox are a subject, and the former is a subset of the latter (in the case of O2M)

3. Queue model

NATS supports the queue of P2P message communication. To create a message queue, subscribers need to register a queue name. All subscribers use the same queue name to form a queue group. When the message is sent to the subject, the queue group will automatically select a member to receive the message. Although the queue group has multiple subscribers, each message can only be received by one subscriber in the group.
The subscriber of the queue can be asynchronous, which means that the message handle handles the delivered message in a callback manner. Synchronization queue subscribers must establish logic to process messages

NATS supports the queue of P2P message communication. To create a message queue, subscribers need to register a queue name. All subscribers use the same queue name to form a queue group. When the message is sent to the subject, the queue group will automatically select a member to receive the message. Although the queue group has multiple subscribers, each message can only be received by one subscriber in the group.
The subscriber of the queue can be asynchronous, which means that the message handle handles the delivered message in a callback manner. Asynchronous queue subscribers must establish logic to process messages.

The queue model is commonly used for data queues. For example, data collected from web pages is directly written to the queue after processing. The receiving end can start multiple threads to read one queue at the same time. Some data is consumed by one thread and cannot be seen by other threads. This method is to solve the problem of huge collection, The back-end service can dynamically adjust the concurrency number to consume this data. To put it bluntly, the upstream production data is too fast, and the downstream consumption may not be processed. Buffer in the middle, and the downstream can make dynamic adjustment according to the actual situation to achieve dynamic balance.

NATS characteristics

NATS provides the following unique features:
1) pure publish / subscribe
Never assume a receiver
Always online
2) server in cluster mode
NATS servers can be clustered;
Published queues can be clustered across domains;
Cluster aware client
3) automatic pruning of subscribers
To support scalability, NATS provides automatic pruning of client connections;
If a client APP processes messages very slowly, NATS will automatically close the client's connection;
If a client fails to respond within the ping pong interval, the server will automatically close the connection;
The client implements the reconnection logic
4) text based protocol
It is easy to develop;
It does not affect the performance of the server;
You can connect to the server directly with Telnet
5) multiple QoS
Send at most once (TCP level reliability) - NATS immediately sends messages to eligible subscribers without leaving messages
    https://www.zhihu.com/question/49596182
At least once (via NATS Streaming) - if the matching subscriber is not online at the moment, the Message will be stored until it is sent to the subscriber and confirmed by the subscriber. Unless the Message times out or the storage space is exhausted
6) via NATS Streaming
The server maintains the subscription push state of the persistent subscriber, so that the persistent subscriber can know where they were disconnected in the last session
7) Event streaming service (via NATS Streaming)
Messages are persisted and stored in memory, files, or other secondary storage devices based on time stamps, serial numbers, or relative bit differences
8) cache the latest or first value (via NATS Streaming)
After the subscriber connects to the server, push the latest publish message to the subscriber first

4.NATS protocol

reference resources: https://www.cnblogs.com/yorkyang/p/8393080.html

5.NATS usage

This nats cloud collaboration is based on openstack keystone. One server publishes nats client processing and another server subscribes to synchronous data. This demo is only for reference, because the optimization has not been done yet. I hope it can give more help to my friends.

nats client

def dispose_nats(response):
    '''
   response : flask response.

   send request data to nats
   '''
    print response.status_code
    if response.status_code == 200 or response.status_code == 201 or response.status_code == 204:
        response_body = response.get_data()
        response_body = json.loads(response_body)
        request_body = request.get_json()
        if request.method == 'POST':
            if request.url == 'http://10.121.11.163:5000/v3/users/accounts' or request.url == 'http://10.121.11.163:5000/v3/policies':
                pass
            else:
                data_id = {'id': ''}
                for data in response_body.values():
                    data_id['id'] = data.get('id', '')
                if data_id.get('id') == '':
                    LOG.warning('The data ID is obtained when publishing the information')
                for body in request_body.values():
                    body['id'] = data_id.get('id', '')
            datas = {'url': request.url, 'body': request_body,
                     'method': request.method,
                     'headers': {'Content-Type': request.headers['Content-Type'],
                                 'X-Auth-Token': request.headers['X-Auth-Token']}
                     }
            tornado.ioloop.IOLoop.instance().run_sync(partial(send_nats, datas))
        elif request.method == 'PATCH':
            datas = {'url': request.url, 'body': request_body,
                     'method': request.method,
                     'headers': {'Content-Type': request.headers['Content-Type'],
                                 'X-Auth-Token': request.headers['X-Auth-Token']}
                     }
            tornado.ioloop.IOLoop.instance().run_sync(partial(send_nats, datas))

        elif request.method == 'DELETE':
            datas = {'url': request.url, 'headers': {
                'X-Auth-Token': request.headers['X-Auth-Token']},
                     'method': request.method}
            tornado.ioloop.IOLoop.instance().run_sync(partial(send_nats, datas))

    return response


@tornado.gen.coroutine
def send_nats(datas):
	"""
	nats Publish information, datas Is from flask in response The previously spliced data is used as the subscription side for data synchronization, which is specified here
	once nats If the connection address is the default, please pay attention to the change, nats use tornado It is processed asynchronously, which is not used by the author nats client The mechanism of response may be that the subscriber's message timed out and was not processed during the publishing process, resulting in the failure of synchronization. It is recommended that the subscriber send the message after receiving the information nats client Response to prevent data from being out of sync
	"""
    nc = NATS()
    yield nc.connect(servers=["nats://10.121.121.241:4222"], connect_timeout=10)
    yield nc.publish("synchronization", json.dumps(datas).encode())

nats server

The author uses http to send a request to achieve synchronization. Because the nats server will not start by default, the subscription is placed in the docker to run. Some error throwing information is stored in the specified log file using logging, so as to check whether there are synchronization exceptions in the process. Finally, the main function is optimized. If you have any questions, please contact me in the bottom comment or private letter, Make complaints about Tucao!

import datetime
import json
import logging

import requests
import sys
import getopt
import tornado.gen
import tornado.ioloop
from nats.io import Client as NATS

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    filename='sub-nats.log',
                    filemode='a')


@tornado.gen.coroutine
def send():
    nc = NATS()
    ip = 'http://10.121.121.241:5000'
    yield nc.connect("nats://10.121.121.241:4222", connect_timeout=10)

    @tornado.gen.coroutine
    def subscriber(msg):
        data = json.loads(msg.data.decode())
        url = data['url'].split('5000')
        urls = ip + url[1]
        if data['method'] == 'POST':
            try:
                res = requests.post(url=urls, data=json.dumps(data['body']), headers=data['headers'])
                logging.info(res.json())
                logging.info(res.status_code)
            except BaseException as e:
                logging.error(str(e))

        elif data['method'] == 'PATCH':
            try:
                res = requests.post(url=urls, data=json.dumps(data['body']), headers=data['headers'])
                logging.info(res.json())
                logging.info(res.status_code)
            except BaseException as e:
                logging.error(str(e))

        elif data['method'] == 'DELETE':
            try:
                res = requests.delete(url=urls, headers=data['headers'])
                logging.info(res.json())
                logging.info(res.status_code)
            except BaseException as e:
                logging.error(str(e))

    yield nc.flush()
    yield nc.subscribe("synchronization", "", subscriber)
    while 1:
        yield tornado.gen.sleep(100)
    # Sends a PING and wait for a PONG from the server, up to the given timeout.
    # This gives guarantee that the server has processed above message.


class Usage(Exception):
    def __init__(self, msg):
        self.msg = msg


def main(argv=None):
    if argv is None:
        argv = sys.argv
    try:
        try:
            opts, args = getopt.getopt(argv[1:], "h", ["help"])
            tornado.ioloop.IOLoop.instance().run_sync(send)
        except getopt.error, msg:
            raise Usage(msg)
        # more code, unchanged
    except Usage, err:
        print >> sys.stderr, err.msg
        print >> sys.stderr, "for help use --help"
        return 2


if __name__ == '__main__':
    sys.exit(main())

Keywords: Python

Added by ajar on Fri, 14 Jan 2022 05:02:40 +0200