django-apscheduler和apscheduler的安装和示例
APScheduler(Advanced Python Scheduler)是一个轻量级的Python定时任务调度框架(Python库)。django-apscheduler的github链接https://github.com/jarekwg/django-apscheduler,可以通过github中的示例学习django-apschedule人的使用:
django-apscheduler是在apscheduler的基础上针对django框架开发的一个插件:
This little wrapper around APScheduler enables storing persistent jobs in the database using Django’s ORM rather than requiring SQLAlchemy or some other bloatware.
Features in this project:
- Work on both python2.* and python3+
- .Manage jobs from Django admin interface
- Monitor your job execution status: duration, exception, traceback, input parameters.
支持python2以及python3+,能够管理job和监控job的执行状态,可以在数据库中持久化存储job。
django-apscheduler在Django框架中使用
1.1 需要安装的包
pip install django_apscheduler
pip install apscheduler
1.2 修改django的配置
在django的配置文件settings.py中的INSTALLED_APPS中添加子应用’django_apscheduler’。
INSTALLED_APPS = [
'django_apscheduler']
然后,执行迁移文件,更新数据库
# 执行迁移文件,更新数据库
python manage.py migrate
查看数据库,数据库中会新生成两个表分别为:django_apscheduler_djangojob和django_apscheduler_djangojobexecution
django_apscheduler_djangojob表结构和任务执行后数据记录
django_apscheduler_djangojobexecution表结构和任务执行后数据记录
1.3 编写定时任务
定时任务的示例如下,在django项目中的任意子应用的视图view.py文件中添加定时任务,项目启动之后,定时任务会自动执行。
import time
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 时间间隔3秒钟打印一次当前的时间
@register_job(scheduler, "interval", seconds=3, id='test_job')
def test_job():
format_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print(format_time)
# per-execution monitoring, call register_events on your scheduler
register_events(scheduler)
scheduler.start()
print("Scheduler started!")
apscheduler的文档地址:https://apscheduler.readthedocs.io/en/latest/userguide.html,github地址:https://github.com/agronholm/apscheduler,参数的设置,更多内容的学习,可以参考文档的内容部分,参数的设置可以参考博文:http://www.chenxm.cc/article/829.html
apsheduler的简单用例:
from apscheduler.schedulers.blocking import BlockingScheduler
import time
# 定时的任务,打印当前的时间
def test():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
scheduler = BlockingScheduler()
# 时间间隔5秒
scheduler.add_job(test, trigger='interval', seconds=5, id='test')
scheduler.start()
apsheduler内置的触发器trigger有三种:
date 日期:触发任务运行的具体日期(只运行一次)
interval 间隔:触发任务运行的时间间隔
cron 周期:触发任务运行的周期(周期性运行)
django-apscheduler和apscheduler的参数设置基本类似,可以互相参考。
更多推荐
所有评论(0)