Third-party components of the Flask framework

Article directory

Third-party components of the Flask framework

flask-session

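A third-party session extension that stores the session data on the server side (for example in a local database such as Redis) instead of in the cookie.
You still need to import Flask's original session object. The extension works by replacing the function that opens the session in the original implementation, which is equivalent to rewriting how the session content is stored.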

from flask import Flask,session
from flask_session import Session
from redis import Redis

app = Flask(__name__)
# NOTE: hard-coded demo secret; load a real secret from configuration in production.
app.secret_key = "helloworld"

# Flask-Session reads these settings from app.config at the moment Session(app)
# is called, so they must be set BEFORE the extension is initialised. Setting
# them afterwards leaves the extension on its default backend.
app.config["SESSION_TYPE"] = "redis"  # Storage backend used for session data
app.config["SESSION_REDIS"] = Redis(host="127.0.0.1", port=6379, db=1)  # Redis connection used by that backend
Session(app)
@app.route('/')
def hello_world():
    """Return the greeting for logged-in sessions, "No access" otherwise.

    A session counts as logged in when its "IS_LOGINED" entry is truthy.
    """
    logged_in = session.get("IS_LOGINED")
    return 'Hello World!' if logged_in else "No access"
    
class Login(views.MethodView):
    """Login view: GET renders the login page, POST checks the credentials."""

    def get(self):
        """Render the empty login page."""
        return render_template("login.html")

    def post(self):
        """Check the posted credentials and flag the session as logged in.

        Returns the success message when the credentials match; otherwise
        re-renders the login page with an error message in ``msg``.
        """
        user = request.form.get("username")
        passwd = request.form.get("password")
        # The submitted credentials are intentionally not printed: echoing a
        # plaintext password to stdout leaks it into logs.
        # NOTE: hard-coded demo credentials.
        if user == "alex" and passwd == "123":
            session["IS_LOGINED"] = "True"
            return "Login successfully"
        msg = "Wrong user name or password"
        return render_template("login.html", msg=msg)


app.add_url_rule("/login", endpoint="login", view_func=Login.as_view(name="login"))

It is used in the same way as the native session.
The cookie no longer stores the session data itself, only a UUID (the session ID).
In Redis, the data format is:
key: prefix + UUID
value: the session's key-value pairs
The Redis database is usually deployed on an intranet server and keeps its data in memory.

WTForms

Equivalent to a ModelForm; fields are declared on a class, similar to an ORM model
register_forms.py

from wtforms.fields import simple,core
from wtforms import Form
from wtforms import validators

class RegisterForm(Form):
    """Registration form: user name, password, password confirmation, gender."""

    username = simple.StringField(
        label="User name",
        validators=[
            validators.DataRequired(message="Data cannot be empty"),
            validators.Length(min=4, max=16, message="More than 4 bits less than 16 bits"),
        ],
        render_kw={"class": "form-control"},  # Extra attributes added to the rendered tag
    )
    password = simple.PasswordField(
        label="Password",
        validators=[
            validators.DataRequired(message="Password cannot be empty"),
            validators.Length(min=4, max=16, message="More than 4 bits less than 16 bits"),
            # Raw string avoids the invalid "\w" escape. The pattern is anchored
            # at both ends because Regexp only matches from the start of the
            # value, so an unanchored "\w+" accepts anything that merely begins
            # with a word character (e.g. "a!!!").
            validators.Regexp(regex=r"^\w+$", message="Password must be a number,Letters, underline"),
        ],
        render_kw={"class": "form-control"},
    )

    repassword = simple.PasswordField(
        label="Enter password again",
        validators=[
            validators.DataRequired(message="Password cannot be empty"),
            validators.Length(min=4, max=16, message="More than 4 bits less than 16 bits"),
            validators.Regexp(regex=r"^\w+$", message="Password must be a number,Letters, underline"),
            # Must equal the value of the "password" field.
            validators.equal_to("password", message="Two passwords are inconsistent"),
        ],
        render_kw={"class": "form-control"},
    )

    gender = core.RadioField(
        label="Gender",
        choices=(
            (1, "male"),
            (2, "female"),
        ),
        coerce=int,  # Submitted value is converted to the int key of the choice
        default=1,   # Default selection
    )
  1. The class needs to inherit from Form
  2. simple provides the simple field tags and core provides the more complex ones
  3. validators are used to validate the submitted strings

Verification method

from register_forms import RegisterForm

class Register(views.MethodView):
    """Registration view: GET renders the form, POST validates and stores it."""

    def get(self):
        """Render the registration page with an unbound form."""
        regfm = RegisterForm()    # instantiation
        return render_template("register.html", fm=regfm)

    def post(self):
        """Validate the submitted form and write the new student record.

        Re-renders the page with the bound form (and its errors) when
        validation fails; returns "ok" after the record has been written.
        """
        regfm = RegisterForm(request.form)
        # The raw form data is intentionally not printed: it contains the
        # submitted passwords in plaintext.
        if not regfm.validate():
            return render_template("register.html", fm=regfm)
        # If the verification is successful, write to the database
        mh = mysqlHandler()
        student_id = mh.get_student_id() + 1
        # Record is the new id followed by every submitted value except the
        # password confirmation, in the order the client sent them.
        # NOTE(review): the password is passed on exactly as submitted;
        # presumably mh.write stores it as-is -- consider hashing it first.
        student_info = [student_id]
        student_info.extend(
            value for k, value in request.form.items() if k != "repassword"
        )
        mh.write(student_info)
        return "ok"


app.add_url_rule("/register", endpoint="register", view_func=Register.as_view(name="register"))

register.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>register</title>
     <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
</head>
<body>
<div class="container">
    <h1>register</h1>
    <!-- Empty action: the form is posted back to the URL that served this page. -->
    <form action="" method="post">
        <!-- fm is the form object passed in by the view; iterating it yields its fields. -->
        {% for field in fm  %}
            <div class="form-group">
                <!-- Field label, rendered input tag, and the first validation error if there is one. -->
                {{ field.label }} {{ field }} {{ field.errors.0 }}
            </div>
        {% endfor %}
        <button type="submit" class="btn btn-primary">Sign in</button>
    </form>
</div>
</body>
</html>

{{field.label}} displays the field's label
{{field}} displays the corresponding input tag
{{field.errors.0}} displays the first error message, if there is one

DBUtils database connection pool

Once the database connections have been created, the pool keeps a certain number of them open for reuse, which saves the time needed to connect again
sql_help.py

import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
    # --- How a raw connection is opened -------------------------------------
    creator=pymysql,   # Database module used to create connections
    host="127.0.0.1",
    port=3306,
    user="root",
    password='123',
    database='students_infov2',
    charset='utf8',
    # --- Pool sizing --------------------------------------------------------
    mincached=2,       # Idle connections opened at initialisation; 0 means create none up front
    maxcached=5,       # Maximum idle connections kept in the pool; 0 or None means unlimited
    maxconnections=6,  # Maximum number of connections overall
    maxshared=3,       # Maximum number of shared connections
                       # NOTE(review): the original note says this setting has no
                       # practical effect with this driver -- confirm against the
                       # DBUtils documentation before relying on it.
    # --- Behaviour ----------------------------------------------------------
    blocking=True,     # When no connection is available: True waits, False does not
    maxusage=None,     # Maximum number of reuses of one connection; None means unlimited
    setsession=[],     # Commands executed before a session starts
    # When to ping the MySQL server to check that it is still available:
    # 0 never, 1 when a connection is requested, 2 when a cursor is created,
    # 4 when a query is executed, 7 always.
    ping=0,
)

if __name__ == "__main__":
    # Smoke test: borrow a pooled connection, dump the student_info table,
    # and always hand the cursor and the connection back -- even when the
    # query raises.
    conn = POOL.connection()  # Take a connection out of the pool
    try:
        # DictCursor returns each row as a dictionary
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            print("Successful connection")
            cursor.execute("select * from student_info")
            print(cursor.fetchall())
        finally:
            cursor.close()
    finally:
        conn.close()  # Puts the connection back into the pool

websocket

WebSocket is commonly used for instant messaging (IM).
The gevent-websocket package needs to be installed.
Polling: the client keeps asking the server and the server keeps responding. This wastes front-end, back-end and bandwidth resources, but it keeps the data real-time.
Long polling:
Method 1: the client sends messages to the server, the server polls and places the data somewhere else, and the client fetches it from there.
Method 2: the server polls, places the data somewhere else and pushes it directly to the client. Advantages: it frees client resources and saves bandwidth. Disadvantages: the pressure on the server is unavoidable and the data cannot be real-time.
Method 3: open a persistent connection between the client and the server and push messages directly over it. This completely solves the real-time problem, the bandwidth problem and the resource problem. Both the client and the server need to use multithreading.
By default, Flask uses Werkzeug to process requests (app.run()).
With WebSocket, a WSGI server processes the requests (after the WebSocket handling, the wsgi.websocket object is added to environ, and the request is then processed by the app).

Group chat

web_socket.py

from flask import Flask,request,render_template
from geventwebsocket.websocket import WebSocket
from gevent.pywsgi import WSGIServer
from geventwebsocket.handler import WebSocketHandler
import json
app = Flask(__name__)


@app.route('/')
def hello_world():
    """Serve the group-chat page."""
    page = render_template("index.html")
    return page

# Sockets of every user currently connected to the group chat.
user_socket_list = []


@app.route("/ws/<username>")
def ws(username):
    """Relay every message received from ``username`` to all other users.

    The websocket is read from the WSGI environ. When the request carries no
    websocket, "No connection established" is returned. Otherwise the socket
    is registered and served until the client disconnects, after which it is
    unregistered and "Connection closure" is returned.
    """
    # Get the websocket from the request environment
    user_socket = request.environ.get('wsgi.websocket')  # type: WebSocket
    print(user_socket)
    if not user_socket:
        # No connection established
        return "No connection established"
    user_socket_list.append(user_socket)
    print("user_list", user_socket_list)
    try:
        while True:
            try:
                msg = user_socket.receive()  # Wait for the next message
            except Exception:
                # Reading failed: this user's connection is gone.
                break
            if msg is None:
                # receive() hands back None once the user has closed the
                # connection; stop instead of broadcasting an empty message.
                break
            try:
                # Package into JSON once, then send to every peer
                msg_json = json.dumps({
                    "user": username,
                    "msg": msg
                })
            except (TypeError, ValueError):
                # Payload that cannot be JSON-encoded is skipped.
                continue
            # Iterate over a copy: dead peers are removed while looping.
            for peer in list(user_socket_list):
                if peer is user_socket:
                    continue
                try:
                    peer.send(msg_json)
                except Exception:
                    # Sending failed, so that peer's connection is dead:
                    # drop the peer, not the user who sent the message.
                    if peer in user_socket_list:
                        user_socket_list.remove(peer)
    finally:
        # Always unregister this user's socket, whatever ended the loop.
        if user_socket in user_socket_list:
            user_socket_list.remove(user_socket)
    return "Connection closure"

if __name__ == '__main__':
    # Serve the app through gevent's WSGI server with the websocket handler
    # (used here in place of Flask's own app.run()).
    listen_on = ("127.0.0.1", 5000)
    server = WSGIServer(listen_on, app, handler_class=WebSocketHandler)
    server.serve_forever()

index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>websocket</title>
    <style>
        .send_window{
            border:2px solid black;
            width:500px;
            height:1000px;
        }
    </style>
</head>
<body>
<h1>websocket test</h1>
<p>
    <input type="text" id="username">
    <button id="user_btn">Confirm name</button>
</p>
<p>
    <input type="text" id="send_text">
    <button id="send_btn">Send out</button>
</p>

<div class="send_window">

</div>
</body>
<script type="text/javascript">
    // Group-chat client. Only plain JavaScript comments are used in this
    // script: an unterminated template comment would silently swallow the
    // lines that follow it, including the line that opens the connection.
    var user_btn = document.getElementById("user_btn");
    var send_window = document.getElementById("send_text") && document.getElementsByClassName("send_window")[0];
    var username = document.getElementById("username");
    var ws = null;

    // Append one chat line as plain text. textContent is used instead of
    // innerHTML so that names and messages typed by other users can never be
    // interpreted as markup or script.
    function append_line(text, align_right) {
        var line = document.createElement("p");
        if (align_right) {
            line.style.textAlign = "right";
        }
        line.textContent = text;
        send_window.appendChild(line);
    }

    user_btn.onclick = function () {
        if (!username.value) {
            return;  // A name is required: it is part of the connection URL
        }
        // The URL scheme must be ws:// to establish a websocket connection.
        // The name is URL-encoded so that characters such as "/" or "?"
        // cannot alter the request path.
        ws = new WebSocket("ws://127.0.0.1:5000/ws/" + encodeURIComponent(username.value));
        // The server sends a JSON string of the form {user:'alex',msg:'123'}
        ws.onmessage = function (event) {
            var data = JSON.parse(event.data);
            append_line(data.user + ":" + data.msg, false);
        };
    };

    var send_btn = document.getElementById("send_btn");
    send_btn.onclick = function () {
        var send_text = document.getElementById("send_text");
        // Nothing can be sent before the name is confirmed and the
        // connection is open.
        if (!ws || ws.readyState !== WebSocket.OPEN) {
            return;
        }
        ws.send(send_text.value);
        append_line(send_text.value + ":" + username.value, true);
        send_text.value = "";
    }

</script>
</html>

Front end
ws = new WebSocket("ws connection address")
ws.send("...") sends a message
ws.onmessage = func receives messages
Back end
request.environ.get('wsgi.websocket') gets the socket
socket.send() sends
socket.receive() receives

Single chat

web_socket.py

from flask import Flask,request,render_template
from geventwebsocket.websocket import WebSocket
from gevent.pywsgi import WSGIServer
from geventwebsocket.handler import WebSocketHandler
import json
app = Flask(__name__)


@app.route('/')
def hello_world():
    """Serve the single-chat page."""
    page = render_template("index2.html")
    return page

# Maps each connected user's name to that user's websocket.
user_socket_list = {}


@app.route("/ws/<username>")
def ws(username):
    """Deliver each message from ``username`` to the recipient it names.

    Every incoming frame is expected to be a JSON object with the keys
    "to_sender" (the recipient's name) and "msg". When the request carries no
    websocket, "No connection established" is returned. Otherwise the socket
    is registered under ``username`` and served until the client disconnects,
    after which it is unregistered and "Connection closure" is returned.
    """
    user_socket = request.environ.get('wsgi.websocket')  # type: WebSocket
    print(user_socket)
    if not user_socket:
        # No connection established
        return "No connection established"
    user_socket_list[username] = user_socket
    print("user_list", user_socket_list)
    try:
        while True:
            try:
                recv_data = user_socket.receive()
            except Exception:
                # Reading failed: this user's connection is gone.
                break
            if recv_data is None:
                # receive() hands back None once the user has closed the
                # connection.
                break
            try:
                recv = json.loads(recv_data)
            except (TypeError, ValueError):
                # A malformed frame is skipped; it must not end the session.
                continue
            if not isinstance(recv, dict):
                continue
            print(recv)
            to_sender = recv.get("to_sender")
            msg = recv.get("msg")
            to_send_sock = (
                user_socket_list.get(to_sender)
                if isinstance(to_sender, str) else None
            )
            if to_send_sock is None:
                # The recipient is not connected: drop this message and keep
                # the sender's connection alive.
                continue
            # Package into JSON format, then send
            msg_json = json.dumps({
                "user": username,
                "msg": msg
            })
            try:
                to_send_sock.send(msg_json)
            except Exception:
                # Sending failed, so the recipient's connection is dead:
                # unregister the recipient, not the sender.
                if user_socket_list.get(to_sender) is to_send_sock:
                    user_socket_list.pop(to_sender, None)
    finally:
        # Unregister only if the entry still belongs to this connection; a
        # newer connection under the same name must not be removed.
        if user_socket_list.get(username) is user_socket:
            user_socket_list.pop(username, None)
    return "Connection closure"


if __name__ == '__main__':
    # Serve the app through gevent's WSGI server with the websocket handler
    # (used here in place of Flask's own app.run()).
    listen_on = ("127.0.0.1", 5000)
    server = WSGIServer(listen_on, app, handler_class=WebSocketHandler)
    server.serve_forever()

index2.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>A single chat with a nickname</title>
    <style>
        .send_window{
            border:2px solid black;
            width:500px;
            height:1000px;
        }
    </style>
</head>
<body>
<h1>websocket test</h1>
<p>
    <input type="text" id="username">
    <button id="user_btn">Confirm name</button>
</p>
<p>Receiver<input type="text" id="to_send"></p>
<p>
    <input type="text" id="send_text">
    <button id="send_btn">Send out</button>
</p>

<div class="send_window">

</div>
</body>
<script type="text/javascript">
    // Single-chat client: every outgoing frame names its recipient.
    var user_btn = document.getElementById("user_btn");
    var send_window = document.getElementsByClassName("send_window")[0];
    var username = document.getElementById("username");
    var ws = null;

    // Append one chat line as plain text. textContent is used instead of
    // innerHTML so that names and messages typed by other users can never be
    // interpreted as markup or script.
    function append_line(text, align_right) {
        var line = document.createElement("p");
        if (align_right) {
            line.style.textAlign = "right";
        }
        line.textContent = text;
        send_window.appendChild(line);
    }

    user_btn.onclick = function () {
        if (!username.value) {
            return;  // A name is required: it is part of the connection URL
        }
        // The name is URL-encoded so that characters such as "/" or "?"
        // cannot alter the request path.
        ws = new WebSocket("ws://127.0.0.1:5000/ws/" + encodeURIComponent(username.value));
        // The server sends a JSON string of the form {user:'alex',msg:'123'}
        ws.onmessage = function (event) {
            var data = JSON.parse(event.data);
            append_line(data.user + ":" + data.msg, false);
        };
    };
    var send_btn = document.getElementById("send_btn");
    send_btn.onclick = function () {
        var to_send = document.getElementById("to_send");
        var send_text = document.getElementById("send_text");
        // Nothing can be sent before the name is confirmed and the
        // connection is open.
        if (!ws || ws.readyState !== WebSocket.OPEN) {
            return;
        }
        var to_json = {
            "to_sender":to_send.value,
            "msg":send_text.value
        };
        ws.send(JSON.stringify(to_json));
        append_line(send_text.value + ":" + username.value, true);
        send_text.value = "";
    }
</script>
</html>
79 original articles published, 55 praised, 40000 visitors+
Private letter follow

Keywords: Session JSON Database socket

Added by janet287 on Sun, 16 Feb 2020 08:09:41 +0200