基于Fastapi實現(xiàn)定時任務(wù)
簡介
定時任務(wù)是一個通用場景常見的功能,之前我使用django的時候,更習(xí)慣使用celery中的定時任務(wù),現(xiàn)在花時間看了看
apscheduler感覺不錯,就寫了demo,并集成到項目代碼中了
任務(wù)調(diào)度主要就是以下幾個功能
添加/刪除 任務(wù)調(diào)度
暫停/恢復(fù) 任務(wù)調(diào)度(這條我未實現(xiàn))
查看定時任務(wù)狀態(tài)
實現(xiàn)

添加定時任務(wù)
其中添加定時任務(wù)方式,有以下三種方式
date: 固定的時間執(zhí)行一次時 用這種
interval: 想要在固定的間隔時間循環(huán)執(zhí)行時用這種
cron: 這種就是最為靈活的?
crontab?表達(dá)式定時任務(wù)了
在FastAPI異步框架中,選擇?AsyncIOScheduler調(diào)度程序
默認(rèn)使用sqlite持久化定時任務(wù),不至于重啟就失效
from apscheduler.schedulers.asyncio import AsyncIOSchedulerfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.triggers.cron import CronTriggerSchedule = AsyncIOScheduler(jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})Schedule.start()
不完善的地方
1 由于時間倉促,比如參數(shù)驗證的部分,參數(shù)數(shù)據(jù)校驗,還可以繼續(xù)完善。
2 添加調(diào)度任務(wù)參數(shù)封裝,我把三種添加任務(wù)的三種方式,拆成了三個函數(shù),其中很多參數(shù)沒有用到。
3 其中文檔中還有說明最大woker數(shù)量限制之類的,然后按照我使用其他定時任務(wù)的常識,應(yīng)該還會函數(shù)最長執(zhí)行時間限制(比如執(zhí)行的功能函數(shù)特別耗時)
4 ...等等
這些就需要自己查看文檔和issues搜索了
其他方案
定時任務(wù)有很多種方案,比如可以使用
arq?https://github.com/samuelcolvin/arq?目前看來 star數(shù)量不多fastapi-utils?https://github.com/dmontagu/fastapi-utilscelery?https://github.com/celery/celery Celery就是Python中最為重量級,且常見的任務(wù)隊列, 你也可以只用其中的定時任務(wù)功能。
說到
arq, 想起我之前,使用過rq?并且學(xué)習(xí)的時候稍微翻譯了一下文檔 rq v1.0 https://codercharm.github.io/Python-rq-doc-cn/#/?一晃過去一年了。
代碼地址
單文件例子:?https://github.com/CoderCharm/fastapi-mysql-generator/blob/master/examples/demo_scheduler/main.py
項目中使用:?https://github.com/CoderCharm/fastapi-mysql-generator/blob/master/{{cookiecutter.project_name}}/app/api/__init__.py#L230
參考地址
apscheduler官方文檔
FastAPI issues
原文來自
https://www.cnblogs.com/CharmCode/p/14191009.html
10T 技術(shù)資源大放送!包括但不限于:Linux、虛擬化、容器、云計算、網(wǎng)絡(luò)、Python、Go 等。在?開源Linux?公眾號內(nèi)回復(fù)?10T,即可免費(fèi)獲取!
有收獲,點(diǎn)個在看?



