Detailed analysis of Python's implementation of scheduled tasks with apscheduler
This article explains how to implement scheduled tasks in Python. Scheduled tasks can be managed with third-party packages, and among them apscheduler is relatively simple to use. Let's look at how it works. I hope you find it helpful.
Let's start with a simple example to see how apscheduler is used.
# encoding: utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def sch_test():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('Time: {}, testing apscheduler'.format(now))


task = BlockingScheduler()
task.add_job(func=sch_test, trigger='cron', second='*/10')
task.start()

The example above is simple: create a scheduler object, add a job with add_job, and finally start the scheduler with start.
The example runs the sch_test job every 10 seconds. The output looks like this:
Time: 2022-10-08 15:16:30, testing apscheduler
Time: 2022-10-08 15:16:40, testing apscheduler
Time: 2022-10-08 15:16:50, testing apscheduler
Time: 2022-10-08 15:17:00, testing apscheduler
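To pass arguments to the job function, add args to the add_job call, for example task.add_job(func=sch_test, args=('a',), trigger='cron', second='*/10'). args must be a tuple or list, so a single argument needs the trailing comma, and sch_test must be defined to accept that argument.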
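The trailing comma matters because Python treats ('a') as a plain string, not as a one-element tuple. The following minimal sketch uses only the standard library. call_job is a hypothetical stand-in for the way a scheduler unpacks args into the job function; it is not apscheduler's actual code.

```python
def sch_test(job_type):
    """Job function that expects exactly one positional argument."""
    return 'job type: {}'.format(job_type)


def call_job(func, args):
    """Hypothetical helper: unpack args into the function call."""
    return func(*args)


not_a_tuple = ('a')    # parentheses alone do not make a tuple
one_tuple = ('a',)     # the trailing comma does

print(type(not_a_tuple).__name__)
print(type(one_tuple).__name__)
print(call_job(sch_test, one_tuple))
```

A longer string such as ('ab') would be unpacked character by character, so the job function would receive two arguments instead of one.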
The example above gives a first look at how apscheduler is used. Next, let's look at how apscheduler is designed. It has four main components: triggers, job stores (job_stores), executors, and schedulers.
1. Triggers:
A trigger specifies how and when a job is fired. The example above uses the "cron" trigger. We can choose one of cron, date, or interval.
cron represents a scheduled task that fires at specified times, similar to Linux crontab.
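The available parameters are year, month, day, week, day_of_week, hour, minute, and second, plus start_date, end_date, timezone, and jitter.
In addition, each field can take an expression instead of a single value. Commonly used ones are * (every value), */a (every a-th value, starting from the minimum), a-b (any value in the range a to b), a-b/c (every c-th value in the range a to b), and x,y,z (any of the listed expressions).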
Usage example, run once every day at 7:20:
task.add_job(func=sch_test, args=('Timed task',), trigger='cron', hour='7', minute='20')
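Cron fields accept expressions as well as single values, such as the '*/10' used for second in the first example. As a rough illustration of how such an expression maps to concrete values, here is a minimal standard-library sketch. expand_field is a hypothetical helper written for illustration, not part of apscheduler, and it covers only the '*', '*/a', 'a-b', 'a-b/c', and comma-separated forms.

```python
def expand_field(expr, minimum, maximum):
    """Return the sorted values in [minimum, maximum] matched by a cron-style field."""
    values = set()
    for part in str(expr).split(','):
        # Split off an optional step, e.g. '*/10' -> base '*', step 10.
        base, _, step_text = part.partition('/')
        step = int(step_text) if step_text else 1
        if base == '*':
            first, last = minimum, maximum
        elif '-' in base:
            low, high = base.split('-')
            first, last = int(low), int(high)
        else:
            # A bare value matches only itself.
            first = last = int(base)
        values.update(range(first, last + 1, step))
    return sorted(v for v in values if minimum <= v <= maximum)


print(expand_field('*/10', 0, 59))        # seconds field from the first example
print(expand_field('7', 0, 23))           # hour='7'
print(expand_field('1-5', 0, 6))          # a range
print(expand_field('0-30/15,45', 0, 59))  # a stepped range combined with a single value
```

With second='*/10' the job therefore fires at seconds 0, 10, 20, 30, 40, and 50 of every minute, which matches the timestamps printed by the first example.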
date represents a one-off task that runs once at a specific time.
Usage example:
# Use run_date to specify the run time
task.add_job(func=sch_test, trigger='date', run_date=datetime.datetime(2022, 10, 8, 16, 1, 30))
# Or use next_run_time
task.add_job(func=sch_test, trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
interval represents a recurring task: you specify an interval, and the job runs each time that interval elapses.
interval accepts the following parameters: weeks, days, hours, minutes, and seconds, plus start_date, end_date, timezone, and jitter.
Usage example, run the sch_test task every 3 seconds:
task.add_job(func=sch_test, args=('Recurring task',), trigger='interval', seconds=3)
Here is an example that uses all three triggers:
# encoding: utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('Time: {}, {}: testing apscheduler'.format(now, job_type))


task = BlockingScheduler()
task.add_job(func=sch_test, args=('One-off task',), trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
task.add_job(func=sch_test, args=('Timed task',), trigger='cron', second='*/5')
task.add_job(func=sch_test, args=('Recurring task',), trigger='interval', seconds=3)
task.start()

Part of the output:
Time: 2022-10-08 15:45:49, One-off task: testing apscheduler
Time: 2022-10-08 15:45:49, Recurring task: testing apscheduler
Time: 2022-10-08 15:45:50, Timed task: testing apscheduler
Time: 2022-10-08 15:45:52, Recurring task: testing apscheduler
Time: 2022-10-08 15:45:55, Timed task: testing apscheduler
Time: 2022-10-08 15:45:55, Recurring task: testing apscheduler
Time: 2022-10-08 15:45:58, Recurring task: testing apscheduler
The code and its output make the differences between the three triggers clear.
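One difference visible in the output above is that the cron job fires on wall-clock seconds divisible by 5 (15:45:50, 15:45:55), while the interval job fires relative to when the job was added (15:45:49, 15:45:52, and so on). The sketch below reproduces those timestamps with the standard library only. It is an illustration, not apscheduler's implementation, and it assumes the script started at 15:45:46, which is inferred from the output. cron_times and interval_times are hypothetical helper names.

```python
import datetime


def cron_times(start, end, every):
    """Wall-clock aligned: seconds where second % every == 0, like second='*/5'."""
    times = []
    current = start.replace(microsecond=0)
    while current <= end:
        if current >= start and current.second % every == 0:
            times.append(current)
        current += datetime.timedelta(seconds=1)
    return times


def interval_times(start, end, seconds):
    """Start-relative: first run one interval after start, then every interval."""
    step = datetime.timedelta(seconds=seconds)
    times = []
    current = start + step
    while current <= end:
        times.append(current)
        current += step
    return times


start = datetime.datetime(2022, 10, 8, 15, 45, 46)   # assumed start time
end = datetime.datetime(2022, 10, 8, 15, 45, 58)

print([t.strftime('%H:%M:%S') for t in cron_times(start, end, 5)])
print([t.strftime('%H:%M:%S') for t in interval_times(start, end, 3)])
```

If a job must run at fixed clock times, cron is the natural choice; if only the spacing between runs matters, interval is simpler.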
2. Job stores (job_stores)
As the name suggests, a job store is where jobs are kept; by default they are stored in memory. We can also choose a different store, for example saving jobs in MySQL. The built-in options include MemoryJobStore, SQLAlchemyJobStore, MongoDBJobStore, RedisJobStore, RethinkDBJobStore, and ZooKeeperJobStore.
The default in-memory store is usually enough, but its contents are lost when the program crashes, so after a restart the jobs are added again and start over. If you need stronger guarantees about job execution, choose one of the persistent stores.
Example of using SQLAlchemyJobStore storage:
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('Time: {}, {}: testing apscheduler'.format(now, job_type))


sched = BlockingScheduler()
# Store jobs in MySQL
sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
sched.add_jobstore('sqlalchemy', url=sql_url)
# Add the job
sched.add_job(func=sch_test, args=('Timed task',), trigger='cron', second='*/5')
sched.start()
3. Executors (executors)
An executor runs jobs by submitting them to a thread pool or process pool. The built-in options include ThreadPoolExecutor, ProcessPoolExecutor, AsyncIOExecutor, GeventExecutor, TornadoExecutor, TwistedExecutor, and DebugExecutor.
The default is ThreadPoolExecutor, and the thread pool and process pool executors are the most commonly used. For CPU-intensive jobs, ProcessPoolExecutor can be used instead.
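To make the thread pool idea concrete, here is a minimal sketch using the standard library's concurrent.futures, which apscheduler's pool executors are built on. It does not use apscheduler itself; the job function and the pool size of 2 are illustrative assumptions.

```python
import concurrent.futures
import threading


def job(name):
    """Illustrative job: report which thread actually ran it."""
    return name, threading.current_thread().name


# A pool with 2 workers: at most 2 jobs run at the same time.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(job, 'job-{}'.format(i)) for i in range(4)]
    results = [f.result() for f in futures]

main_thread = threading.current_thread().name
for name, worker in results:
    print('{} ran in {}'.format(name, worker))

# Jobs run in worker threads, never in the thread that submitted them.
print(all(worker != main_thread for _, worker in results))
```

The same pattern applies to a process pool for CPU-bound jobs, which avoids contention on the interpreter lock but requires the job function and its arguments to be picklable.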
4. Schedulers (schedulers)
The scheduler is the core of apscheduler. It coordinates the whole system: job stores, executors, and triggers all operate under its control. The built-in schedulers include BlockingScheduler, BackgroundScheduler, AsyncIOScheduler, GeventScheduler, TornadoScheduler, TwistedScheduler, and QtScheduler.
Unless you have a specific scenario in mind, the most commonly used scheduler is BlockingScheduler.
Exception monitoring
If a scheduled job raises an error while running, we need a way to detect it, so a monitoring mechanism has to be set up. We usually use the logging module to record the error information.
Usage example:
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging

# Configure the logging output format and destination file
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='sche.log',
                    filemode='a')


def log_listen(event):
    if event.exception:
        print('Job failed, error: {}'.format(event.exception))
    else:
        print('Job ran normally...')


def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('Time: {}, {}: testing apscheduler'.format(now, job_type))
    print(1/0)  # deliberately raises ZeroDivisionError


sched = BlockingScheduler()
# Store jobs in MySQL
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
sched.add_jobstore('sqlalchemy', url=sql_url)
# Add the job
sched.add_job(func=sch_test, args=('Timed task',), trigger='cron', second='*/5')
# Listen for job completion and job errors
sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# Route the scheduler's log output through logging
sched._logger = logging
sched.start()
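The second argument to add_listener is a bitmask: each event type is a distinct power of two, so EVENT_JOB_EXECUTED | EVENT_JOB_ERROR selects both kinds of event. The sketch below illustrates that filtering idea with the standard library only. MiniDispatcher, Event, and the constant values are hypothetical stand-ins for illustration, not apscheduler's classes or its actual values.

```python
from collections import namedtuple

# Hypothetical event codes; each one occupies its own bit.
JOB_ADDED = 1 << 0
JOB_EXECUTED = 1 << 1
JOB_ERROR = 1 << 2

Event = namedtuple('Event', ['code', 'exception'])


class MiniDispatcher:
    """Toy dispatcher that calls a listener only for events matching its mask."""

    def __init__(self):
        self._listeners = []

    def add_listener(self, callback, mask):
        self._listeners.append((callback, mask))

    def dispatch(self, event):
        for callback, mask in self._listeners:
            if event.code & mask:   # non-zero only if the event's bit is in the mask
                callback(event)


seen = []


def log_listen(event):
    seen.append('error: {}'.format(event.exception) if event.exception else 'ok')


dispatcher = MiniDispatcher()
dispatcher.add_listener(log_listen, JOB_EXECUTED | JOB_ERROR)

dispatcher.dispatch(Event(JOB_ADDED, None))      # filtered out by the mask
dispatcher.dispatch(Event(JOB_EXECUTED, None))   # delivered
dispatcher.dispatch(Event(JOB_ERROR, ZeroDivisionError('division by zero')))

print(seen)
```

Because the listener is registered for both event types, it has to check event.exception itself to tell a successful run from a failed one, which is what log_listen does in the article's example.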
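Wrapping apscheduler in a class
The sections above covered the main modules of the apscheduler framework, so we now know the basics of using it. Next, let's wrap apscheduler in a class; in future you can simply adapt this code when you need it.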
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime


class LoggerUtils():

    def init_logger(self, logger_name):
        # Log format
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        log_obj = logging.getLogger(logger_name)
        log_obj.setLevel(logging.INFO)
        # Where the log file is stored
        path = '/data/logs/'
        filename = '{}{}.log'.format(path, logger_name)
        if not os.path.exists(path):
            os.makedirs(path)
        # Rotate the log file by time
        timeHandler = logging.handlers.TimedRotatingFileHandler(
            filename,
            when='D',  # rotation unit, S: seconds, M: minutes, H: hours, D: days, W0-W6: weekday
            interval=1,  # rotate once every this many units (here: every day)
            backupCount=10  # number of rotated files to keep
        )
        timeHandler.setLevel(logging.INFO)
        timeHandler.setFormatter(formatter)
        log_obj.addHandler(timeHandler)
        return log_obj


class Scheduler(LoggerUtils):

    def __init__(self):
        # Executor settings
        executors = {
            'default': ThreadPoolExecutor(10),  # a ThreadPoolExecutor named "default" with 10 workers
            'processpool': ProcessPoolExecutor(5)  # a ProcessPoolExecutor named "processpool" with 5 workers
        }
        self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
        # Job store settings
        # Use the sqlalchemy job store to keep jobs in MySQL
        sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
        self.scheduler.add_jobstore('sqlalchemy', url=sql_url)

        def log_listen(event):
            if event.exception:
                # Record the traceback in the log
                self.scheduler._logger.error(event.traceback)

        # Listen for job completion and job errors
        self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # Route the scheduler's log output to our logger
        self.scheduler._logger = self.init_logger('sche_test')

    def add_job(self, *args, **kwargs):
        """Add a job."""
        self.scheduler.add_job(*args, **kwargs)

    def start(self):
        """Start the scheduler."""
        self.scheduler.start()


# Test job
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('Time: {}, {}: testing apscheduler'.format(now, job_type))
    print(1/0)  # deliberately raises ZeroDivisionError


# Create the scheduler wrapper
sched = Scheduler()
# Add the job
sched.add_job(func=sch_test, args=('Timed task',), trigger='cron', second='*/5')
# Start the scheduler
sched.start()
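The wrapper above assigns to the scheduler's private _logger attribute. A possible alternative, sketched here under the assumption that apscheduler logs through standard logging loggers whose names begin with 'apscheduler' (such as 'apscheduler.scheduler'), is to attach the rotating handler to that parent logger instead. The sketch uses only the standard library and writes to a temporary directory rather than /data/logs/; init_apscheduler_logging is a hypothetical helper name.

```python
import logging
import logging.handlers
import os
import tempfile


def init_apscheduler_logging(log_dir, level=logging.INFO):
    """Attach a daily-rotating file handler to the 'apscheduler' parent logger."""
    os.makedirs(log_dir, exist_ok=True)
    filename = os.path.join(log_dir, 'apscheduler.log')
    handler = logging.handlers.TimedRotatingFileHandler(
        filename, when='D', interval=1, backupCount=10)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s %(message)s'))
    parent = logging.getLogger('apscheduler')
    parent.setLevel(level)
    parent.addHandler(handler)
    return filename, handler


log_dir = tempfile.mkdtemp()
log_file, handler = init_apscheduler_logging(log_dir)

# Child loggers propagate to the parent, so this record reaches the file handler.
logging.getLogger('apscheduler.scheduler').error('simulated job failure')
handler.flush()

with open(log_file) as f:
    content = f.read()
print(content)
```

This keeps the logging setup independent of the scheduler object, at the cost of configuring a logger by name instead of passing one in explicitly.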