日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

5分鐘快速掌握Python定時任務框架的實現

瀏覽:171日期:2022-06-29 10:20:20
APScheduler 簡介

在實際開發中我們經常會碰上一些重復性或周期性的任務,比如像每天定時爬取某個網站的數據、一定周期定時運行代碼訓練模型等,類似這類的任務通常需要我們手動來進行設定或調度,以便其能夠在我們設定好的時間內運行。

在 Windows 上我們可以通過計劃任務來手動實現,而在 Linux 系統上往往我們會用到更多關于 crontab 的相關操作。但手動管理并不是一個很好的選擇,如果我們需要有十幾個不同的定時任務需要管理,那么每次通過人工來進行干預未免有些笨拙,那這時候就真的是「人工智能」了。

所以將這些定時任務的調度代碼化才是能夠讓我們很好地從這種手動管理的純人力操作中解脫出來。

在 Python 生態中對于定時任務的一些操作主要有那么幾個:

schedule:第三方模塊,該模塊適合比較輕量級的一些調度任務,但卻不適用于復雜時間的調度 APScheduler:第三方定時任務框架,是對 Java 第三方定時任務框架 Quartz 的模仿與移植,能提供比 schedule 更復雜的應用場景,并且各種組件都是模塊化,易于使用與二次開發。 Celery Beat:屬于 celery 這分布式任務隊列第三方庫下的一個定時任務組件,如果使用需要配合 RabbitMQ 或 Redis 這類的消息隊列套件,需要花費一定的時間在環境搭建上,但在高版本中已經不支持 Windows。

所以為了滿足能夠相對復雜的時間條件,又不需要在前期的環境搭建上花費很多時間的前提下,選擇 APScheduler 來對我們的調度任務或定時任務進行管理是個性價比極高的選擇。而本文主要會帶你快速上手有關 APScheduler 的使用。

APScheduler 概念與組件

雖然說官方文檔上的內容不是很多,而且所列舉的 API 不是很多,但這側面也反映了這一框架的簡單易用。所以在使用 APScheduler 之前,我們需要對這個框架的一些概念簡單了解,主要有那么以下幾個:

觸發器(trigger) 任務持久化(job stores) 執行器(executor) 調度器(scheduler)觸發器(trigger)

所謂的觸發器就是用以觸發定時任務的組件,在 APScheduler 中主要是指時間觸發器,并且主要有三類時間觸發器可供使用:

date:日期觸發器。日期觸發器主要是在某一日期時間點上運行任務時調用,是 APScheduler 里面最簡單的一種觸發器。所以通常也適用于一次性的任務或作業調度。 interval:間隔觸發器。間隔觸發器是在日期觸發器基礎上擴展了對時間部分,比如時、分、秒、天、周這幾個部分的設定。是我們用以對重復性任務進行設定或調度的一個常用調度器。設定了時間部分之后,從起始日期開始(默認是當前)會按照設定的時間去執行任務。 cron:cron 表達式觸發器。cron 表達式觸發器就等價于我們 Linux 上的 crontab,它主要用于更復雜的日期時間進行設定。但需要注意的是,APScheduler 不支持 6 位及以上的 cron 表達式,最多只支持到 5 位。任務持久化(job stores)

任務持久化主要是用于將設定好的調度任務進行存儲,即便是程序因為意外情況,如斷電、電腦或服務器重啟時,只要重新運行程序時,APScheduler 就會根據對存儲好的調度任務結果進行判斷,如果出現已經過期但未執行的情況會進行相應的操作。

APScheduler 為我們提供了多種持久化任務的途徑,默認是使用 memory 也就是內存的形式,但內存并不是持久化最好的方式。最好的方式則是通過像數據庫這樣的載體來將我們的定時任務寫入到磁盤當中,只要磁盤沒有損壞就能將數據給恢復。

APScheduler 支持的且常用的數據庫主要有:

sqlalchemy 形式的數據庫,這里就主要是指各種傳統的關系型數據庫,如 MySQL、PostgreSQL、SQLite 等。 mongodb 非結構化的 Mongodb 數據庫,該類型數據庫經常用于對非結構化或版結構化數據的存儲或操作,如 JSON。 redis 內存數據庫,通常用作數據緩存來使用,當然通過一些主從復制等方式也能實現當中數據的持久化或保存。

通常我們可以在創建 Scheduler 實例時創建,或是單獨為任務指定。配置的方式相對簡單,我們只需要指定對應的數據庫鏈接即可。

執行器(executor)

執行器顧名思義就是執行我們任務的對象,在計算機內通常要么是 CPU 調度任務,要么是單獨維護一個線程來運行任務。所以 APScheduler 里的執行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 這樣的線程池和進程池兩種。

當然如果是和協程或異步相關的任務調度,還可以使用對應的 AsyncIOExecutor、TwistedExecutor 和 GeventExecutor 三種執行器。

調度器(scheduler)

調度器的選擇主要取決于你當前的程序環境以及 APScheduler 的用途。根據用途的不同,APScheduler 又提供了以下幾種調度器:

BlockingScheduler:阻塞調度器,當程序中沒有任何存在主進程之中運行東西時,就則使用該調度器。 BackgroundScheduler:后臺調度器,在不使用后面任何的調度器且希望在應用程序內部運行時的后臺啟動時才進行使用,如當前你已經開啟了一個 Django 或 Flask 服務。 AsyncIOScheduler:AsyncIO 調度器,如果代碼是通過 asyncio 模塊進行異步操作,使用該調度器。 GeventScheduler:Gevent 調度器,如果代碼是通過 gevent 模塊進行協程操作,使用該調度器 TornadoScheduler:Tornado 調度器,在 Tornado 框架中使用 TwistedScheduler:Twisted 調度器,在基于 Twisted 的框架或應用程序中使用 QtScheduler:Qt 調度器,在構建 Qt 應用中進行使用。

通常情況下如果不是和 Web 項目或應用集成共存,那么往往都首選 BlockingScheduler 調度器來進行操作,它會在當前進程中啟動相應的線程來進行任務調度與處理;反之,如果是和 Web 項目或應用共存,那么需要選擇 BackgroundScheduler 調度器,因為它不會干擾當前應用的線程或進程狀況。

基于對以上的概念和組件認識,我們就能基本上摸清 APScheduler 的運行流程:

設定調度器(scheduler)用以對任務的調度與安排進行全局統籌 對相應的函數或方法上設定相應的觸發器(trigger),并添加到調度器中 如有任務持久化(job stores)需要則需要設定對應的持久化層,否則默認使用內存存儲任務 當觸發器被觸發時,就將任務交由執行器(executor)進行執行APScheduler 快速上手

雖然 APScheduler 里面的概念和組件看起來有點多,但在使用上并不算很復雜,我們可以通過本節的示例就能夠很快使用。

選擇對應的 scheduler

在使用之前我們需要先實例化一個 scheduler 對象,所有的 scheduler 對象都被放在了 apscheduler.schedulers 模塊下,我們可以直接通過查看 API 文檔或者借助 IDE 補全的提示來獲取相應的 scheduler 對象。

這里我直接選取了最基礎的 BlockingScheduler:

# main.py from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler()

配置 scheduler

對于 scheduler 的一些配置我們可以直接在實例化對象時就進行配置,當然也可以在創建實例化對象之后再進行配置。

實例化時進行參數配置:

# main.pyfrom datetime import datetime from apscheduler.executors.pool import ThreadPoolExecutorfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.schedulers.blocking import BlockingScheduler # 任務持久化 使用 SQLitejobstores = { ’default’: SQLAlchemyJobStore(url = ’sqlite:///jobs.db’)}# 執行器配置executors = { ’default’: ThreadPoolExecutor(20),}# 關于 Job 的相關配置,見官方文檔 APIjob_defaults = { ’coalesce’: False, ’next_run_time’: datetime.now()}scheduler = BlockingScheduler( jobstores = jobstores, executors = executors, job_defaults = job_defaults, timezone = ’Asia/Shanghai’)

或是通過 scheduler.configure 方法進行同樣的操作:

scheduler = BlockingScheduler()scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=’Asia/Shanghai’)添加并執行你的任務

創建 scheduler 對象之后,我們需要調用其下的 add_job() 或是 scheduled_job() 方法來將我們需要執行的函數進行注冊。前者是以傳參的形式指定對應的函數名,而后者則是以裝飾器的形式直接對我們要執行的函數進行修飾。

比如我現在有一個輸出此時此刻時間的函數 now():

from datetime import datetime def now(trigger): print(f'trigger:{trigger} -> {datetime.now()}')

然后我打算每 5 秒的時候運行一次,那我們使用 add_job() 可以這樣寫:

if __name__ == ’__main__’: scheduler.add_job(now, trigger = 'interval', args = ('interval',), seconds = 5) scheduler.start()

在調用 start() 方法之后調度器就會開始執行,并在控制臺上看到對應的結果了:

trigger:interval -> 2021-01-16 21:19:43.356674trigger:interval -> 2021-01-16 21:19:46.679849trigger:interval -> 2021-01-16 21:19:48.356595

當然使用 @scheduled_job 的方式來裝飾我們的任務或許會更加自由一些,于是上面的例子就可以寫成這樣:

@scheduler.scheduled_job(trigger = 'interval', args = ('interval',), seconds = 5)def now(trigger): print(f'trigger:{trigger} -> {datetime.now()}') if __name__ == ’__main__’: scheduler.start()

運行之后就會在控制臺看到同樣的結果了。

不過需要注意的是,添加任務一定要在 start() 方法執行前調用,否則會找不到任務或是拋出異常。

將 APScheduler 集成到 Web 項目中

如果你是正在做有關的 Web 項目且存在一些定時任務,那么得益于 APScheduler 由于多樣的調度器,我們能夠將其和我們的項目結合到一起。

如果你正在使用 Flask,那么 Flask-APScheduler 這一別人寫好的第三方包裝庫就很適合你,雖然它沒有相關的文檔,但只要你了解了前面我所介紹的有關于 APScheduler 的概念和組件,你就能很輕易地看懂這個第三方庫倉庫里的示例代碼。

如果你使用的不是 Flask 框架,那么 APScheduler 本身也提供了一些對任務或作業的增刪改查操作,我們可以自己編寫一套合適的 API。

這里我使用的是 FastAPI 這一目前流行的 Web 框架。demo 項目結構如下:

temp-scheduler├── config.py # 配置項├── main.py # API 文件└── scheduler.py # APScheduler 相關設置安裝依賴

這里我們需要的依賴不多,只需要簡單幾個即可:

pip install fastapi apscheduler sqlalchemy uvicorn配置項

如果項目中模塊過多,那么使用一個文件或模塊來進行統一管理是最好的選擇。這里的 config.py 我們主要像 Flask 的配置那樣簡單設定:

from apscheduler.executors.pool import ThreadPoolExecutorfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.schedulers.blocking import BlockingScheduler class SchedulerConfig: JOBSTORES = {'default': SQLAlchemyJobStore(url='sqlite:///job.db')} EXECUTORS = {'default': ThreadPoolExecutor(20)} JOB_DEFAULTS = {'coalesce': False} @classmethod def to_dict(cls): return { 'jobstores': cls.JOBSTORES, 'executors': cls.EXECUTORS, 'job_defaults': cls.JOB_DEFAULTS, }

在 SchedulerConfig 配置項中我們可以自己實現一個 to_dict() 類方法,以便我們后續傳參時通過解包的方式直接傳入配置參數即可。

Scheduler 相關設置

scheduler.py 模塊的設定也比較簡單,即設定對應的 scheduler 調度器即可。由于是演示 demo 我還將要定期執行的任務也放在了這個模塊當中:

import loggingfrom datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler from config import SchedulerConfig scheduler = BackgroundScheduler()logger = logging.getLogger(__name__) def init_scheduler() -> None: # config scheduler scheduler.configure(**SchedulerConfig.to_dict()) logger.info('scheduler is running...') # schedule test scheduler.add_job( func=mytask, trigger='date', args=('APScheduler Initialize.',), next_run_time=datetime.now(), ) scheduler.start() def mytask(message: str) -> None: print(f'[{datetime.now()}] message: {message}')

在這一部分中:

init_scheduler() 方法主要用于在 API 服務啟動時被調用,然后對 scheduler 對象的配置以及測試 mytask() 則是我們要定期執行的任務,后續我們可以通過 APScheduler 提供的方法來自行添加任務API 設置

在 main.py 模塊就主要存放著我們由 FastAPI 所構建的相關 API。如果在后續開發時存在多個接口,此時就需要將不同接口放在不同模塊文件中,以達到路由的分發與管理,類似于 Flask 的藍圖模式。

import loggingimport uuidfrom datetime import datetimefrom typing import Any, Dict, Optional, Sequence, Union from fastapi import FastAPIfrom pydantic import BaseModel from scheduler import init_scheduler, mytask, scheduler logger = logging.getLogger(__name__) app = FastAPI(title='APScheduler API')app.add_event_handler('startup', init_scheduler) class Job(BaseModel): id: Union[int, str, uuid.UUID] name: Optional[str] = None func: Optional[str] = None args: Optional[Sequence[Optional[str]]] = None kwargs: Optional[Dict[str, Any]] = None executor: Optional[str] = None misfire_grace_time: Optional[str] = None coalesce: Optional[bool] = None max_instances: Optional[int] = None next_run_time: Optional[Union[str, datetime]] = None @app.post('/add')def add_job( message: str, trigger: str, trigger_args: Optional[dict], id: Union[str, int, uuid.UUID],): try: scheduler.add_job( func=mytask, trigger=trigger, kwargs={'message': message}, id=id, **trigger_args, ) except Exception as e: logger.exception(e.args) return {'status_code': 0, 'message': '添加失敗'} return {'status_code': 1, 'message': '添加成功'} @app.delete('/delete/{id}')def delete_job(id: Union[str, int, uuid.UUID]): '''delete exist job by id''' try: scheduler.remove_job(job_id=id) except Exception: return dict( message='刪除失敗', status_code=0, ) return dict( message='刪除成功', status_code=1, ) @app.put('/reschedule/{id}')def reschedule_job( id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]): try: scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args) except Exception as e: logger.exception(e.args) return dict( message='修改失敗', status_code=0, ) return dict( message='修改成功', status_code=1, ) @app.get('/job')def get_all_jobs(): jobs = None try: job_list = scheduler.get_jobs() if job_list: jobs = [Job(**task.__getstate__()) for task in job_list] except Exception as e: logger.exception(e.args) return dict( message='查詢失敗', status_code=0, jobs=jobs, ) return dict( message='查詢成功', status_code=1, jobs=jobs, ) @app.get('/job/{id}')def get_job_by_id(id: Union[int, str, uuid.UUID]): jobs = [] try: job = scheduler.get_job(job_id=id) if job: jobs = [Job(**job.__getstate__())] except Exception as e: logger.exception(e.args) return dict( message='查詢失敗', status_code=0, jobs=jobs, ) return dict( message='查詢成功', status_code=1, jobs=jobs, )

以上代碼看起來很多,其實核心的就那么幾點:

FastAPI 對象 app 的初始化。這里用到的 add_event_handler() 方法就有點像 Flask 中的 before_first_request,會在 Web 服務請求伊始進行操作,理解為初始化相關的操作即可。

API 接口路由。路由通過 app 對象下的對應 HTTP 方法來實現,如 GET、POST、PUT 等。這里的裝飾器用法其實也和 Flask 很類似,就不多贅述。

scheduler 對象的增刪改查。從 scheduler.py 模塊中引入我們創建好的 scheduler 對象之后就可以直接用來做增刪改查的操作:

增:使用 add_job() 方法,其主要的參數是要運行的函數(或方法)、觸發器以及觸發器參數等 刪:使用 delete_job() 方法,我們需要傳入一個對應任務的 id 參數,用以能夠查找到對應的任務 改:使用 reschedule_job() 方法,這里也需要一個對應任務的 id 參數,以及需要重新修改的觸發器及其參數 查:使用 get_jobs() 和 get_job() 兩個方法,前者是直接獲取到當前調度的所有任務,返回的是一個包含了 APScheduler.job.Job 對象的列表,而后者是通過 id 參數來查找對應的任務對象;這里我通過底層源碼使用 __getstate__() 來獲取到任務的相關信息,這些信息我們通過事先設定好的 Job 對象來對其進行序列化,最后將信息從接口中返回。運行

完成以上的所有操作之后,我們就可以打開控制臺,進入到該目錄下并激活我們的虛擬環境,之后運行:

uvicorn main:app

之后我們就能在 FastAPI 默認的地址 http://127.0.0.1:8000/docs 中看到關于全部接口的 Swagger 文檔頁面了:

5分鐘快速掌握Python定時任務框架的實現

fastapi 集成的 swagger 頁面

之后我們可以直接在文檔里面或使用 Postman 來自己進行接口測試即可。

結尾

本文介紹了有關于 APScheduler 框架的概念及其用法,并進行了簡單的實踐。

得益于 APScheduler 的模塊化設計才可以讓我們更方便地去理解、使用它,并將其運用到我們實際的開發過程中。

從 APScheduler 目前的 Github 倉庫代碼以及 issue 來看,作者已經在開始重構 4.0 版本,當中的一些源代碼和 API 也有較大的變動,相信在 4.0 版本中將會引入更多的新特性。

但如果現階段你正打算使用或已經使用 APScheduler 用于實際生產中,那么希望本文能對會你有所幫助。

到此這篇關于5分鐘快速掌握Python定時任務框架的實現的文章就介紹到這了,更多相關Python 定時任務內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Python 編程
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
久久国产乱子精品免费女| 久久爱www成人| 国产亚洲久久| 亚洲国产日韩欧美在线| 伊伊综合在线| 亚洲精品四区| 国产视频一区在线观看一区免费| 国产精品成人a在线观看| 国产精品久久| 日韩国产高清在线| 亚洲免费一区三区| 麻豆成人在线| 9国产精品视频| 久久久久国产| 九九久久国产| 日韩1区在线| 欧美精品1区| 麻豆传媒一区二区三区| 美国欧美日韩国产在线播放| 国产午夜精品一区二区三区欧美| 激情综合激情| 伊人久久高清| 亚洲91久久| 久久久久久久久99精品大| 日韩国产网站| 日韩精品水蜜桃| 日韩伦理在线一区| 久久久久99| 日韩1区2区| 亚洲精品美女91| 亚洲免费一区三区| 午夜精品福利影院| 日韩一区二区三区精品| 日韩精品视频一区二区三区| 一区二区三区国产在线| 亚洲精品日韩久久| 欧美久久精品| 精品亚洲a∨一区二区三区18| 精品久久国产一区| 国产高潮在线| 亚洲成人精品| 91久久中文| 蜜臀久久99精品久久久画质超高清| 综合激情视频| 国产精品腿扒开做爽爽爽挤奶网站| 国产视频一区免费看| 日日夜夜免费精品| 国产精品网址| 麻豆成人综合网| 国产一区二区三区黄网站| 国产一区二区精品久| 国产 日韩 欧美一区| 欧美粗暴jizz性欧美20| 一区二区国产在线观看| 国产视频一区二区在线播放| 91精品啪在线观看国产爱臀| 国产传媒av在线| 久久激情婷婷| 美女久久网站| 免费不卡在线视频| 欧美日韩夜夜| 欧美日韩视频免费观看| 欧美日韩国产亚洲一区| 亚洲国产综合在线看不卡| 免费久久精品视频| 国产精品宾馆| 91九色精品国产一区二区| 亚洲精品日韩久久| 麻豆国产一区| 欧美日韩在线二区| 在线国产日韩| 国产精品久久久久9999高清| av一区在线| 三级欧美在线一区| 国产激情久久| 久久国产中文字幕| 亚洲免费影院| 精品一区二区男人吃奶| 九九在线精品| 精品国产精品久久一区免费式| 美女网站一区| 鲁大师精品99久久久| 国产一区清纯| 中文视频一区| 99久久婷婷| 日韩高清不卡在线| 日韩在线欧美| 日韩欧美精品一区二区综合视频| 国产精品伦理久久久久久| 91精品一区二区三区综合在线爱| 日本不卡一二三区黄网| 老色鬼精品视频在线观看播放| 亚洲精品888| 国产伦乱精品| 成人国产精品一区二区免费麻豆| 久久国产福利| 久久中文精品| 一区二区三区国产盗摄| 久久久人人人| 亚洲精品一二| 久久精品二区三区| 国产精品久久久久久久久久久久久久久 | 日韩av有码| 亚洲性视频h| 国产精品yjizz视频网| 亚洲免费成人av在线| 五月激情久久| 日本a口亚洲| 黄色日韩精品| 国产福利电影在线播放| 日韩欧美美女在线观看| 激情欧美国产欧美| 精品久久精品| 7m精品国产导航在线| 国产传媒在线观看| 国产欧美日韩精品一区二区免费| 免费高潮视频95在线观看网站| 精品国产欧美| 日韩欧美高清一区二区三区| 香蕉国产精品| 午夜影院一区| 久久只有精品| 日韩精品亚洲专区| 99视频精品全国免费| 欧美日韩调教| 免费看日韩精品| 黄色不卡一区| 亚洲精品极品| 在线亚洲精品| 中文字幕色婷婷在线视频| 国产精品videosex极品| 97精品国产| 久久xxx视频| 日韩和欧美一区二区| 蜜桃av一区| 在线国产一区二区| 国产美女一区| 好吊视频一区二区三区四区| av中文字幕在线观看第一页| 欧美亚洲人成在线| 日本少妇一区二区| 红桃视频欧美| 国产精品久久久久久久免费观看| 国产精品成人一区二区不卡| 精品一区av| 国产精品hd| 国产精品白丝久久av网站| 欧美日韩四区| 在线精品视频在线观看高清| 欧美大黑bbbbbbbbb在线| 日韩午夜av在线| 欧美福利专区| 欧美在线首页| 欧美日本精品| 日本不卡高清| 国产美女撒尿一区二区| 国产精一区二区| 日韩欧美激情| 欧美日韩黄网站| 日韩福利在线观看| 国产乱子精品一区二区在线观看| 日韩精品一区二区三区av| 亚洲精品伊人| 911亚洲精品| 日韩不卡在线观看日韩不卡视频| 青青国产精品| 亚洲成人国产| 亚洲丝袜啪啪| 久久精品免费一区二区三区 | 亚洲v在线看| 日韩精品免费一区二区夜夜嗨| 久久精品伊人| 9色国产精品| 欧美国产一级| 欧美日韩网址| 香蕉久久久久久| 国产suv精品一区二区四区视频| 国产精品黄网站| 精品久久网站| 黄毛片在线观看| 免费不卡中文字幕在线| 久久精品国产999大香线蕉| 韩国一区二区三区视频| 国产激情在线播放| 国产精品原创| 日韩成人高清| 成人在线丰满少妇av| 日本黄色精品| 天堂资源在线亚洲| 免费精品视频最新在线| 亚洲精品欧美| 国产精品第一国产精品| 中文在线а√天堂| 国产99久久| 中文字幕日韩欧美精品高清在线| 国产极品模特精品一二| 蜜桃av在线播放| 国产成人77亚洲精品www| 婷婷丁香综合| 在线亚洲精品|