Drink, talk and chat: building an asynchronous, non-blocking (aioredis), real-time (WebSocket) chat system with Vue 3.0 + Tornado 6.1 + Redis Pub/Sub

The original text is reproduced from "Liu Yue's technology blog" https://v3u.cn/a_id_202

The "desire for expression" is a powerful driving force in the history of human development. Engels pointed out bluntly that human beings in the age of ignorance, that is, at the lowest stage of development, "take fruits, nuts and roots as food; the emergence of language with clear syllables is the main achievement of this period". In the Internet age, people's desire for expression is often easier to satisfy thanks to chat software. Generally speaking, chat takes one of two forms: group chat and single (one-to-one) chat. A group chat can be understood as a chat room with a maximum number of members, while a single chat can be regarded as a special chat room with a maximum of two members.

To develop a high-quality chat system, developers need a basic understanding of how the client and the server communicate. In a chat system, the client can be a mobile application (consumer-facing) or a web application (business-facing). Clients do not communicate with each other directly. Instead, each client connects to a chat service that relays communication between the two sides. At a minimum, therefore, the service must support two functions:

1. Be able to receive information from other clients in real time.

2. Be able to push each message to the recipient in real time.
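
The two requirements above, together with the earlier idea that a single chat is just a room limited to two members, can be sketched in a few lines of plain Python. This is only an in-memory illustration, not part of the project: the names `Client`, `ChatRoom`, `join` and `send` are hypothetical, and no network is involved.

```python
from dataclasses import dataclass, field


@dataclass
class Client:
    """Hypothetical stand-in for a connected client; inbox collects pushed messages."""
    name: str
    inbox: list = field(default_factory=list)


class ChatRoom:
    """A room with a member limit; a single chat is simply a room with limit 2."""

    def __init__(self, max_members):
        self.max_members = max_members
        self.members = []

    def join(self, client):
        # Refuse the join when the room is already full.
        if len(self.members) >= self.max_members:
            return False
        self.members.append(client)
        return True

    def send(self, sender, text):
        # The service receives the message, then pushes it to every other member.
        recipients = [m for m in self.members if m is not sender]
        for member in recipients:
            member.inbox.append((sender.name, text))
        return len(recipients)


alice, bob, carol = Client("alice"), Client("bob"), Client("carol")
single_chat = ChatRoom(max_members=2)
single_chat.join(alice)
single_chat.join(bob)
joined = single_chat.join(carol)  # the room is full, so this join is refused
delivered = single_chat.send(alice, "hello")
```

Here the clients never talk to each other directly; everything goes through the room object, which plays the role of the chat service.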

When a client wants to start chatting, it connects to the chat service using one or more network protocols. For a chat service, the choice of network protocol is very important. Here we use the WebSocket support built into the Tornado framework, which is simple and convenient. First install Tornado 6.1:

pip3 install tornado==6.1

Then write the program's startup file, main.py:

import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.websocket

# Connected users
users = []


# WebSocket handler
class WB(tornado.websocket.WebSocketHandler):

    # Allow cross-origin connections
    def check_origin(self, origin):
        return True

    # Connection opened: register the client
    def open(self):
        users.append(self)

    # Message received: echo it back (message is already a str)
    def on_message(self, message):
        self.write_message(message)

    # Connection closed: unregister the client
    def on_close(self):
        users.remove(self)


# Create the Tornado application
app = tornado.web.Application(
    [
        (r'/wb/', WB),
    ],
    debug=True,
)

if __name__ == '__main__':
    # Create the HTTP server
    http_server_1 = tornado.httpserver.HTTPServer(app)
    # Listen on port 8000
    http_server_1.listen(8000)
    # Start the event loop
    tornado.ioloop.IOLoop.current().start()

In this way, a WebSocket service is up and running in very little time. Each time a client opens a WebSocket connection, we add it to the user list, where it waits to send or receive messages.

Next, we need to connect the sender and the receiver of a message in some way to achieve the purpose of "chat". Here we choose Redis's publish/subscribe (pub/sub) mode and illustrate it with a small demo. First, server.py:

import redis  
  
r = redis.Redis()  
r.publish("test",'hello')

Then write client.py:

import redis  
r = redis.Redis()  
ps = r.pubsub()  
ps.subscribe('test')    
for item in ps.listen():   
    if item['type'] == 'message':  
        print(item['data'])

It can be understood as follows: a subscriber subscribes to a channel; a publisher sends messages (binary strings) to the channel; and when the channel receives a message, it is pushed to the subscribers.

Channels not only connect publishers and subscribers; they can also be used for "message isolation": a message published to a channel is pushed only to the users subscribed to that channel.
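
To make the publish/subscribe and isolation semantics concrete without a running Redis server, here is a minimal in-memory sketch. It is a toy model, not a Redis client: the class `InMemoryPubSub` and its methods are hypothetical names, and the returned receiver count only imitates the way Redis's PUBLISH command reports how many clients received a message.

```python
from collections import defaultdict


class InMemoryPubSub:
    """Toy broker that mimics publish/subscribe semantics; not a Redis client."""

    def __init__(self):
        # channel name -> list of callbacks registered by subscribers
        self._subscribers = defaultdict(list)

    def subscribe(self, channel, callback):
        self._subscribers[channel].append(callback)

    def publish(self, channel, data):
        # Deliver only to callbacks registered on this exact channel.
        callbacks = self._subscribers.get(channel, [])
        for callback in callbacks:
            callback({"type": "message", "channel": channel, "data": data})
        # Report how many subscribers received the message.
        return len(callbacks)


broker = InMemoryPubSub()
room_1, room_2 = [], []
broker.subscribe("channel_1", room_1.append)
broker.subscribe("channel_2", room_2.append)

received = broker.publish("channel_1", "hello")
dropped = broker.publish("channel_3", "nobody is listening")
```

The message published to "channel_1" reaches only the first subscriber, and the message sent to a channel without subscribers is simply lost, which is the fire-and-forget behaviour to keep in mind when using pub/sub.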

Rewrite main.py according to the publisher/subscriber logic:

import asyncio
import threading

import redis
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.websocket

# Connected users
users = []

# Channel list
channels = ["channel_1", "channel_2"]


# WebSocket handler
class WB(tornado.websocket.WebSocketHandler):

    # Allow cross-origin connections
    def check_origin(self, origin):
        return True

    # Connection opened: register the client
    def open(self):
        users.append(self)

    # Message received: echo it back (message is already a str)
    def on_message(self, message):
        self.write_message(message)

    # Connection closed: unregister the client
    def on_close(self):
        users.remove(self)


# Listen for published messages through Redis (runs in its own thread)
def redis_listener(loop):

    # Bind the passed-in event loop to this thread
    asyncio.set_event_loop(loop)

    async def listen():
        r = redis.Redis(decode_responses=True)

        # Create the pub/sub instance and subscribe to the chat channels
        ps = r.pubsub()
        ps.subscribe(channels)

        # Listen for messages (this loop blocks the thread it runs in)
        for message in ps.listen():
            print(message)

            # Skip subscribe confirmations
            if message["type"] != "message":
                continue

            # Push only to users whose cookie matches the message's channel
            for user in list(users):
                if message["channel"] == user.get_cookie("channel"):
                    user.write_message(message["data"])

    loop.run_until_complete(listen())


# HTTP endpoint for publishing messages
class Msg(tornado.web.RequestHandler):

    # Override the parent method to set CORS headers
    def set_default_headers(self):
        # Allowed origins
        self.set_header("Access-Control-Allow-Origin", "*")
        # Allowed request headers
        self.set_header("Access-Control-Allow-Headers", "x-requested-with")
        # Allowed request methods
        self.set_header("Access-Control-Allow-Methods", "POST,GET,PUT,DELETE")

    # Publish a message
    async def post(self):
        data = self.get_argument("data", None)
        channel = self.get_argument("channel", "channel_1")
        print(data)

        # Publish to the chosen channel
        r = redis.Redis()
        r.publish(channel, data)

        return self.write("ok")


# Create the Tornado application
app = tornado.web.Application(
    [
        (r'/send/', Msg),
        (r'/wb/', WB),
    ],
    debug=True,
)

if __name__ == '__main__':
    loop = asyncio.new_event_loop()

    # Start the subscriber service in a separate thread
    threading.Thread(target=redis_listener, args=(loop,)).start()

    # Create the HTTP server
    http_server_1 = tornado.httpserver.HTTPServer(app)
    # Listen on port 8000
    http_server_1.listen(8000)
    # Start the event loop
    tornado.ioloop.IOLoop.current().start()

Two channels are assumed by default. The logic is as follows: the front end manages the WebSocket connection, and the user chooses which channel to publish a message to. Each user also carries a channel attribute, set through a front-end cookie. When a user publishes a message to a channel, the server, which has subscribed to that channel through Redis, pushes the newly published message to all users whose channel attribute matches. Since a channel's messages are pushed only to the users on that channel, message isolation is achieved.
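
The matching step can be looked at in isolation. The sketch below assumes a stand-in class, `FakeConnection`, that offers the same two methods the listener relies on (`get_cookie` and `write_message`); it and the helper `push_to_channel` are hypothetical names used only for illustration, not part of Tornado or of the project.

```python
from dataclasses import dataclass, field


@dataclass
class FakeConnection:
    """Hypothetical stand-in for a WebSocket handler with a 'channel' cookie."""
    cookies: dict
    sent: list = field(default_factory=list)

    def get_cookie(self, name, default=None):
        return self.cookies.get(name, default)

    def write_message(self, data):
        self.sent.append(data)


def push_to_channel(connections, message):
    """Push a pub/sub message only to connections whose cookie matches its channel."""
    if message.get("type") != "message":
        return 0  # skip subscribe/unsubscribe confirmations
    pushed = 0
    for conn in list(connections):  # copy, so the list may change while iterating
        if conn.get_cookie("channel") == message["channel"]:
            conn.write_message(message["data"])
            pushed += 1
    return pushed


a = FakeConnection({"channel": "channel_1"})
b = FakeConnection({"channel": "channel_2"})
c = FakeConnection({})  # no cookie set yet
count = push_to_channel(
    [a, b, c], {"type": "message", "channel": "channel_1", "data": "hi"}
)
ignored = push_to_channel(
    [a, b, c], {"type": "subscribe", "channel": "channel_1", "data": 1}
)
```

Only the connection whose cookie says "channel_1" receives the text, and the subscribe confirmation is never forwarded to anyone.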

Note that when the Redis subscription service is started in a thread, an event loop instance must be passed to the thread and set as its current loop. Otherwise the WebSocket instances cannot be used inside the subscription method, and this error is reported:

IOLoop.current() doesn't work in non-main

This is because Tornado is built on an event loop (IOLoop); synchronous frameworks such as Django or Flask do not have this problem.
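
A related point is how a worker thread should hand data to code running on an event loop. The sketch below uses only the standard library and is not the article's implementation: a thread (standing in for the blocking Redis listener) passes messages to the loop's own thread with `loop.call_soon_threadsafe`. In a Tornado program, `IOLoop.add_callback` plays the same role, since Tornado documents it as safe to call from any thread. The function names and the sample messages are assumptions made for the example.

```python
import asyncio
import threading


def listener_thread(loop, queue, messages):
    """Runs in a worker thread and hands each message to the loop's thread."""
    for message in messages:
        # call_soon_threadsafe schedules put_nowait to run on the loop's thread.
        loop.call_soon_threadsafe(queue.put_nowait, message)
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: no more messages


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    worker = threading.Thread(
        target=listener_thread, args=(loop, queue, ["hello", "world"])
    )
    worker.start()

    delivered = []
    while True:
        message = await queue.get()
        if message is None:
            break
        delivered.append(message)  # a real server would call write_message here
    worker.join()
    return delivered


result = asyncio.run(main())
```

With this arrangement every push happens on the thread that owns the loop, so the worker thread never touches the connections itself.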

Now let's write the front-end code. Here we use the popular Vue 3.0 framework and write chat.vue:

<template>  
  <div>  
  
  
            <h1>Chat window</h1>  
  
  
            <van-tabs v-model:active="active" @click="change_channel">  
  
              <van-tab title="Customer service No. 1">  
  
  
                <table>  
                
              <tr v-for="item,index in msglist" :key="index">  
                  
                {{ item }}  
  
              </tr>  
  
            </table>  
                  
  
  
              </van-tab>  
  
  
              <van-tab title="Customer service No. 2">  
                  
  
                <table>  
                
              <tr v-for="item,index in msglist" :key="index">  
                  
                {{ item }}  
  
              </tr>  
  
            </table>  
  
  
              </van-tab>  
  
            </van-tabs>  
  
  
              
  
  
            <van-field label="Chat message" v-model="msg" />  
  
            <van-button color="gray" @click="commit">Send</van-button>  
  
     
  </div>  
</template>  
  
<script>  
  
export default {  
 data() {  
    return {  
      auditlist:[],  
  
      //Chat record  
      msglist:[],  
      msg:"",  
      websock: null, // The established connection
      lockReconnect: false, // Whether a reconnect is already in progress
      timeout: 3 * 1000, // One heartbeat every 3 seconds
      timeoutObj: null, // Outer heartbeat countdown
      serverTimeoutObj: null, // Inner heartbeat detection
      timeoutnum: null, // Disconnect-and-reconnect countdown
      active:0,  
      channel:"channel_1"  
       
    }  
  },  
  methods:{  
  
  
    //Switch channels  
    change_channel:function(){  
  
  
          if(this.active === 0){  
  
  
                this.channel = "channel_1";  
  
                var name = "channel";  
          var value = "channel_1";  
  
            
  
          }else{  
  
  
              this.channel = "channel_2";  
  
                var name = "channel";  
          var value = "channel_2";  
  
  
          }  
  
  
          //Clear chat  
          this.msglist = [];  
  
  
          var d = new Date();  
          d.setTime(d.getTime() + (24 * 60 * 60 * 1000));  
          var expires = "expires=" + d.toGMTString();  
          document.cookie = name + "=" + value + "; " + expires;  
  
  
          this.reconnect();  
  
  
    },  
     initWebSocket() {  
      //Initialize the WebSocket  
      const wsuri = "ws://localhost:8000/wb/";  
      this.websock = new WebSocket(wsuri);  
      this.websock.onopen = this.websocketonopen;  
      this.websock.onmessage = this.websocketonmessage;  
      this.websock.onerror = this.websocketonerror;  
      this.websock.onclose = this.websocketclose;  
    },  
  
    reconnect() {  
      //Reconnect  
      var that = this;  
      if (that.lockReconnect) {  
        // Is the connection really established  
        return;  
      }  
      that.lockReconnect = true;  
      //If there is no connection, it will always be reconnected. Set the delay to avoid too many requests  
      that.timeoutnum && clearTimeout(that.timeoutnum);  
      // If there is still a value in the countdown of disconnection and reconnection here, it will be cleared  
      that.timeoutnum = setTimeout(function() {  
        //Then a new connection  
        that.initWebSocket();  
        that.lockReconnect = false;  
      }, 5000);  
    },  
  
     reset() {  
      //Reset heartbeat  
      var that = this;  
      //Clear time (clear both internal and external heartbeat timings)  
      clearTimeout(that.timeoutObj);  
      clearTimeout(that.serverTimeoutObj);  
      //Restart heartbeat  
      that.start();  
    },  
  
    start() {  
      //Start heartbeat  
      var self = this;  
      self.timeoutObj && clearTimeout(self.timeoutObj);  
      // If the outer heartbeat countdown exists, clear it  
      self.serverTimeoutObj && clearTimeout(self.serverTimeoutObj);  
      // If the inner heartbeat detection countdown exists, clear it  
      self.timeoutObj = setTimeout(function() {  
        // Reassign and resend for heartbeat detection  
        //A heartbeat is sent here. After receiving it, the backend returns a heartbeat message,  
        if (self.websock.readyState == 1) {  
          //If the connection is normal  
          // self.websock.send("heartCheck");  
        } else {  
          //Otherwise, reconnect  
          self.reconnect();  
        }  
        self.serverTimeoutObj = setTimeout(function() {  
          // In the three second heartbeat detection, if a value does not respond for 3 seconds, turn off the connection  
          //Timeout shutdown  
         // self.websock.close();  
        }, self.timeout);  
      }, self.timeout);  
      // 3s once  
    },  
  
    websocketonopen(e) {  
      //After the connection is established, execute the send method to send data  
      console.log("success");  
  
     // this.websock.send("123");  
      // this.websocketsend(JSON.stringify(actions));  
    },  
    websocketonerror() {  
      //Connection establishment failed reconnection  
      console.log("fail");  
      this.initWebSocket();  
    },  
    websocketonmessage(e) {  
  
      console.log(e);  
      //Data receiving  
      //const redata = JSON.parse(e.data);  
      const redata = e.data;  
  
      //accumulation  
      this.msglist.push(redata);  
  
      console.log(redata);  
  
       
    },  
    websocketsend(Data) {  
      //Data transmission  
      this.websock.send(Data);  
    },  
    websocketclose(e) {  
      //close  
      this.reconnect()  
      console.log("Disconnect", e);  
    },  
  
    //Submit Form   
    commit:function(){  
  
  
        //Send request  
  
        this.myaxios("http://localhost:8000/send/","post",{"data":this.msg,channel:this.channel}).then(data =>{  
  
          console.log(data);  
  
        });  
  
  
  
    },  
    
  
  },  
  
  mounted(){  
  
  
      //Connect back-end websocket service  
      this.initWebSocket();  
  
  
  
      var d = new Date();  
          d.setTime(d.getTime() + (24 * 60 * 60 * 1000));  
          var expires = "expires=" + d.toGMTString();  
          document.cookie = "channel" + "=" + "channel_1" + "; " + expires;  
  
      
  
  }  
  
}  
</script>  
  
  
<style scoped>  
  @import url("../assets/style.css");  
  
  .chatbox{  
  
      color:black;  
  
  }  
  
  .mymsg{  
  
      background-color:green;  
  
  }  
  
  
</style>

Here, the front-end client periodically sends heartbeat events to the server. If the server receives a heartbeat from a client within a given time window (for example, x seconds), the user is considered online; otherwise the user is considered offline. After going offline, the client can reconnect within the threshold time. In addition, the tabs of the Vant framework are used to switch channels; after switching, the channel ID is written into a cookie so that the back-end service can identify it and push matching messages.
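
The online/offline rule described above can be sketched on the server side as follows. This is an illustration under assumptions rather than code from the project: `PresenceTracker`, its methods and the 3-second threshold are hypothetical, and timestamps are passed in explicitly so the behaviour is easy to follow (a real service would more likely read a clock such as `time.monotonic()`).

```python
class PresenceTracker:
    """Marks a user online if a heartbeat arrived within the last `threshold` seconds."""

    def __init__(self, threshold):
        self.threshold = threshold
        self._last_seen = {}  # user id -> timestamp of the latest heartbeat

    def heartbeat(self, user_id, now):
        self._last_seen[user_id] = now

    def is_online(self, user_id, now):
        last = self._last_seen.get(user_id)
        return last is not None and (now - last) <= self.threshold


tracker = PresenceTracker(threshold=3.0)
tracker.heartbeat("alice", now=100.0)
online_soon_after = tracker.is_online("alice", now=102.0)
online_much_later = tracker.is_online("alice", now=110.0)
never_seen = tracker.is_online("bob", now=102.0)
```

A user who has never sent a heartbeat, or whose last heartbeat is older than the threshold, is treated as offline.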

The effect is as follows:

Admittedly, the feature now works, but what about a high-concurrency scenario? Imagine 100,000 people online on one channel at the same time with 100 new messages per second: the Tornado WebSocket service in the background would then have to perform 100,000 × 100 = 10,000,000 pushes per second.
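
The fan-out arithmetic is easy to check. The helper below is a hypothetical one-liner that assumes every new message is pushed once to every online user on the channel.

```python
def pushes_per_second(online_users, messages_per_second):
    """Each new message is pushed once to every online user on the channel."""
    return online_users * messages_per_second


total = pushes_per_second(online_users=100_000, messages_per_second=100)
print(f"{total:,} pushes per second")
```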

Without load balancing, such an architecture can hardly withstand the pressure. So where is the bottleneck? It is Redis. Here we need the help of aioredis, an asynchronous Redis client:

pip3 install aioredis

aioredis performs Redis reads and writes asynchronously through coroutines, which avoids blocking on I/O and makes publishing and subscribing to messages non-blocking.

At this point, create a new file for the asynchronous subscription service, main_with_aioredis.py:

import asyncio  
import aioredis  
from tornado import web, websocket  
from tornado.ioloop import IOLoop  
import tornado.httpserver  
import async_timeout

After that, the main change is to establish the Redis connection asynchronously through aioredis and subscribe to multiple channels asynchronously, then register the asynchronous reader task for subscription consumption with the asyncio.create_task method of native coroutines (or asyncio.ensure_future):

async def setup():  
    r = await aioredis.from_url("redis://localhost", decode_responses=True)  
    pubsub = r.pubsub()  
  
    print(pubsub)  
    await pubsub.subscribe("channel_1","channel_2")  
  
    #asyncio.ensure_future(reader(pubsub))  
    asyncio.create_task(reader(pubsub))

In the consumer, messages published to the subscribed channels are listened for asynchronously. As in the earlier synchronous version, each user's channel attribute is compared with the message's channel, and messages are pushed by channel:

async def reader(channel: aioredis.client.PubSub):  
    while True:  
        try:  
            async with async_timeout.timeout(1):  
                message = await channel.get_message(ignore_subscribe_messages=True)  
                if message is not None:  
                    print(f"(Reader) Message Received: {message}")  
  
                    for user in users:  
  
                        if user.get_cookie("channel") == message["channel"]:  
  
                            user.write_message(message["data"])  
          
                await asyncio.sleep(0.01)  
        except asyncio.TimeoutError:  
            pass
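
The shape of this reader loop (wait for a message with a timeout, then route it by channel) can be tried out without Redis. The sketch below is only an approximation under stated assumptions: an `asyncio.Queue` stands in for the pub/sub connection, `asyncio.wait_for` replaces `async_timeout`, users are plain dictionaries, and the loop stops after one idle period so that the example terminates, whereas the real reader keeps looping.

```python
import asyncio


async def reader(queue, users, idle_timeout=0.05):
    """Consume messages until the source stays silent for idle_timeout seconds."""
    while True:
        try:
            # Wait for the next message, but give up after idle_timeout.
            message = await asyncio.wait_for(queue.get(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            return  # the sketch stops here; a long-running service would loop again
        for user in users:
            if user["channel"] == message["channel"]:
                user["inbox"].append(message["data"])


async def main():
    queue = asyncio.Queue()
    users = [
        {"channel": "channel_1", "inbox": []},
        {"channel": "channel_2", "inbox": []},
    ]
    task = asyncio.create_task(reader(queue, users))
    await queue.put({"channel": "channel_1", "data": "hello"})
    await queue.put({"channel": "channel_2", "data": "world"})
    await task  # finishes once the queue has been idle for idle_timeout
    return users


users = asyncio.run(main())
```

Because the reader awaits instead of blocking, the event loop stays free to serve other connections while no message is available.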

Finally, Tornado's event loop runs the callbacks handed to the IOLoop, so the setup method is added as a callback:

if __name__ == '__main__':  
  
    # Listening port  
    application.listen(8000)  
  
    loop = IOLoop.current()  
    loop.add_callback(setup)  
    loop.start()

The complete, reworked asynchronous publish, subscribe and push service, main_aioredis.py:

import asyncio  
import aioredis  
from tornado import web, websocket  
from tornado.ioloop import IOLoop  
import tornado.httpserver  
import async_timeout  
  
users = []  
  
# websocket protocol  
class WB(tornado.websocket.WebSocketHandler):  
  
  
    # Cross domain support  
    def check_origin(self,origin):  
  
        return True  
  
    # Open link  
    def open(self):  
  
  
        users.append(self)  
  
  
    # Message received: echo it back (message is already a str)
    def on_message(self,message):  
  
        self.write_message(message)  
  
    # to break off  
    def on_close(self):  
  
        users.remove(self)  
  
  
class Msg(web.RequestHandler):  
  
  
    # Override parent method  
    def set_default_headers(self):  
  
        # Set request header information  
        print("Start setting")  
        # Domain name information  
        self.set_header("Access-Control-Allow-Origin","*")  
        # Request information  
        self.set_header("Access-Control-Allow-Headers","x-requested-with")  
        # Request mode  
        self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")  
  
  
    # Release information  
    async def post(self):  
  
        data = self.get_argument("data",None)  
  
        channel = self.get_argument("channel","channel_1")  
  
        print(data)  
  
        # release  
        r = await aioredis.from_url("redis://localhost", decode_responses=True)  
  
        await r.publish(channel,data)  
  
        return self.write("ok")  
  
  
async def reader(channel: aioredis.client.PubSub):  
    while True:  
        try:  
            async with async_timeout.timeout(1):  
                message = await channel.get_message(ignore_subscribe_messages=True)  
                if message is not None:  
                    print(f"(Reader) Message Received: {message}")  
  
                    for user in users:  
  
                        if user.get_cookie("channel") == message["channel"]:  
  
                            user.write_message(message["data"])  
          
                await asyncio.sleep(0.01)  
        except asyncio.TimeoutError:  
            pass  
  
  
async def setup():  
    r = await aioredis.from_url("redis://localhost", decode_responses=True)  
    pubsub = r.pubsub()  
  
    print(pubsub)  
    await pubsub.subscribe("channel_1","channel_2")  
  
    #asyncio.ensure_future(reader(pubsub))  
    asyncio.create_task(reader(pubsub))  
  
  
application = web.Application([  
    (r'/send/',Msg),  
    (r'/wb/', WB),  
],debug=True)      
  
  
if __name__ == '__main__':  
  
    # Listening port  
    application.listen(8000)  
  
    loop = IOLoop.current()  
    loop.add_callback(setup)  
    loop.start()

From a programming perspective, this version makes full use of the asynchronous execution model of coroutines and runs more smoothly.

Conclusion: in practice, Redis's publish/subscribe mode suits this kind of real-time (WebSocket) chat system very well. However, if a published message has no corresponding channel or consumer, it is discarded. If the network is suddenly cut off in production and one of the subscribers is down for a while, the messages published during that period will be gone when it reconnects. Therefore, to ensure the robustness of the system, other services are needed to provide a highly available real-time storage scheme, but that's another story. Finally, here is the project address to share with everyone: https://github.com/zcxey2911/tornado_redis_vue3_chatroom


Keywords: Python Redis websocket Network Communications Tornado

Added by Mzor on Wed, 22 Dec 2021 13:02:40 +0200