1. Introduction to APScheduler
APScheduler is a Python library that lets you schedule Python code to be executed later. It is a task scheduling framework that can be used as a scheduled task controller to add and remove jobs. If jobs are stored in a database, they keep their state across scheduler restarts: when the scheduler starts again, it runs any jobs it should have run while it was offline.
😁 First, let me give you a template for writing a scheduling API:
- Step 1: write your own function, def function()
- Step 2: create a Flask route and, under it, define the function to be scheduled, def job_function(); call your def function() from inside it
- Step 3: create a task scheduling function, def task(), where you configure parameters such as the trigger (the scheduling method)
- Step 4: call task() at module level, outside the main block
- Step 5: in the main block, run the API with app.run()
```python
from flask import Flask
from flask_apscheduler import APScheduler
import datetime


def function():
    print("showmaker")


app = Flask(__name__)


@app.route("/test", methods=['GET', 'POST'])
def job_function():
    function()
    # a Flask view must return a response
    return "ok"


def task():
    scheduler = APScheduler()
    scheduler.init_app(app)
    # Format of a scheduled task: run job_function every 3600 seconds
    scheduler.add_job(func=job_function, trigger='interval', seconds=3600, id='my_job_id')
    scheduler.start()


# Call task() at module level: if it were inside __main__, IIS would not run it
task()

if __name__ == "__main__":
    app.run(debug=False, port=5005)
```
2. APScheduler scheduling methods
APScheduler has three ways to schedule tasks:
Our goal is to start the API process from the terminal and let it schedule our program by itself; the program here is the task. The three trigger styles below map directly onto add_job calls (see the sketch after this list):
- Run at specific scheduled times, optionally between a start and end date (cron-style)
- Run repeatedly at a fixed time interval (interval)
- Run a task exactly once at a given time (date)
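To make this concrete, here is a minimal sketch of the three styles, assuming the `scheduler` and `job_function` objects from the template above; the trigger names and keyword arguments are the standard APScheduler ones, and the ids and dates are placeholder values:

```python
# cron-style: run at fixed times of day, optionally between a start and end date
scheduler.add_job(func=job_function, trigger='cron', hour=8, minute=30,
                  start_date='2021-10-01', end_date='2021-12-31', id='cron_job')

# interval: run repeatedly with a fixed time interval between runs
scheduler.add_job(func=job_function, trigger='interval', minutes=30, id='interval_job')

# date: run exactly once at the given time
scheduler.add_job(func=job_function, trigger='date',
                  run_date='2021-10-27 13:00:00', id='date_job')
```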
3. APScheduler components
Now that you know how tasks can be scheduled, let's look at the four components of APScheduler (a configuration sketch follows this list):
- triggers: the task trigger component; determines when a task fires
- job stores: the task storage component; determines where and how tasks are saved
- executors: the task execution component; determines how tasks are run (e.g. in a thread or process pool)
- schedulers: the scheduler component; ties the other three together and determines the working mode
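To show how the four components fit together, here is a minimal sketch using the plain APScheduler API rather than flask_apscheduler; the SQLite URL, pool size and job defaults are placeholder choices:

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

# job stores: where jobs are persisted (placeholder SQLite file)
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
# executors: how jobs are run (a thread pool with 10 workers)
executors = {'default': ThreadPoolExecutor(10)}
# job defaults: behaviour applied to every job unless overridden
job_defaults = {'coalesce': False, 'max_instances': 1}

# scheduler: ties triggers, job stores and executors together
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
                                job_defaults=job_defaults)
```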
3.1 triggers
triggers: three task trigger modes are supported
date:
Fixed-date trigger. The task runs only once and is removed automatically after it runs; if the specified run time has already passed, the job will not be created.
parameter | explanation |
---|---|
run_date (datetime or str) | date/time to run the job at |
timezone | time zone for run_date |
```python
scheduler.add_job(func=job_function, trigger='date', run_date='2021-10-27 13:00:00', id='my_job_id')
```
interval:
Interval trigger: the task is executed repeatedly, once every fixed time interval.
parameter | explanation |
---|---|
weeks (int) | number of weeks between runs |
days (int) | number of days between runs |
hours (int) | number of hours between runs |
minutes (int) | number of minutes between runs |
seconds (int) | number of seconds between runs |
start_date (datetime or str) | start date |
end_date (datetime or str) | end date |
```python
scheduler.add_job(func=job_function, trigger='interval', seconds=3600, id='my_job_id')
```
cron:
Cron-style task trigger, similar to a crontab entry (fields such as minute, hour, day, day_of_week). I don't use it often, so I won't go into detail here; I'm lazy.
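For reference, a minimal sketch of a cron trigger, assuming the same `scheduler` and `job_function` as above; this would fire every day at 13:00:

```python
# Run job_function every day at 13:00 (other cron fields: day, day_of_week, month, ...)
scheduler.add_job(func=job_function, trigger='cron', hour=13, minute=0, id='my_cron_id')
```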
3.2 job stores: four job stores are supported
- memory: the default; jobs are kept in memory only
- mongodb: stores jobs in a MongoDB document database
- sqlalchemy: stores jobs in any relational database supported by SQLAlchemy (a persistence sketch follows this list)
- redis: stores jobs in a Redis key-value store
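If you want the jobs themselves to survive a restart (as mentioned in the introduction), point the scheduler at a database-backed job store. A minimal sketch, assuming flask_apscheduler's SCHEDULER_JOBSTORES config key and a placeholder MySQL connection string:

```python
from flask import Flask
from flask_apscheduler import APScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

app = Flask(__name__)
# Placeholder connection string: replace user, password, host, port and database
app.config['SCHEDULER_JOBSTORES'] = {
    'default': SQLAlchemyJobStore(url='mysql+pymysql://user:password@host:3306/database')
}

scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
```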
Separately, if your application's own data lives in MySQL, I recommend SQLAlchemy. I used create_engine to connect to the database and perform database operations such as deleting and writing data records. Here is my own example:
```python
from flask import Flask
from flask_apscheduler import APScheduler
import datetime
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table


def delete():
    # Placeholder connection string and table names
    engine = create_engine('mysql+pymysql://username:password@host:port/database?charset=gbk')
    meta = MetaData(bind=engine)
    tb_1 = Table('table_name_1', meta, autoload=True, autoload_with=engine)
    tb_2 = Table('table_name_2', meta, autoload=True, autoload_with=engine)
    dlt_1 = tb_1.delete()
    dlt_2 = tb_2.delete()
    conn = engine.connect()
    # Perform the delete operations
    conn.execute(dlt_1)
    conn.execute(dlt_2)
    conn.close()


def write(t):
    # Build the DataFrames to be written (details omitted here)
    r1 = pd.DataFrame()
    r3 = pd.DataFrame()
    engine = create_engine('mysql+pymysql://username:password@host:port/database?charset=gbk')
    r1.to_sql('table_name_1', con=engine, if_exists='append', index=False)
    r3.to_sql('table_name_2', con=engine, if_exists='append', index=False)


def write_log(buf):
    print(buf)
    with open('test.txt', 'a') as f:
        f.write(buf + "\n")


app = Flask(__name__)


@app.route("/test", methods=['GET', 'POST'])
def job_function():
    now_time = datetime.datetime.now()
    t = now_time.strftime('%Y-%m-%d')
    write_log("update time:" + t)
    delete()
    write_log("delete data")
    write(t)
    write_log("write data")
    return "ok"


def task():
    scheduler = APScheduler()
    scheduler.init_app(app)
    # Run job_function once every 60 seconds
    scheduler.add_job(func=job_function, trigger='interval', seconds=60, id='my_job_id')
    scheduler.start()


# Call task() at module level: if it were inside __main__, IIS would not run it
task()

if __name__ == "__main__":
    app.run(debug=False, port=8888)
```
Finally, you can check the run times in test.txt.
3.3 schedulers:
APScheduler ships several schedulers; roughly speaking, one runs on its own, one runs in the background of your application, and the rest are for use with specific frameworks (a short sketch of the two most common ones follows the list):
- BlockingScheduler: use when the scheduler is the only thing running in your application
- BackgroundScheduler: use when you are not using any of the frameworks below and want the scheduler to run in the background inside your application
- AsyncIOScheduler: use when your application is built on asyncio
- GeventScheduler: use with the gevent framework
- TornadoScheduler: use with the Tornado framework
- TwistedScheduler: use with the Twisted framework
- QtScheduler: use when building a Qt application
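As a minimal sketch of the difference between the two most common choices, using the plain APScheduler API and a placeholder tick() job:

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler


def tick():
    print("tick")


# BackgroundScheduler: start() returns immediately, jobs run in background threads
bg = BackgroundScheduler()
bg.add_job(tick, trigger='interval', seconds=5)
bg.start()

# BlockingScheduler: start() blocks, use it when the scheduler is the whole program
# blocking = BlockingScheduler()
# blocking.add_job(tick, trigger='interval', seconds=5)
# blocking.start()
```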