• 技术文章 >后端开发 >Python教程

    详细解析Python实现定时任务之apscheduler

    WBOYWBOY2022-10-10 16:29:33转载943
    本篇文章给大家带来了关于Python的相关知识,其中主要介绍了关于实现定时任务的相关问题,可以使用第三方包来管理定时任务,相对来说apscheduler使用起来更简单,下面一起来看一下使用的方法,希望对大家有帮助。

    php入门到就业线上直播课:进入学习

    【相关推荐:Python3视频教程

    初识apscheduler

    来个简单的例子看看apscheduler是如何使用的。

    #encoding:utf-8
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    def sch_test():
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('时间:{}, 测试apscheduler'.format(now))
    task = BlockingScheduler()
    task.add_job(func=sch_test, trigger='cron', second='*/10')
    task.start()

    上述例子很简单,我们首先要定义一个apscheduler的对象,然后add_job添加任务,最后start开启任务就行了。

    例子是每隔10秒运行一次sch_test任务,运行结果如下:

    时间:2022-10-08 15:16:30, 测试apscheduler
    时间:2022-10-08 15:16:40, 测试apscheduler
    时间:2022-10-08 15:16:50, 测试apscheduler
    时间:2022-10-08 15:17:00, 测试apscheduler

    如果我们要在执行任务函数时携带参数,只要在add_job函数中添加args就行,比如task.add_job(func=sch_test, args=('a'), trigger='cron', second='*/10')。

    apscheduler有哪些模块

    上面例子中我们初步了解到如何使用apschedulerl了,接下来需要知道apscheduler的设计框架。apscheduler有四个主要模块,分别是:触发器triggers、任务存储器job_stores、执行器executors、调度器schedulers。

    1. 触发器triggers:

    触发器指的是任务指定的触发方式,例子中我们用的是“cron”方式。我们可以选择cron、date、interval中的一个。

    cron表示的是定时任务,类似linux crontab,在指定的时间触发。

    可用参数如下:

    01.png

    除此之外,我们还可用表达式类型去设置cron。比如常用的有:

    02.png

    使用方法示例,在每天7点20分执行一次:

    task.add_job(func=sch_test, args=('定时任务',), trigger='cron',

    hour='7', minute='20')

    date表示具体到某个时间的一次性任务;

    使用方法示例:

    # 使用run_date指定运行时间
    task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))
    # 或者用next_run_time
    task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

    interval表示的是循环任务,指定一个间隔时间,每过间隔时间执行一次。

    interval可设置如下的参数:

    03.png

    使用方法示例,每隔3秒执行一次sch_test任务:

    task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。

    来个例子把3种触发器都使用一遍:

    # encoding:utf-8
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    def sch_test(job_type):
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('时间:{}, {}测试apscheduler'.format(now, job_type))
    task = BlockingScheduler()
    task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
    task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
    task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)
    task.start()

    打印部分结果:

    时间:2022-10-08 15:45:49, 一次性任务测试apscheduler
    时间:2022-10-08 15:45:49, 循环任务测试apscheduler
    时间:2022-10-08 15:45:50, 定时任务测试apscheduler
    时间:2022-10-08 15:45:52, 循环任务测试apscheduler
    时间:2022-10-08 15:45:55, 定时任务测试apscheduler
    时间:2022-10-08 15:45:55, 循环任务测试apscheduler
    时间:2022-10-08 15:45:58, 循环任务测试apscheduler

    通过代码示例和结果展示,我们可清晰的知道不同触发器的使用区别。

    2. 任务存储器job_stores

    顾名思义,任务存储器是存储任务的地方,默认都是存储在内存中。我们也可自定义存储方式,比如将任务存到mysql中。这里有以下几种选择:

    04.png

    通常默认存储在内存即可,但若程序故障重启的话,会重新拉取任务运行了,如果你对任务的执行要求高,那么可以选择其他的存储器。

    使用SQLAlchemyJobStore存储器示例:

    from apscheduler.schedulers.blocking import BlockingScheduler
    def sch_test(job_type):
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('时间:{}, {}测试apscheduler'.format(now, job_type))
    sched = BlockingScheduler()
    # 使用mysql存储任务
    sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
    sched.add_jobstore('sqlalchemy',url=sql_url)
    # 添加任务
    sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
    sched.start()

    3. 执行器executors

    执行器的功能就是将任务放到线程池或进程池中运行。有以下几种选择:

    05.png

    默认是ThreadPoolExecutor, 常用的也就是第线程和进程池执行器。如果应用是CPU密集型操作,可用ProcessPoolExecutor来执行。

    4. 调度器schedulers

    调度器属于apscheduler的核心,它扮演着统筹整个apscheduler系统的角色,存储器、执行器、触发器在它的调度下正常运行。调度器有以下几个:

    06.png

    不是特定场景下,我们最常用的是BlockingScheduler调度器。

    异常监听

    定时任务在运行时,若出现错误,需要设置监听机制,我们通常结合logging模块记录错误信息。

    使用示例:

    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
    import logging
    # logging日志配置打印格式及保存位置
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        filename='sche.log',
                        filemode='a')
    def log_listen(event):
    if event.exception :
    print ( '任务出错,报错信息:{}'.format(event.exception))
    else:
    print ( '任务正常运行...' )
    def sch_test(job_type):
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('时间:{}, {}测试apscheduler'.format(now, job_type))
        print(1/0)
    sched = BlockingScheduler()
    # 使用mysql存储任务
    sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
    sched.add_jobstore('sqlalchemy',url=sql_url)
    # 添加任务
    sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
    # 配置任务执行完成及错误时的监听
    sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    # 配置日志监听
    sched._logger = logging
    sched.start()

    apscheduler的封装使用

    上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。

    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
    from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
    import logging
    import logging.handlers
    import os
    import datetime
    class LoggerUtils():
        def init_logger(self, logger_name):
            # 日志格式
            formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
            log_obj = logging.getLogger(logger_name)
            log_obj.setLevel(logging.INFO)
            # 设置log存储位置
            path = '/data/logs/'
            filename = '{}{}.log'.format(path, logger_name)
            if not os.path.exists(path):
                os.makedirs(path)
            # 设置日志按照时间分割
            timeHandler = logging.handlers.TimedRotatingFileHandler(
               filename,
               when='D',  # 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周
               interval=1, # 多少天切割一次
               backupCount=10  # 保留几天
            )
            timeHandler.setLevel(logging.INFO)
            timeHandler.setFormatter(formatter)
            log_obj.addHandler(timeHandler)
            return log_obj
    class Scheduler(LoggerUtils):
        def __init__(self):
            # 执行器设置
            executors = {
                'default': ThreadPoolExecutor(10),  # 设置一个名为“default”的ThreadPoolExecutor,其worker值为10
                'processpool': ProcessPoolExecutor(5)  # 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5
            }
            self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
            # 存储器设置
            # 这里使用sqlalchemy存储器,将任务存储在mysql
            sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
            self.scheduler.add_jobstore('sqlalchemy',url=sql_url)
            def log_listen(event):
                if event.exception:
                    # 日志记录
                    self.scheduler._logger.error(event.traceback)
        
            # 配置任务执行完成及错误时的监听
            self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
            # 配置日志监听
            self.scheduler._logger = self.init_logger('sche_test')
        def add_job(self, *args, **kwargs):
            """添加任务"""
            self.scheduler.add_job(*args, **kwargs)
        def start(self):
            """开启任务"""
            self.scheduler.start()
    # 测试任务
    def sch_test(job_type):
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('时间:{}, {}测试apscheduler'.format(now, job_type))
        print(1/0)
    # 添加任务,开启任务
    sched = Scheduler()
    # 添加任务
    sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
    # 开启任务
    sched.start()

    【相关推荐:Python3视频教程

    以上就是详细解析Python实现定时任务之apscheduler的详细内容,更多请关注php中文网其它相关文章!

    声明:本文转载于:掘金,如有侵犯,请联系admin@php.cn删除

    前端(VUE)零基础到就业课程:点击学习

    清晰的学习路线+老师随时辅导答疑

    自己动手写 PHP MVC 框架:点击学习

    快速了解MVC架构、了解框架底层运行原理

    专题推荐:python
    上一篇:什么是Python垃圾回收机制中的引用计数 下一篇:自己动手写 PHP MVC 框架(40节精讲/巨细/新人进阶必看)

    相关文章推荐

    • ❤️‍🔥共22门课程,总价3725元,会员免费学• ❤️‍🔥接口自动化测试不想写代码?• 实例详解python类对象的析构释放• PHP程序中运行Python脚本的方法• python分析inkscape路径数据方案简单介绍• Python怎么多线程并发下载图片• Mac环境下怎么设置默认Python 版本?
    1/1

    PHP中文网