Article directory
Third party components of the flash framework
flask-session
Third party session, which is stored in the local database.
You need to import the original session. The principle is to replace the open session function in the original session, which is equivalent to rewriting the content session
from flask import Flask,session from flask_session import Session from redis import Redis app = Flask(__name__) app.secret_key = "helloworld" Session(app) app.config["SESSION_TYPE"]="redis" #Set database type app.config["SESSION_REDIS"] = Redis(host="127.0.0.1",port=6379,db=1) #Set database properties @app.route('/') def hello_world(): if not session.get("IS_LOGINED"): return "No access" return 'Hello World!' class Login(views.MethodView): def get(self): return render_template("login.html") def post(self): user = request.form.get("username") passwd = request.form.get("password") print(user) print(passwd) if user == "alex" and passwd == "123": session["IS_LOGINED"] = "True" return "Login successfully" else: msg = "Wrong user name or password" return render_template("login.html",msg=msg) app.add_url_rule("/login",endpoint="login",view_func=Login.as_view(name="login"))
Use the same method as the native session
The session stored in the cookie is no longer data, but uuid
In redis, the data format is
key: prefix + uuid
Value: session key value pair
redis database is usually stored in intranet server and data is stored in memory
WTForms
Equivalent to modelsForm, similar to ORM
register_forms.py
from wtforms.fields import simple,core from wtforms import Form from wtforms import validators class RegisterForm(Form): username = simple.StringField( label="User name", validators=[ validators.DataRequired(message="Data cannot be empty"), validators.Length(min=4,max=16,message="More than 4 bits less than 16 bits") ], render_kw={"class":"form-control"} # When generating tags, add a class ) password = simple.PasswordField( label="Password", validators = [ validators.DataRequired(message="Password cannot be empty"), validators.Length(min=4,max=16,message="More than 4 bits less than 16 bits"), validators.Regexp(regex="\w+",message="Password must be a number,Letters, underline") ], render_kw={"class":"form-control"} ) repassword = simple.PasswordField( label="Enter password again", validators = [ validators.DataRequired(message="Password cannot be empty"), validators.Length(min=4,max=16,message="More than 4 bits less than 16 bits"), validators.Regexp(regex="\w+",message="Password must be a number,Letters, underline"), validators.equal_to("password",message="Two passwords are inconsistent") ], render_kw={"class":"form-control"} ) gender = core.RadioField( label="Gender", choices=( (1,"male"), (2,"female"), ), coerce=int, # When you get data, you get the previous number default=1 # Default selection )
- Class needs to inherit Form
- Simple for simple tag and core for complex tag
- validators is used to verify the string
Verification method
from register_forms import RegisterForm class Register(views.MethodView): def get(self): regfm = RegisterForm() # instantiation return render_template("register.html",fm=regfm) def post(self): regfm = RegisterForm(request.form) print(request.form) if not regfm.validate(): # Check string return render_template("register.html",fm=regfm) #If the verification is successful, write to the database mh = mysqlHandler() student_id = mh.get_student_id()+1 student_info = [student_id,] for k,value in request.form.items(): if k == "repassword": continue student_info.append(value) print(student_info) mh.write(student_info) return "ok" app.add_url_rule("/register",endpoint="register",view_func=Register.as_view(name="register"))
register.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>register</title> <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous"> </head> <body> <div class="container"> <h1>register</h1> <form action="" method="post"> {% for field in fm %} <div class="form-group"> {{ field.label }} {{ field }} {{ field.errors.0 }} </div> {% endfor %} <button type="submit" class="btn btn-primary">Sign in</button> </form> </div> </body> </html>
{{field.label}} displays the label
{{field}} displays the corresponding label
{{field.errors.0}} if there is an error message, the error message is displayed
DBUtils database connection pool
After the database creates a link, keep a certain number of links unchanged to save link time
sql_help.py
import pymysql from DBUtils.PooledDB import PooledDB POOL = PooledDB( creator=pymysql, #Module using database maxconnections=6, #maximum connection mincached=2, # At initialization, the least free links in the connection pool, 0 means not to create maxcached=5, #The most idle links in the connection pool, 0 and None represent unlimited maxshared=3, #The maximum number of shared links in the connection pool. 0 and None represent all shares. They are useless because they are all shares no matter how they are set blocking=True, #Whether to block waiting for true or false if no link is available in the connection pool maxusage=None, #The maximum number of reuses of a link, None means unlimited setsession=[], #List of commands executed before starting a session ping=0, #ping mysql server to check whether the service is available #0 never Ping 1 create database link 2 create cursor 4 execute 7 any time host="127.0.0.1", port=3306, user="root", password='123', database='students_infov2', charset='utf8' ) if __name__ == "__main__": conn = POOL.connection() # Take the link out of the connection pool cursor = conn.cursor(pymysql.cursors.DictCursor) # DictCursor lexicalizes the query print("Successful connection") res = cursor.execute("select * from student_info") print(cursor.fetchall()) cursor.close() conn.close() # Put connections back into the connection pool
websocket
websocket is also used for instant messaging (IM)
Gevent websocket needs to be installed
Polling: keep asking the server, and the server keeps responding, wasting front-end, back-end, bandwidth, and ensuring real-time data
Long polling:
Method 1: the client sends messages to the server, the server polls, places the data in another place, and the client goes to another place to get it
Method 2: server polling, put it in another place, directly push it to the client, advantages, release client resources, release client resources, service pressure is inevitable, save bandwidth resources, data can not be real-time
Method 3: open a continuous connection between the client and the server, push messages directly to the past, completely solve the real-time problem, solve the problem of bandwidth occupation, and solve the resource problem. Both the client and the server need to start multithreading
By default, flask uses werkzug to process requests (app.run())
Websocket uses WSGI to process requests (after websocket processing, the wsgi.websocket object is added to environ, and then processed by app)
Group chat
web_socket.py
from flask import Flask,request,render_template from geventwebsocket.websocket import WebSocket from gevent.pywsgi import WSGIServer from geventwebsocket.handler import WebSocketHandler import json app = Flask(__name__) @app.route('/') def hello_world(): return render_template("index.html") user_socket_list = [] @app.route("/ws/<username>") def ws(username): # Get the websocket socket in the request parameter user_socket = request.environ.get('wsgi.websocket') # type:WebSocket print(user_socket) if user_socket: user_socket_list.append(user_socket) else: # No connection established return "No connection established" print("user_list",user_socket_list) while 1: try: # If the current user is closed, return to None to terminate the connection msg = user_socket.receive() # Accept user information for socket in user_socket_list: # If the send connection fails, the link fails, and the link is deleted in the link list if socket != user_socket: #Package into json format, send msg_json = json.dumps({ "user":username, "msg":msg }) socket.send(msg_json) # Send user information except: user_socket_list.remove(user_socket) return "Connection closure" if __name__ == '__main__': http_serv = WSGIServer(("127.0.0.1",5000),app,handler_class=WebSocketHandler) http_serv.serve_forever() # app.run()
index.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>websocket</title> <style> .send_window{ border:2px solid black; width:500px; height:1000px; } </style> </head> <body> <h1>websocket test</h1> <p> <input type="text" id="username"> <button id="user_btn">Confirm name</button> </p> <p> <input type="text" id="send_text"> <button id="send_btn">Send out</button> </p> <div class="send_window"> </div> </body> <script type="text/javascript"> var user_btn = document.getElementById("user_btn"); var send_window = document.getElementsByClassName("send_window")[0]; var username = document.getElementById("username"); var ws = null; user_btn.onclick = function () { {# Agreement header must be ws:// Using ws to establish protocol ws = new WebSocket("ws://127.0.0.1:5000/ws/"+username.value); {# Front-end acceptance json Character string{user:'alex',msg:'123'}#} ws.onmessage = function (data) { data = JSON.parse(data.data); send_window.innerHTML += "<p>"+ data.user + ":" + data.msg+"</p>" }; }; var send_btn = document.getElementById("send_btn"); send_btn.onclick = function () { var send_text = document.getElementById("send_text"); ws.send(send_text.value); send_window.innerHTML += "<p style='text-align:right'>"+ send_text.value + ":" + username.value +"</p>"; send_text.value = ""; } </script> </html>
Front end
ws = new websocket("ws connection address")
"ws.send(" ") send message"
ws.onmessage = func accept message
back-end
reqeust.environ.get('wsgi.websocket ') get socket
socket.send() send
socket.receive accept
Single chat
web_socket.py
from flask import Flask,request,render_template from geventwebsocket.websocket import WebSocket from gevent.pywsgi import WSGIServer from geventwebsocket.handler import WebSocketHandler import json app = Flask(__name__) @app.route('/') def hello_world(): return render_template("index2.html") user_socket_list = {} @app.route("/ws/<username>") def ws(username): user_socket = request.environ.get('wsgi.websocket') # type:WebSocket print(user_socket) if user_socket: user_socket_list[username] = user_socket else: # No connection established return "No connection established" print("user_list",user_socket_list) while 1: try: # If the current user is closed, return to None to terminate the connection recv_data = user_socket.receive() recv = json.loads(recv_data) print(recv) to_sender = recv.get("to_sender") msg = recv.get("msg") to_send_sock = user_socket_list.get(to_sender) #Package into json format, send msg_json = json.dumps({ "user":username, "msg":msg }) to_send_sock.send(msg_json) except: user_socket_list.pop(username) return "Connection closure" if __name__ == '__main__': http_serv = WSGIServer(("127.0.0.1",5000),app,handler_class=WebSocketHandler) http_serv.serve_forever() # app.run()
index.py
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>A single chat with a nickname</title> <style> .send_window{ border:2px solid black; width:500px; height:1000px; } </style> </head> <body> <h1>websocket test</h1> <p> <input type="text" id="username"> <button id="user_btn">Confirm name</button> </p> <p>Receiver<input type="text" id="to_send"></p> <p> <input type="text" id="send_text"> <button id="send_btn">Send out</button> </p> <div class="send_window"> </div> </body> <script type="text/javascript"> var user_btn = document.getElementById("user_btn"); var send_window = document.getElementsByClassName("send_window")[0]; var username = document.getElementById("username"); var ws = null; user_btn.onclick = function () { ws = new WebSocket("ws://127.0.0.1:5000/ws/"+username.value); {# Front-end acceptance json Character string{user:'alex',msg:'123'}#} ws.onmessage = function (data) { data = JSON.parse(data.data); send_window.innerHTML += "<p>"+ data.user + ":" + data.msg+"</p>" }; }; var send_btn = document.getElementById("send_btn"); send_btn.onclick = function () { var to_send = document.getElementById("to_send"); var send_text = document.getElementById("send_text"); var to_json = { "to_sender":to_send.value, "msg":send_text.value }; ws.send(JSON.stringify(to_json)); send_window.innerHTML += "<p style='text-align:right'>"+ send_text.value + ":" + username.value +"</p>"; send_text.value = ""; } </script> </html>