超强大的任务调度库
简单描述
Advanced Python Scheduler(APScheduler) 是 python
下的一个允许你安排稍后执行任务,一次执行或定期执行任务的库。你可以随时添加新的任务或删除旧的任务。如果您将任务存储在数据库中,它们也将在调度器重新启动后继续运行并保持其状态。当调度程序重新启动时,它将运行它脱机时应该运行的所有任务
除此之外,APScheduler
还可以作为跨平台、特定于应用程序的替代品,替代特定于平台的调度器,例如cron守护程序或Windows任务调度器。但是,请注意,APScheduler
本身不是 守护程序或服务,也不附带任何命令行工具。它主要是在现有的应用程序中运行。也就是说,APScheduler
确实提供了一些实现方案,用于构建调度程序服务或运行专用调度程序进程。
APScheduler
还集成了几种常见的Python框架,如:
- asyncio (PEP 3156)
- gevent
- Tornado
- Twisted
安装
python3 -m pip install apscheduler
基本概念
APScheduler
有四种组件:
- 触发器 (triggers)
- 任务存储 (job stores)
- 执行器 (executors)
- 调度器 (schedulers)
触发器 (triggers)
触发器包含调度逻辑。 每个任务都有自己的触发器,该触发器确定下一步应在何时运行该任务。 除了其初始配置外,触发器完全是无状态的。
APScheduler
有三个内置的调度系统,您可以使用:
Cron-style
(计划任务)的调度(具有可选的开始/结束时间)- 基于间隔的执行(以偶数间隔运行任务,具有可选的开始/结束时间)
- 特定时间运行一次(在设定的日期/时间运行一次任务)
任务存储 (job stores)
任务商店是存储所有计划,默认情况下这些任务都是直接保存在内存中的,但同时也可以将这些任务保存在各种数据库中(通过序列化保存到数据库中,执行时再从数据库中取出反序列化),支持的存储任务的后端包括:
- Memory
- SQLAlchemy (any RDBMS supported by SQLAlchemy works)
- MongoDB
- Redis
- RethinkDB
- ZooKeeper
执行器 (executors)
执行者负责处理任务。 他们通常通过将任务中的指定可调用对象提交给线程或进程池来执行此操作。 任务完成后,执行程序通知调度程序,然后发出适当的事件。
调度器 (schedulers)
调度程序将其余部分绑定在一起。 通常,您的应用程序中仅运行一个调度程序。 应用程序开发人员通常不会直接处理任务存储库,执行程序或触发器。 而是,调度程序提供适当的接口来处理所有这些接口。 配置任务存储和执行程序是通过调度程序完成的,添加,修改和删除任务也是如此。
如何选择
上面介绍了 APScheduler
核心的四部分,现在最麻烦的是如何选择合适的这四部分
job stores
的选择
任务存储的选择基于是否需要任务持久化。如果总是在应用程序启动的时候重新创建任务,则可以使用默认设置 MemoryJobStore
, 但是如果您需要任务在调度程序重新启动或应用程序奔溃后继续存在,那么可以依赖于一些后台服务来存储这些任务。但是如果可以自由选择的话,建议使用基于 PostgreSQL
的 SQLAlchemyJobStore
,因为它具有强大的数据完整性保护功能。
schedulers
的选择
调度程序的选择主要取决于您的编程环境以及 APScheduler
的用途。 以下是选择计划程序的快速指南:
BlockingScheduler
:当调度程序是您的流程中唯一正在运行的东西时使用BackgroundScheduler
:在不使用以下任何框架且希望调度程序在应用程序内部的后台运行时使用AsyncIOScheduler
:如果您的应用程序使用asyncio
模块,则使用GeventScheduler
:如果您的应用程序使用gevent
,则使用TornadoScheduler
:在构建Tornado
应用程序时使用TwistedScheduler
:在构建Twisted
应用程序时使用QtScheduler
:在构建Qt
应用程序时使用
executors
的选择
如果是使用上面中 schedulers
提到的框架,通常情况下会自行选择合适的执行任务方式,否则默认的 ThreadPoolExecutor
就可以满足大部分用途。
但是如果任务中涉及到 CPU密集型 操作, 则应考虑改用 ProcessPoolExecutor
来利用多个CPU内核。 您甚至可以同时使用两者,将进程池执行程序添加为辅助执行程序。
triggers
的选择
APScheduler
带有三种内置的触发器类型:
data
: 在 特定时间仅执行一次 时使用interval
: 当需要 以固定时间间隔运行任务 时使用cron
: 在需要 在一天的特定时间定期运行任务 时使用
也可以将多个触发器组合成一个触发器,该触发器可以在所有参与触发器约定的时间内触发,也可以在任何触发器触发时触发。官方的描述有点像组成的触发器可以使它们的并集也可以使交集,具体细节,可以在使用的时候查询官方的 API
说明
配置调度程序
APScheduler
提供了许多不同的方式来配置调度程序。您可以使用配置字典,也可以将选项作为关键字参数传递。您还可以先实例化计划程序,然后添加任务并随后配置计划程序。这样,您可以在任何环境下获得最大的灵活性。
调度程序级别配置选项的完整列表可以在 BaseScheduler
该类的API参考上找到 。调度程序子类还可能具有其他选项,这些选项记录在其各自的API参考中。同样,可以在其API参考页上找到各个任务存储和执行程序的配置选项。
假设您要使用默认任务存储和默认执行程序在应用程序中运行 BackgroundScheduler
, 这将为您提供 BackgroundScheduler
,它的 MemoryJobStore
名为 default
,而 ThreadPoolExecutor
名为 default
,默认最大线程数为 10
。
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
但是,假设您想要更多。您要使用两个执行程序来拥有两个任务存储,还希望调整新任务的默认值,并设置不同的时区,
方法一:
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法二:
from apscheduler.schedulers.background import BackgroundScheduler
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
})
方法三:
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler()
# .. do something else here, maybe add jobs etc.
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
启动调度程序
只需调用 start()
调度程序即可启动调度 程序。对于除 BlockingScheduler
以外的调度程序,此调用将立即返回,您可以继续应用程序的初始化过程,可能会向调度程序添加任务。
对于 BlockingScheduler
,调用 start()
前需要完成所有的初始化步骤,阻塞的 start()
之后是没办法更改其设置的。
添加任务
有两种将任务添加到调度程序 schedulers
的方法:
add_job()
scheduled_job()
第一种方法是最常用的方法,第二种方法主要是方便地声明在应用程序运行时不更改的任务
add_job()
方法返回一个 apscheduler.job.Job
实例,可用于以后修改或删除任务。
你可以在任何时候添加任务,如果在添加任务时调度程序尚未运行,则将暂定调度任务,并且仅在调度程序启动时才计算其首次运行时间
删除任务
当您从调度程序中删除任务时,该任务将从其关联的任务存储中删除,并且将不再执行。有两种方式可以实现此目的:
remove_job()
任务的ID
和任务存储别名remove()
add_job()
返回的Job
实例
后一种方法可能更加方便,但是它要求您将 Job
添加任务时收到的实例存储在某处
而通过 scheduled_job()
添加的计划, 第一种方法是唯一的方法
例如 :
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
使用 ID 来删除计划
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
暂停和恢复工作
您可以通过 Job
实例或调度程序本身轻松地暂停和恢复任务。暂停任务后,将清除其下一个运行时间,并且在恢复该任务之前,将不再为其计算其他运行时间。要暂停工作,请使用以下两种方法之一:
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
恢复:
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()
获取计划的任务列表
要获取预定任务的计算机可处理列表,可以使用该 get_jobs()
方法。它将返回 Job
实例列表 。如果您只对特定任务存储区中包含的任务感兴趣,则给任务存储区别名作为第二个参数。
为方便起见,您可以使用该 print_jobs()
方法将打印出格式化的任务列表,其触发器和下一次运行时间。
修改任务
您可以通过调用 apscheduler.job.Job.modify()
或 modify_job()
来修改任何任务属性。您可以修改除 Job
和 id
以外的任何属性。
job.modify(max_instances=6, name='Alternate name')
如果要重新计划任务-即更改其触发器,可以使用 apscheduler.job.Job.reschedule()
或 reschedule_job()
。这些方法为任务构造一个新的触发器,并根据新的触发器重新计算其下一次运行时间。
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
关闭调度程序
scheduler.shutdown()
默认情况下,调度程序关闭其任务存储和执行程序,并等待直到所有当前执行的任务完成。如果您不想等待,可以执行以下操作:
scheduler.shutdown(wait=False)
暂停/恢复任务处理
可以暂停计划任务的处理:
scheduler.pause()
这将导致调度程序在恢复处理之前不唤醒:
scheduler.resume()
也有可能在暂停状态下启动调度程序,即没有第一个唤醒调用:
scheduler.start(paused=True)
当您需要修剪不需要的任务之前,这很有用。
限制任务的并发执行实例数
默认情况下,每个任务仅允许一个实例同时运行。这意味着,如果该任务将要运行,但是上一轮尚未完成,则最新一轮将被视为断火。通过 max_instances
在添加任务时使用关键字参数,可以为调度程序允许并发运行的特定任务设置最大实例数。
错过了执行和合并
有时,计划程序在计划运行时可能无法执行计划的任务。最常见的情况是,在持久性任务存储中调度了任务,并且在应该执行该任务之后关闭了调度程序并重新启动了该调度程序。发生这种情况时,该任务被认为是 misfire(失职)
。然后,调度程序将对照任务的 misfire_grace_time
选项(可以在每个任务的基础上或在调度程序中全局设置)检查每个错过的执行时间,以查看是否仍应触发执行。这可能导致该任务连续执行多次。
如果这种行为对于您的特定用例而言是不希望有的,则可以使用合并将所有这些遗漏的执行汇总为一个。换句话说,如果为任务启用了合并,并且调度程序看到该任务的一个或多个排队执行,则它将仅触发一次。
注意 :
如果由于池中没有可用的线程或进程而延迟了任务的执行,则执行程序可能会由于执行得太晚(与其最初指定的运行时间相比)而跳过了它。如果您的应用程序中可能会发生这种情况,则可能要增加执行程序中的线程/进程数,或者将 misfire_grace_time
设置调整为更高的值。
调度事件
可以将事件侦听器附加到调度程序。调度程序事件在某些情况下会触发,并且可能在其中携带有关该特定事件的详细信息。通过给适当的 mask
参数 add_listener()
,或将不同的常数合在一起,可以仅侦听特定类型的事件 。使用一个参数(事件对象)调用可调用的侦听器。
有关 events
可用事件及其属性的详细信息,请参见该模块的文档。
def my_listener(event):
if event.exception:
print('The job crashed :(')
else:
print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
日志
如果调度程序未按预期工作,则将记录 apscheduler
器的日志记录级别提高到该 DEBUG
级别将很有帮助。
如果首先尚未启用日志记录,则可以执行以下操作:
import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
以上部分是对官方 https://apscheduler.readthedocs.io/en/stable/ 官方文档中用户指南部分的整理和翻译,官方介绍了很多核心功能,接下来我会使用几个例子,尽可能的覆盖上面提到一些功能,也方便大家使用
例子
from apscheduler.schedulers.blocking import BlockingScheduler
import apscheduler.events as aps_events
import datetime
import time
import logging
def my_listener(event):
if event.exception:
logging.info("{} : 执行失败".format(event.job_id))
else:
if event.job_id == "一次性任务":
scheduler.remove_job('定时任务')
logging.info("{}: 删除定时任务".format(event.job_id))
logging.info("{} : 执行成功".format(event.job_id))
def error_task():
logging.info("开始执行异常任务")
a = 1/0
def interval_task():
logging.info("循环任务 : {}".format(datetime.datetime.now()))
time.sleep(5)
def task(arg):
logging.info("{} : {}".format(datetime.datetime.now(), arg))
if __name__ == '__main__':
logging.basicConfig( level=logging.INFO,
format="[%(asctime)s][%(levelname)s] - %(message)s" )
scheduler = BlockingScheduler(logger=logging)
scheduler.add_job(func=interval_task, trigger='interval', seconds=2, max_instances=2, id="循环任务")
scheduler.add_job(func=task, args=('定时任务',), trigger='cron', second='*/5', id="定时任务")
scheduler.add_job(func=task, args=('一次性任务',), trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10), id="一次性任务")
scheduler.add_job(func=error_task, next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10), id="异常任务")
scheduler.add_listener(my_listener, aps_events.EVENT_JOB_EXECUTED | aps_events.EVENT_JOB_ERROR)
scheduler.start()
简单解释一下:scheduler
中添加了3种类型的4个任务,按照 add_job()
从上而下依次是:
- 2 秒执行 1 次,但是任务耗时 5 秒,并且允许的最大同时运行数为 2 的循环
interval
任务, 测试一下max_instances
的效果 - 时间的秒数是 5 的倍数就执行的定时
cron
任务 - 预计 10 后执行的一次性
date
任务 - 故意出错的异常任务, 测试测试监听函数是否好用,这里捕获的事件一个是任务执行失败和成功,当然它还有很多其他的,比如添加任务,删除任务,这些都可以监听
初次之外,在执行一次性任务成功之后,会删除定时任务,测试一下 remove_job()
的用法, 类似 pause_job()
和 resume_job()
我就不测试了
此外,对日志输出进行了格式化处理,并打印 INFO
级别以上的日志(默认:WARNING), 看更多 apscheduler
的日志
运行结果
[2020-12-19 18:21:11,101][INFO] - Adding job tentatively -- it will be properly scheduled when the scheduler starts
[2020-12-19 18:21:11,104][INFO] - Adding job tentatively -- it will be properly scheduled when the scheduler starts
[2020-12-19 18:21:11,115][INFO] - Adding job tentatively -- it will be properly scheduled when the scheduler starts
[2020-12-19 18:21:11,116][INFO] - Adding job tentatively -- it will be properly scheduled when the scheduler starts
[2020-12-19 18:21:11,116][INFO] - Added job "interval_task" to job store "default"
[2020-12-19 18:21:11,117][INFO] - Added job "task" to job store "default"
[2020-12-19 18:21:11,117][INFO] - Added job "task" to job store "default"
[2020-12-19 18:21:11,117][INFO] - Added job "error_task" to job store "default"
[2020-12-19 18:21:11,117][INFO] - Scheduler started
[2020-12-19 18:21:13,104][INFO] - Running job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:13 CST)" (scheduled at 2020-12-19 18:21:13.101294+08:00)
[2020-12-19 18:21:13,105][INFO] - 循环任务 : 2020-12-19 18:21:13.105206
[2020-12-19 18:21:15,018][INFO] - Running job "task (trigger: cron[second='*/5'], next run at: 2020-12-19 18:21:15 CST)" (scheduled at 2020-12-19 18:21:15+08:00)
[2020-12-19 18:21:15,019][INFO] - 2020-12-19 18:21:15.019276 : 定时任务
[2020-12-19 18:21:15,020][INFO] - Job "task (trigger: cron[second='*/5'], next run at: 2020-12-19 18:21:20 CST)" executed successfully
[2020-12-19 18:21:15,021][INFO] - 定时任务 : 执行成功
[2020-12-19 18:21:15,104][INFO] - Running job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:15 CST)" (scheduled at 2020-12-19 18:21:15.101294+08:00)
[2020-12-19 18:21:15,105][INFO] - 循环任务 : 2020-12-19 18:21:15.105198
[2020-12-19 18:21:17,114][WARNING] - Execution of job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:17 CST)" skipped: maximum number of running instances reached (2)
[2020-12-19 18:21:18,106][INFO] - Job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:19 CST)" executed successfully
[2020-12-19 18:21:18,107][INFO] - 循环任务 : 执行成功
[2020-12-19 18:21:19,105][INFO] - Running job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:19 CST)" (scheduled at 2020-12-19 18:21:19.101294+08:00)
[2020-12-19 18:21:19,106][INFO] - 循环任务 : 2020-12-19 18:21:19.106290
[2020-12-19 18:21:20,012][INFO] - Running job "task (trigger: cron[second='*/5'], next run at: 2020-12-19 18:21:20 CST)" (scheduled at 2020-12-19 18:21:20+08:00)
[2020-12-19 18:21:20,013][INFO] - 2020-12-19 18:21:20.013209 : 定时任务
[2020-12-19 18:21:20,014][INFO] - Job "task (trigger: cron[second='*/5'], next run at: 2020-12-19 18:21:25 CST)" executed successfully
[2020-12-19 18:21:20,015][INFO] - 定时任务 : 执行成功
[2020-12-19 18:21:20,107][INFO] - Job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:21 CST)" executed successfully
[2020-12-19 18:21:20,107][INFO] - 循环任务 : 执行成功
[2020-12-19 18:21:21,116][INFO] - Running job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:21 CST)" (scheduled at 2020-12-19 18:21:21.101294+08:00)
[2020-12-19 18:21:21,117][INFO] - Running job "task (trigger: date[2020-12-19 18:21:11 CST], next run at: 2020-12-19 18:21:21 CST)" (scheduled at 2020-12-19 18:21:21.103266+08:00)
[2020-12-19 18:21:21,117][INFO] - 循环任务 : 2020-12-19 18:21:21.117205
[2020-12-19 18:21:21,118][INFO] - Removed job 一次性任务
[2020-12-19 18:21:21,118][INFO] - 2020-12-19 18:21:21.118279 : 一次性任务
[2020-12-19 18:21:21,120][INFO] - Running job "error_task (trigger: date[2020-12-19 18:21:11 CST], next run at: 2020-12-19 18:21:21 CST)" (scheduled at 2020-12-19 18:21:21.115261+08:00)
[2020-12-19 18:21:21,120][INFO] - Job "task (trigger: date[2020-12-19 18:21:11 CST], next run at: 2020-12-19 18:21:21 CST)" executed successfully
[2020-12-19 18:21:21,120][INFO] - Removed job 异常任务
[2020-12-19 18:21:21,121][INFO] - 开始执行异常任务
[2020-12-19 18:21:21,125][INFO] - Removed job 定时任务
[2020-12-19 18:21:21,126][ERROR] - Job "error_task (trigger: date[2020-12-19 18:21:11 CST], next run at: 2020-12-19 18:21:21 CST)" raised an exception
Traceback (most recent call last):
File "C:\Users\ubuntu\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "c:/Users/ubuntu/Desktop/scheduler_demo.py", line 18, in error_task
a = 1/0
ZeroDivisionError: division by zero
[2020-12-19 18:21:21,127][INFO] - 一次性任务: 删除定时任务
[2020-12-19 18:21:21,129][INFO] - 异常任务 : 执行失败
[2020-12-19 18:21:21,129][INFO] - 一次性任务 : 执行成功
[2020-12-19 18:21:23,122][WARNING] - Execution of job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:23 CST)" skipped: maximum number of running instances reached (2)
[2020-12-19 18:21:24,108][INFO] - Job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:25 CST)" executed successfully
[2020-12-19 18:21:24,109][INFO] - 循环任务 : 执行成功
[2020-12-19 18:21:25,115][INFO] - Running job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:25 CST)" (scheduled at 2020-12-19 18:21:25.101294+08:00)
[2020-12-19 18:21:25,116][INFO] - 循环任务 : 2020-12-19 18:21:25.116302
[2020-12-19 18:21:26,120][INFO] - Job "interval_task (trigger: interval[0:00:02], next run at: 2020-12-19 18:21:27 CST)" executed successfully
[2020-12-19 18:21:26,121][INFO] - 循环任务 : 执行成功
这一部分的日志,可以清楚执行的逻辑:
2020-12-19 18:21:11,117
完成Job
任务的生成,添加到默认job store
, 以及调度系统的启动- 注意看其中的
[WARNING]
警告, 因为循环任务,5秒才能完成,2秒就需要执行一次,并且最多2个实例,所有第三个会被跳过 - 监听事件中监听到执行完成的
一次性任务
后,删除了定时任务
,定时任务
没有继续执行了,符合预期
同时监听事件也处理了任务执行失败的异常任务
,所以说,整体的测试还是和预期一样
总结
整个 APScheduler
看下来,对于任务的调度和控制真的很丰富,使用起来也相当简单!
现在有点好奇它实现的代码逻辑了,有时间会针对它的源码,在做一个简单梳理,争取年前吧,越拖越懒~