Python asynchronous framework -- Sanic

Brief introduction

Sanic is a Python 3.7+ web server and web framework (Sanic is not just a framework; it is also a web server) designed for performance. It supports the async/await syntax added in Python 3.5, so request handlers can avoid blocking while they wait on I/O, which improves response speed.
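To make the non-blocking claim concrete, here is a minimal sketch using only the standard library. It is not Sanic code: asyncio.sleep stands in for a database or network call, and the function names are made up for illustration.

```python
import asyncio
import time


async def fake_io(name: str, delay: float) -> str:
    # asyncio.sleep stands in for a non-blocking network or database call.
    await asyncio.sleep(delay)
    return f"{name} done"


async def main() -> list:
    # gather runs the three coroutines concurrently on one event loop,
    # so the total time is close to the longest delay, not the sum.
    return await asyncio.gather(
        fake_io("a", 0.2),
        fake_io("b", 0.2),
        fake_io("c", 0.2),
    )


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

Run sequentially, the three waits would take about 0.6 seconds; run concurrently they should finish in roughly 0.2 seconds.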

Sanic, like Vibora (which claims to be several times faster than other frameworks and more than twice as fast as its competitor Sanic), is somewhat similar to Flask, but not the same.

Getting started

1. New project

The project is named sanic_pro.

Create the following Python packages in the project directory:

config: project configuration files
server: the service itself
utils: other tools
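If you prefer to script this step, the layout above can be sketched as follows. The create_layout helper is hypothetical, and the example writes into a temporary directory so that it cannot overwrite a real project.

```python
from pathlib import Path
import tempfile


def create_layout(root: Path) -> list:
    """Create the config, server and utils packages under root."""
    created = []
    for name in ("config", "server", "utils"):
        package = root / name
        package.mkdir(parents=True, exist_ok=True)
        # An __init__.py file is what makes the directory an importable package.
        (package / "__init__.py").touch()
        created.append(package)
    return created


# Demonstrated in a temporary directory so nothing real is overwritten.
with tempfile.TemporaryDirectory() as tmp:
    project_root = Path(tmp) / "sanic_pro"
    packages = create_layout(project_root)
    layout = sorted(p.name for p in packages)
    has_init = all((p / "__init__.py").is_file() for p in packages)
```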

2. Project configuration

There are many asynchronous ORMs for asynchronous frameworks. For Sanic there is sanicdb, among others; sanicdb uses native SQL statements. This article uses Tortoise ORM, which is similar to the Django ORM. For details, see https://tortoise.github.io/

pip install sanic
pip install tortoise-orm[aiomysql]

Create a new settings.py file in the config directory. This is the project's main configuration file.

**Note: the database used in this article is MariaDB, a fork of MySQL. Please refer to the official website for details.**

from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent

# ============================== ProjectConfig start ==============================
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = ''
TITLE = 'recharge_platform'
# =============================== ProjectConfig end ===============================

# ================================== MySQL start ==================================
default_user = ''  # Database user
default_host = ''  # Database ip
default_port = 3306   # Database port
default_password = ''   # Database password
default_database = ''   # database
TORTOISE_ORM = {
    'connections': {
        'default': {
            'engine': 'tortoise.backends.mysql',
            'credentials': {
                'host': default_host,
                'port': default_port,
                'user': default_user,
                'password': default_password,
                'database': default_database,
            }
        },
    },
    'apps': {
        'default': {
            'models': [
                'aerich.models',
            ],
            'default_connection': 'default',
        },
    },
    'use_tz': False,
    'timezone': 'Asia/Shanghai'
}
# ================================== MySQL end ====================================

# =============================== RedisConfig start ===============================
REDIS_HOST = ''
REDIS_PORT = 6379
REDIS_PASSWD = ''
REDIS_SYS_DB = 1
REDIS_NAME = ''
REDIS = {
    'host': REDIS_HOST,
    'port': REDIS_PORT,
    'password': REDIS_PASSWD,
    'db': REDIS_SYS_DB
}
# ================================ RedisConfig end ================================
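The comment in settings.py suggests generating SECRET_KEY with openssl rand -hex 32. As a small sketch, the same kind of value can be produced from Python, and credentials can be read from the environment instead of being hard-coded. Both helper names and the SANIC_PRO_DB_HOST variable are assumptions made for this example, not part of the project.

```python
import os
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    # token_hex(32) returns 64 hexadecimal characters, the same shape of
    # value that `openssl rand -hex 32` prints.
    return secrets.token_hex(num_bytes)


def read_setting(name: str, default: str = "") -> str:
    # Hypothetical helper: prefer an environment variable over a value
    # hard-coded in settings.py.
    return os.environ.get(name, default)


key = generate_secret_key()
# SANIC_PRO_DB_HOST is an illustrative variable name, not one the project defines.
db_host = read_setting("SANIC_PRO_DB_HOST", "127.0.0.1")
```

Generate the key once and store it; creating a new key on every start would invalidate all previously issued tokens.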

Here, we use aerich to manage database migrations

pip install aerich

Create the migration directory and initialize the database connection settings

aerich init -t config.settings.TORTOISE_ORM

After executing the above command, a migrations directory and an aerich.ini file are generated in the same directory as config. The migrations directory stores the database migration history. (Newer aerich releases write this configuration to pyproject.toml instead of aerich.ini.)

3. Create a new service

Create a new base_model.py in the config directory

from tortoise import models, fields


class DBBaseModel(models.Model):
    class Meta:
        abstract = True

    id = fields.IntField(pk=True)  # an IntField primary key is auto-generated by default
    create_time = fields.DatetimeField(auto_now_add=True, null=True, description='Creation time')
    update_time = fields.DatetimeField(auto_now=True, null=True, description='Update time')

Create a Python package named app_system in the server directory, and create models.py in it

from tortoise import fields
from config.base_model import DBBaseModel


class Zoning(DBBaseModel):
    class Meta:
        table = 'sys_zoning'
        table_description = 'administrative division'

    code = fields.CharField(max_length=64, null=False, description='Administrative division code')
    name = fields.CharField(max_length=64, null=False, description='Name of administrative division')
    short = fields.CharField(max_length=32, null=True, default=None, description='Administrative abbreviation')
    level = fields.IntField(null=False, default=-1, description='Level')
    area = fields.DecimalField(max_digits=16, decimal_places=4, default=0, description='Area')
    longitude = fields.DecimalField(max_digits=16, decimal_places=13, default=0, description='longitude')
    latitude = fields.DecimalField(max_digits=16, decimal_places=13, default=0, description='latitude')
    coordinates = fields.TextField(null=True, description='Coordinate set')
    serial = fields.IntField(null=True, default=None, description='Serial number')
    z_list = fields.TextField(null=True, description='Regional hierarchy')
    is_active = fields.SmallIntField(default=1, description='state')
    parent = fields.ForeignKeyField('default.Zoning',
                                    null=True,
                                    on_delete=fields.SET_NULL,
                                    description='Parent node')

    
class Projects(DBBaseModel):
    class Meta:
        table = 'sys_projects'
        table_description = 'Project information'

    name = fields.CharField(max_length=128, null=False, description='name')
    app_key = fields.CharField(max_length=64, unique=True, null=False, description='project key')
    describe = fields.TextField(null=True, description='Project introduction')
    image = fields.TextField(null=True, description='Project picture')
    is_active = fields.SmallIntField(default=1, description='state')
    zoning = fields.ForeignKeyField('default.Zoning',
                                    null=True,
                                    on_delete=fields.SET_NULL,
                                    description='administrative division')

class Account(DBBaseModel):
    class Meta:
        table = 'sys_account'
        table_description = 'User information'

    username = fields.CharField(max_length=64, unique=True, null=False, description='Username')
    password = fields.CharField(max_length=256, null=False, description='Password')
    telephone = fields.CharField(max_length=64, unique=True, null=False, description='Mobile phone number')
    real_name = fields.CharField(max_length=64, null=False, description='Real name')
    is_superuser = fields.SmallIntField(default=1, description='Is superuser')
    is_active = fields.SmallIntField(default=1, description='Status')
    email = fields.CharField(max_length=64, null=True, default=None, description='Email')
    address = fields.CharField(max_length=256, null=True, default=None, description='Contact address')
    longitude = fields.DecimalField(max_digits=16, decimal_places=13, default=0, description='longitude')
    latitude = fields.DecimalField(max_digits=16, decimal_places=13, default=0, description='latitude')
    z_list = fields.TextField(null=True, default=None, description='Regional hierarchy')
    project = fields.ForeignKeyField('default.Projects',
                                     null=True,
                                     on_delete=fields.SET_NULL,
                                     description='Project')
    zoning = fields.ForeignKeyField('default.Zoning',
                                    null=True,
                                    on_delete=fields.SET_NULL,
                                    description='administrative division')
...

Modify the settings.py file to register the new models module

.....
'apps': {
        'default': {
            'models': [
                'aerich.models',
                'server.app_system.models',
            ],
            'default_connection': 'default',
        },
    },
....

For the first migration, initialize the database with the following command

aerich init-db

After executing this command, a default directory is generated inside migrations; it holds the migration version files for the default database configuration. If multiple databases are configured, a corresponding directory is created for each. The corresponding data tables now exist in the database.

If the data table is modified, use the following command to migrate the database

# Generate migration file
aerich migrate --name drop_column
# Execute migration file
aerich upgrade

Create a new views_area.py under app_system

from sanic import Blueprint
from server.app_system.models import Zoning
from utils.helper import return_result

# Recent Sanic releases only accept letters, digits, underscores and hyphens in blueprint names
area_router = Blueprint('area', url_prefix='/area')


@area_router.get('/list')
async def area_list(request):
    parent_id = request.args.get('parent_id')
    code = request.args.get('code')
    name = request.args.get('name')
    page_size = int(request.args.get('page_size', '10'))
    page_number = int(request.args.get('page_number', '1'))
    offset = (page_number - 1) * page_size

    zoning_info = Zoning.filter()

    if parent_id:
        zoning_info = zoning_info.filter(id=parent_id)
    if code:
        zoning_info = zoning_info.filter(code__contains=code)
    if name:
        zoning_info = zoning_info.filter(name__contains=name)

    res = await zoning_info.select_related(
        'parent__name', 'parent__code'
    ).offset(offset).limit(page_size).values()

    count = await zoning_info.select_related('parent__name', 'parent__code').count()
    return return_result(data=res, count=count)
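The view converts page_number and page_size with int() directly, which raises ValueError on a value such as "abc" and produces a negative offset for page 0. A more defensive conversion could look like the sketch below. parse_pagination is a hypothetical helper, a plain dict stands in for request.args, and the cap of 100 rows per page is an arbitrary assumption.

```python
def parse_pagination(args: dict, default_size: int = 10, max_size: int = 100) -> tuple:
    """Return (offset, limit) from raw query-string values."""

    def to_int(value, fallback):
        # Missing (None) or non-numeric values fall back to the default.
        try:
            return int(value)
        except (TypeError, ValueError):
            return fallback

    # Page numbers start at 1; anything lower is clamped to the first page.
    page_number = max(to_int(args.get("page_number"), 1), 1)
    # Keep the page size between 1 and max_size.
    page_size = min(max(to_int(args.get("page_size"), default_size), 1), max_size)
    return (page_number - 1) * page_size, page_size


offset, limit = parse_pagination({"page_number": "3", "page_size": "20"})
print(offset, limit)
```

In the view, the returned pair would replace the hand-computed offset and page_size passed to offset() and limit().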

Then edit the __init__.py file of app_system:

from sanic import Blueprint

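from .views_area import area_router
# auth_router must also be imported here; it comes from the authentication
# views module, which this article does not show.

auth_blue_group = Blueprint.group(auth_router)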

sys_blue_group = Blueprint.group(
    area_router,
    url_prefix='/system'
)

Some data types, such as datetime, date and Decimal, cannot be serialized to JSON directly, so they need to be converted first

Create a helper.py file under utils

import json as public_json

from decimal import Decimal
from datetime import datetime, date
from sanic.response import json as sanic_json


# Process data that cannot be serialized
class DateEncoder(public_json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(obj, date):
            return obj.strftime("%Y-%m-%d")
        elif isinstance(obj, Decimal):
            # obj is already a Decimal, so str() is enough
            return str(obj)
        else:
            return super().default(obj)


def data_json_str(data):
    if not isinstance(data, str):
        return public_json.dumps(data, cls=DateEncoder)
    else:
        return data


def data_json_dict(data):
    if isinstance(data, str):
        return public_json.loads(data)
    else:
        return data


def return_result(code: int = 200, msg: str = 'OK', data=None, count: int = 0, access_token: str = '', *args, **kwargs):
    if data is None:
        data = {}
    return sanic_json({
        'code': code,
        'msg': msg,
        'data': data_json_str(data),
        'count': count,
        'args': args,
        'access_token': access_token,
        'kwargs': kwargs
    })
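The conversions done by DateEncoder can also be written with the default hook of json.dumps, which avoids subclassing. The sketch below is an equivalent standalone variant, not a replacement the article prescribes; to_jsonable and the sample row are made up for illustration.

```python
import json
from datetime import date, datetime
from decimal import Decimal


def to_jsonable(obj):
    # Called by json.dumps only for objects it cannot serialize itself.
    # datetime must be checked before date because datetime subclasses date.
    if isinstance(obj, datetime):
        return obj.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(obj, date):
        return obj.strftime("%Y-%m-%d")
    if isinstance(obj, Decimal):
        return str(obj)
    raise TypeError(f"{type(obj).__name__} is not JSON serializable")


row = {
    "create_time": datetime(2021, 11, 30, 11, 57, 15),
    "day": date(2021, 11, 30),
    "area": Decimal("12.5000"),
}
encoded = json.dumps(row, default=to_jsonable)
decoded = json.loads(encoded)
print(decoded)
```

Converting Decimal to a string rather than a float keeps the stored precision, which matters for the longitude and latitude columns.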

Add the following content to the __init__.py file in the server directory

from config.settings import TORTOISE_ORM
from sanic import Sanic
from server.app_system import auth_blue_group, sys_blue_group

from tortoise.contrib.sanic import register_tortoise


app = Sanic(name='recharge_platform')

app.blueprint(sys_blue_group, url_prefix='/api')


# database
register_tortoise(
    app=app,
    config=TORTOISE_ORM,
    generate_schemas=False
)

Create the manager.py file in the project root directory

from server import app

if __name__ == '__main__':
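    # Ports below 1024 usually require elevated privileges on Unix-like systems;
    # choose a higher port if binding to 1000 fails
    app.run(port=1000, auto_reload=True, debug=False)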

Run the manager.py file


FastAPI ships with complete API documentation out of the box. Sanic can achieve the same with sanic_openapi

pip install sanic_openapi

Modify the server/__init__.py file

from config.settings import TORTOISE_ORM

from sanic_openapi import swagger_blueprint, openapi3_blueprint
from sanic import Sanic
from server.app_system import auth_blue_group, sys_blue_group

from tortoise.contrib.sanic import register_tortoise


app = Sanic(name='recharge_platform')

# Interface documentation
# http://127.0.0.1:1000/swagger
app.blueprint(swagger_blueprint)
# app.blueprint(openapi3_blueprint)

app.blueprint(auth_blue_group)
app.blueprint(sys_blue_group, url_prefix='/api')


# database
register_tortoise(
    app=app,
    config=TORTOISE_ORM,
    generate_schemas=False
)

Modify the app_system/views_area.py file and add "from sanic_openapi import doc" to its imports

Note: pay attention to the parameter types here

@area_router.get('/list')
@doc.summary('Get list of administrative divisions')
@doc.consumes({'page_number': '1'}, {'page_size': '10'}, location='query', required=True, )
@doc.consumes({'parent_id': int}, {'name': str}, {'code': str}, location='query')
async def area_list(request):
    parent_id = request.args.get('parent_id')
    code = request.args.get('code')
    name = request.args.get('name')
    page_size = int(request.args.get('page_size', '10'))
    page_number = int(request.args.get('page_number', '1'))
    offset = (page_number - 1) * page_size

    zoning_info = Zoning.filter()

    if parent_id:
        zoning_info = zoning_info.filter(id=parent_id)
    if code:
        zoning_info = zoning_info.filter(code__contains=code)
    if name:
        zoning_info = zoning_info.filter(name__contains=name)

    res = await zoning_info.select_related(
        'parent__name', 'parent__code'
    ).offset(offset).limit(page_size).values()

    count = await zoning_info.select_related('parent__name', 'parent__code').count()
    return return_result(data=res, count=count)



4. Authentication

Sanic offers many options for interface authentication (login authentication). For details, see https://github.com/mekicha/awesome-sanic/blob/master/README.md#authentication
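This section uses JSON Web Tokens signed with HS256. As a rough illustration of what such a token contains, the sketch below builds and verifies one with the standard library only. It is a teaching aid under stated assumptions, not a substitute for a JWT library: the function names and the demo key are invented here, and it ignores claims such as expiry.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    # JWT uses URL-safe base64 without the trailing "=" padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(text: str) -> bytes:
    # Restore the padding that b64url stripped before decoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, key: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(key.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, key: str) -> bool:
    try:
        signing_input, signature = token.rsplit(".", 1)
        expected = hmac.new(key.encode(), signing_input.encode(), hashlib.sha256).digest()
        return hmac.compare_digest(b64url_decode(signature), expected)
    except ValueError:
        # Raised for a token without dots or with invalid base64.
        return False


token = sign_hs256({"sub": "1", "user_name": "user"}, "demo-key")
# Anyone holding the token can read the header and payload without the key.
readable_payload = json.loads(b64url_decode(token.split(".")[1]))
```

The last two lines show that the header and payload are only encoded, not encrypted, so secrets must never be placed in either part.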

Create a new account_conf.py file under config

import hashlib
import jwt
from config.settings import SECRET_KEY, TITLE
from config.database_pool.redis_pool import RedisPool
from functools import wraps
from utils.helper import return_result


# Generate password
def make_password(password):
    pass_str = password + hashlib.new('sha256', SECRET_KEY.encode()).hexdigest()
    encryption = hashlib.new('md5', pass_str.encode()).hexdigest()
    return encryption


# Verify password
def check_password(login_pass, user_pass):
    return make_password(login_pass) == user_pass


# Generate token
def create_token(user_name, user_id):
    payload = {
        # Recent PyJWT releases require the 'sub' claim to be a string
        'sub': str(user_id),
        'user_name': user_name
    }
    # JWT headers are only base64-encoded, not encrypted, so never put
    # SECRET_KEY (or any other secret) in them
    header = {
        'title': TITLE,
    }
    token = jwt.encode(
        payload=payload,
        key=SECRET_KEY,
        headers=header,
        algorithm='HS256'
    )
    return token


# Validate token
def check_token(token, user_id):
    if not user_id or not token:
        return False
    try:
        payload_data = jwt.decode(
            jwt=token,
            key=SECRET_KEY,
            algorithms=['HS256']
        )
        # Compare as strings because 'sub' is stored as a string
        return payload_data.get('sub') == str(user_id)
    except jwt.exceptions.InvalidTokenError:
        return False


# Interface authentication
def protected(wrapped):
    def decorator(func):
        @wraps(func)
        async def decorated_function(request, *args, **kwargs):
            try:
                token = request.ctx.token
                user_id = request.ctx.user.get('id')
                is_authenticated = check_token(user_id=user_id, token=token)
                if is_authenticated:
                    _redis = RedisPool(db=0)
                    user_token = await _redis.get_redis_info(_key=str(user_id))
                    if token == user_token:
                        response = await func(request, *args, **kwargs)
                        return response
                    else:
                        return return_result(code=401, msg='User login has expired, please login again')
                else:
                    return return_result(code=401, msg='User login has expired, please login again')
            except Exception:
                return return_result(code=401, msg='The user is not logged in')

        return decorated_function

    return decorator(wrapped)


if __name__ == '__main__':
    _token = create_token('user', 1)
    data = jwt.decode(
        jwt=_token,
        key=SECRET_KEY,
        algorithms=['HS256']
    )
    print(data, type(data))
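make_password above combines MD5 with a salt derived from SECRET_KEY, so every user shares the same salt and the hash is cheap to brute-force. A commonly used alternative is a slow, salted key-derivation function. The sketch below swaps in PBKDF2 from the standard library; it is not the article's method, and the function names, the "salt$digest" storage format and the iteration count are assumptions chosen for illustration.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # illustrative work factor, tune for your hardware


def hash_password(password: str) -> str:
    # A fresh random salt per password means equal passwords get different hashes.
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    salt_hex, digest_hex = stored.split("$", 1)
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(candidate.hex(), digest_hex)


stored = hash_password("s3cret")
```

The stored string is 97 characters long, so it fits in the 256-character password column of the Account model.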

Modify the app_system/views_area.py file again and apply the protected decorator defined above to the view

@area_router.get('/list')
@doc.summary('Get list of administrative divisions')
@doc.consumes({'page_number': '1'}, {'page_size': '10'}, location='query', required=True, )
@doc.consumes({'parent_id': int}, {'name': str}, {'code': str}, location='query')
@protected
async def area_list(request):
    parent_id = request.args.get('parent_id')
    code = request.args.get('code')
    name = request.args.get('name')
    page_size = int(request.args.get('page_size', '10'))
    page_number = int(request.args.get('page_number', '1'))
    offset = (page_number - 1) * page_size

    zoning_info = Zoning.filter()

    if parent_id:
        zoning_info = zoning_info.filter(id=parent_id)
    if code:
        zoning_info = zoning_info.filter(code__contains=code)
    if name:
        zoning_info = zoning_info.filter(name__contains=name)

    res = await zoning_info.select_related(
        'parent__name', 'parent__code'
    ).offset(offset).limit(page_size).values()

    count = await zoning_info.select_related('parent__name', 'parent__code').count()
    return return_result(data=res, count=count)
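The way a decorator like protected wraps an async handler can be tried out without Sanic or Redis. In this simplified sketch, SimpleNamespace objects stand in for the request, the token check is reduced to a comparison with a fixed string, and plain dicts replace the JSON responses. All of these are stand-ins, not the real implementation.

```python
import asyncio
from functools import wraps
from types import SimpleNamespace


def protected(handler):
    @wraps(handler)
    async def wrapper(request, *args, **kwargs):
        # getattr with a default covers requests where no middleware set a token.
        token = getattr(request.ctx, "token", None)
        if token != "valid-token":  # stand-in for the real token check
            return {"code": 401, "msg": "The user is not logged in"}
        return await handler(request, *args, **kwargs)

    return wrapper


@protected
async def area_list(request):
    return {"code": 200, "msg": "OK"}


# SimpleNamespace objects play the role of Sanic's request and request.ctx.
ok_request = SimpleNamespace(ctx=SimpleNamespace(token="valid-token"))
anonymous = SimpleNamespace(ctx=SimpleNamespace())

allowed = asyncio.run(area_list(ok_request))
denied = asyncio.run(area_list(anonymous))
```

Because of functools.wraps, the wrapped handler keeps its original name, which matters for tools that read handler metadata.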

Added by netman182 on Tue, 30 Nov 2021 11:57:15 +0200