趁热打铁,学习一下 APScheduler
的 python
的源码,很好奇任务调度控制的实现。
分析源码主要还是针对 APScheduler
下的几个关键的模块
events
事件executors
执行器job
任务jobstores
任务存储triggers
触发器schedulers
调度程序
这一篇主要瞅瞅 triggers
触发器的混合模式 CombiningTrigger
BaseCombiningTrigger
BaseCombiningTrigger
是混合使用的 trigger
的基类, 它有 2 个子类 AndTrigger
和 OrTrigger
, 先看一下基类定义函数, 然后分别看一下子类的实现
初始化
传入一个 triggers
的包含多个 trigger
对象的列表, 以及一个提前或延迟执行的时间参数 jitter
def __init__(self, triggers, jitter=None):
self.triggers = triggers
self.jitter = jitter
序列化和反序列化
这一部分是基类 BaseCombiningTrigger
中直接定义的, 先介绍一下这一部分
def __getstate__(self):
return {
'version': 1,
'triggers': [(obj_to_ref(trigger.__class__), trigger.__getstate__())
for trigger in self.triggers],
'jitter': self.jitter
}
def __setstate__(self, state):
if state.get('version', 1) > 1:
raise ValueError(
'Got serialized data for version %s of %s, but only versions up to 1 can be '
'handled' % (state['version'], self.__class__.__name__))
self.jitter = state['jitter']
self.triggers = []
for clsref, state in state['triggers']:
cls = ref_to_obj(clsref)
trigger = cls.__new__(cls)
trigger.__setstate__(state)
self.triggers.append(trigger)
因为没有仔细研究过 cPickle
针对不同数据类型序列化过程中的差异,比如 jobstore
在序列化 job
时,job
提供的 __getstate__()
函数大致是这样的
return {
'version': 1,
'id': self.id,
'func': self.func_ref,
'trigger': self.trigger,
'executor': self.executor,
'args': args,
'kwargs': self.kwargs,
'name': self.name,
'misfire_grace_time': self.misfire_grace_time,
'coalesce': self.coalesce,
'max_instances': self.max_instances,
'next_run_time': self.next_run_time
}
其中
self.func_ref
通过obj_to_ref()
函数提前序列化成str
了self.trigger
是基于的BaseTrigger
的子类对象类型,所以他们的基类都提供了__getstate__()
和__setstate__()
方法self.executor
其实是一个executor
对象的别名str
,这个别名对应的对象被存储在schedulers
中args
是self.func_ref
函数的传入参数,是tuple
或者list
类型self.kwargs
是self.func_ref
函数的传入参数, 是dict
类型self.id
,self.name
是str
类型self.next_run_time
是datetime.datetime
对象类型self.misfire_grace_time
和self.max_instances
是int
类型self.coalesce
是bool
类型
但是像 DateTrigger
和 IntervalTrigger
以及 CronTrigger
提供的 __getstate__()
函数都很简单,可能是因为他们的成员变量里没有复杂的类型
这里可能说的有点复杂,简单说就是, cPickle
在做序列化过程中对 trigger
对象可以直接序列化,但是对于包含 trigger
的 list
这种嵌套结构,可能并做不到直接序列化,所以就有着这段代码
'triggers': [(obj_to_ref(trigger.__class__), trigger.__getstate__())
for trigger in self.triggers]
以 IntervalTrigger
为例:
- 通过
obj_to_ref()
函数将类的序列化得到这样的字符串apscheduler.triggers.interval:IntervalTrigger
- 然后通过
__getstate__()
获取到trigger
的实际状态数据
这篇文章就先不验证 cPickle
关于序列化和反序列化的细节,之后会抽时间详细,完整的测试一下
关于反序列化的 __setstate__()
函数就要稍微简单一点了
for clsref, state in state['triggers']:
cls = ref_to_obj(clsref)
trigger = cls.__new__(cls)
trigger.__setstate__(state)
self.triggers.append(trigger)
- 从
triggers
的list
中逐个取出经过obj_to_ref()
的clsref
以及通过__getstate__()
的state
- 通过
ref_to_obj()
函数反序列出的对象 cls.__new__(cls)
创建对象trigger.__setstate__(state)
恢复trigger
的状态- 最后
append
进self.triggers
中
关于 BaseCombiningTrigger
的基本介绍完了,接下来就是它的 2 个子类
OrTrigger
先看并集的 OrTrigger
, 它只需要实现一个函数 get_next_fire_time()
def get_next_fire_time(self, previous_fire_time, now):
fire_times = [trigger.get_next_fire_time(previous_fire_time, now)
for trigger in self.triggers]
fire_times = [fire_time for fire_time in fire_times if fire_time is not None]
if fire_times:
return self._apply_jitter(min(fire_times), self.jitter, now)
else:
return None
- 先让每一个
trigger
都调用get_next_fire_time()
函数获取下次执行时间,得到一个fire_times
的list
- 然后用了一个超简洁的一行代码,将
fire_times
中所有不为None
的筛选出来 - 最后判断
fire_times
- 不为空,取
fire_times
中最小的一个下次执行时间,先返回 - 为空,则返回
None
- 不为空,取
没毛病,很简单,就是那一行筛选的语句平时用的比较少,感觉还挺有意思的
OrTrigger
看上去逻辑很合理,但是有没有可能存在的什么问题吗?
OrTrigger
潜在 bug
假设存在多个 DateTrigger
, 先假设这里只有 2 个 DateTrigger
, 也就是说我让这个任务在 2 个固定时间执行, 当然这 2 个时间是不同的, 继续假设分别是 date1
和 date2
, 并且 date1 > date2 > now
根据 OrTrigger
的逻辑第一次执行的时候, 因为 previous_fire_time=None
, fire_times
等于得到 [date1, date2]
, 那么返回较小值, 也就是 date2
, 上层会把 date2
赋值给 job
的 next_run_time
那么下次调取 get_next_fire_time()
函数的时候, date2
其实就是 previous_fire_time
, 所以问题来了
def get_next_fire_time(self, previous_fire_time, now):
return self.run_date if previous_fire_time is None else None
根据前面 DateTrigger
的函数了解 previous_fire_time
不为 None
的时候会返回 None
, 这 2 个 DateTrigger
实际上都是返回 None
, 也就是说 date1
是不会触发的
所以在 我的思路 里有 3 个方案解决这个问题
方案一: 既然存在问题, 那就不使用 OrTrigger
, 添加任务的时候, 将这个任务, 使用不同 DateTrigger
多添加几遍
scheduler.add_job(job_function, DateTrigger(run_date=date1) )
scheduler.add_job(job_function, DateTrigger(run_date=date2) )
方案二: 需要改源码, 既然是一次性执行的任务, 其实可以不关心 previous_fire_time
, 只要你 now
大于 self.run_date
可以就返回 None
, 执行完之后, 再获取下次执行时间的时候其实 now
肯定大于 self.run_date
def get_next_fire_time(self, previous_fire_time, now):
return self.run_date if self.run_date > now else None
方案三: 也需要改源码,可能比较麻烦, 定义一个任务是否被执行的标志位, 这也是源码的设计大佬们开发 4.0
版本的方案
需要额外重申一遍, 这个系列的所有源码分析是基于
3.6.3
版本的apscheduler
OrTrigger
bug 结论
bug 只会出现在同时存在多个 DateTrigger
的时候, 所以使用的时候额外注意就好
AndTrigger
接着看交集的 AndTrigger
, 它也只需要实现一个函数 get_next_fire_time()
def get_next_fire_time(self, previous_fire_time, now):
while True:
fire_times = [trigger.get_next_fire_time(previous_fire_time, now)
for trigger in self.triggers]
if None in fire_times:
return None
elif min(fire_times) == max(fire_times):
return self._apply_jitter(fire_times[0], self.jitter, now)
else:
now = max(fire_times)
又是一段很有意思的代码
- 跟
OrTrigger
一样,先得到一个fire_times
- 因为是求并集,所以就一个目标,
fire_times
中的值都一样- 存在
None
,按照之前的逻辑,代表需要删除任务,所有直接返回None
- 最大值和最小值一样,则调用
self._apply_jitter()
后返回 - 如果都不满足,取最大值赋值给
now
在重新计算,直到出现满足条件的
- 存在
AndTrigger
看上去逻辑也挺合理,但是好像也有点问题
AndTrigger
潜在 bug
首先看一下官方提到的例子 https://apscheduler.readthedocs.io/en/v3.6.3/modules/triggers/combining.html#module-apscheduler.triggers.combining
它例子的代码只说明了 AndTrigger
如何初始化, 剩下的我来补齐
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.combining import AndTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime, timedelta
def job_function():
print("-----------------------")
scheduler = BlockingScheduler()
trigger = AndTrigger([IntervalTrigger(hours=2),
CronTrigger(day_of_week='sat,sun')])
scheduler.add_job(job_function, trigger)
scheduler.start()
先看一下官方对于这个 AndTrigger
的介绍
Run
job_function
every 2 hours, but only on Saturdays and Sundays
每2小时运行一次job_function
,但仅在周六和周日运行 (腾讯翻译)
解释的好像和我们想象中是一样的,但是实际却不是这样的 !
其中的 IntervalTrigger(hours=2)
确实是每隔 2 个小时执行一次, 但是没有定义开始时间 start_date
, 我在 APScheduler 源码阅读(四) triggers(一) 中介绍了 IntervalTrigger
的源码实现逻辑, 它每次的运行时间实际上都是 start_date
基础上加上若干个指定的时间间隔, 在这里就是加上若干个 2 小时, 而没有初始化start_date
就会默认使用 start_date = start_date or (datetime.now(self.timezone) + self.interval)
假设现在的时间是
2021-03-28 02:13:09.347022
, 那么IntervalTrigger(hours=2)
就是每次都是在这个时间基础上加若干个 2 小时
CronTrigger(day_of_week='sat,sun')
也简单分析一下, 首先是这个 CronTrigger
的初始化问题, 因为只设置了 day_of_week
, 那么会让 day_of_week
后面的字段全部按照默认值设置, 所以实际上 CornTrigger
定义应该是
year |
month |
day |
week |
day_of_week |
hour |
minute |
second |
---|---|---|---|---|---|---|---|
* |
* |
* |
* |
sat,sun |
0 |
0 |
0 |
这意味它会在每个周六周日的 00:00:00
执行一次任务
假设现在的时间是
2021-03-28 02:13:09.347022
, 那么CronTrigger(day_of_week='sat,sun')
接下来第一次执行的时间应该是2021-04-03 00:00:00
光 IntervalTrigger
秒后面携带的毫秒值, 很显然这个时间永远都不会有交集, 也就不存在 Run 'job_function' every 2 hours, but only on Saturdays and Sundays
这种解释
AndTrigger
进一步
但是假设我们强行让这 2 个有交集, 无非就是给 IntervalTrigger
指定一个开始时间, 所以改一下代码, 让 IntervalTrigger
从 00:00:00
开始执行
trigger = AndTrigger([IntervalTrigger(hours=2, start_date=datetime(2021 ,4, 1 ,0 ,0, 0)),
CronTrigger(day_of_week='sat,sun')])
这样写确实可以求出一个公共的 next_run_time
, 也就是 2021-04-03 00:00:00
, 虽然还没有介绍 schedulers
的源码, 但是有必要简单说一下实际任务的处理流程, 详细的流程图之类的会放到后面一篇中
scheduler.start()
启动之后, 会初始化各个组件executor
,jobstore
等scheduler.start()
前通过的add_job()
函数添加的任务信息也会真正初始化, 并且以previous_fire_time=None
为条件第一次计算任务下次执行时间, 并将这个值赋值给job
的next_run_time
变量中- 随后会启动线程, 执行
_process_jobs()
函数, 这个函数也是真正统筹任务调度的函数, 这里其实还有线程的睡眠和唤醒的调度的涉及, 先不展开 - 先假设到了
2021-04-03 00:00:00
这个时间线程被唤醒,_process_jobs()
函数会先获取一下当前时间, 那么now
肯定是大于2021-04-03 00:00:00
这个值,当然多出来的时间很小很小,只有几毫秒或者几十毫秒,_process_jobs()
函数会先从jobstore
中取出小于now
的任务, 刚好误差几毫秒, 完美把任务取出来, 但是从job
中获取执行时间的同时,会额外计算下一次执行时间, 也就是调用trigger
的get_next_fire_time(self, previous_fire_time, now)
函数, 但是此时传入函数的previous_fire_time
值就是next_run_time
, 也就是2021-04-03 00:00:00
但是不管是 IntervalTrigger
还是 CronTrigger
在 get_next_fire_time(self, previous_fire_time, now)
函数中 previous_fire_time
不为 None
的时候处理都不能满足实际情况, 还是分别看一下部分代码
# IntervalTrigger
def get_next_fire_time(self, previous_fire_time, now):
if previous_fire_time:
next_fire_time = previous_fire_time + self.interval
...
# CronTrigger
def get_next_fire_time(self, previous_fire_time, now):
if previous_fire_time:
start_date = min(now, previous_fire_time + timedelta(microseconds=1))
if start_date == previous_fire_time:
start_date += timedelta(microseconds=1)
else:
start_date = max(now, self.start_date) if self.start_date else now
next_date = datetime_ceil(start_date).astimezone(self.timezone)
...
它们的逻辑在 previous_fire_time
有值的时候, now
是没有任何用的
IntervalTrigger
: 在previous_fire_time
基础上加一个时间间隔self.interval
CronTrigger
: 在now
和previous_fire_time
中取较小值, 还有一个向上取整, 实际上就是previous_fire_time
向上取整 1 秒
所以不匹配之后 AndTrigger
的 now = max(fire_times)
设计也没有任何意义了
所以就算命中一次, 之后也不可能算出相同的 next_run_time
, 而且 while True
也会是一个死循环
AndTrigger
结论
感觉这个是在设计上就存在问题, 尽量避免使用 AndTrigger
, 但是很多情况下都可以仅使用 CronTrigger
来解决, 比如官方的例子, 每2小时运行一次 job_function
,但仅在周六和周日运行
CronTrigger(day_of_week='sat,sun', hour='2')
而关于解决方案, 我考虑了一段时间也没想到切实可行的方案, 但是在现在的设计思路下, 我觉得有几个方面一定要改
CornTrigger
是取整之后算下次执行时间,IntervalTrigger
和DateTrigger
返回的时间如果不取整, 比较时候永远不可能恰好相等, 当然比较的时候取整比较也行- 通过
get_next_fire_time(self, previous_fire_time, now)
获取到的下次执行时间, 不能单纯通过min(fire_times) == max(fire_times)
来比较, 我觉得可能需要将其中max(fire_times)
最大值, 传递给其他trigger
, 让他们计算一下它们在之后的时间中是否会触发, 更简单点说, 有点像在这一堆fire_times
中找出最小公倍数的感觉
但是找最小公倍数看着挺简单, 其实也没有想象中的简单, 我说一下我觉得问题
AndTrigger
是一个多 Trigger
复合的情况, 两两组合,甚至三个一起, 感觉找最小公倍数的逻辑就不太一样了
DateTrigger
是一次性, 如果它的时间不被其他trigger
触发, 等于这个任务是不可能执行的- 只存在
IntervalTrigger
, 就一定需要取整比较, 或者设置相同的start_date
, 但是需要提前算出最小公倍数, 利用previous_fire_time=None
结合now
算出第一个执行时间, 之后每次加最小公倍数
以下两种的混合目前基本没有什么思路
只存在 CornTrigger
或者 IntervalTrigger
和 CornTrigger
混用, 因为 CornTrigger
的存在, 可能会出现时间一直不能同时兼得的情况, 比如
CronTrigger(day_of_week='sat,sun')
CronTrigger(day_of_week='mon')
一直找不到 next_run_time
的死循环, 如何处理这个我也一点好的思路也没有
总结
不要在 OrTrigger
中同时设置多个 DateTrigger
的时候 !
不要使用 AndTrigger
!
当然上面的建议是基于 3.6.3
版本的, 4.0
版本大佬们正在开发, 期待他们的实现思路~