在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

Python定時任務的實現方式

馬哥Linux運維 ? 來源:標點符 ? 作者:錢魏Way ? 2021-10-08 15:20 ? 次閱讀

在日常工作中,我們常常會用到需要周期性執行的任務,一種方式是采用 Linux 系統自帶的 crond 結合命令行實現。另外一種方式是直接使用Python。接下來整理的是常見的Python定時任務的實現方式。

目錄利用while True: + sleep()實現定時任務

使用Timeloop庫運行定時任務

利用threading.Timer實現定時任務

利用內置模塊sched實現定時任務

利用調度模塊schedule實現定時任務

利用任務框架APScheduler實現定時任務

Job 作業

Trigger 觸發器

Executor 執行器

Jobstore 作業存儲

Event 事件

調度器

APScheduler中的重要概念

Scheduler的工作流程

使用分布式消息系統Celery實現定時任務

使用數據流工具Apache Airflow實現定時任務

Airflow 產生的背景

Airflow 核心概念

Airflow 的架構

利用while True: + sleep()實現定時任務位于 time 模塊中的 sleep(secs) 函數,可以實現令當前執行的線程暫停 secs 秒后再繼續執行。所謂暫停,即令當前線程進入阻塞狀態,當達到 sleep() 函數規定的時間后,再由阻塞狀態轉為就緒狀態,等待 CPU 調度。

基于這樣的特性我們可以通過while死循環+sleep()的方式實現簡單的定時任務。

代碼示例:

import datetime

import time

def time_printer():

now = datetime.datetime.now()

ts = now.strftime(‘%Y-%m-%d %H:%M:%S’)

print(‘do func time :’, ts)

def loop_monitor():

while True:

time_printer()

time.sleep(5) # 暫停5秒

if __name__ == “__main__”:

loop_monitor()

主要缺點:

只能設定間隔,不能指定具體的時間,比如每天早上8:00

sleep 是一個阻塞函數,也就是說 sleep 這一段時間,程序什么也不能操作。

使用Timeloop庫運行定時任務Timeloop是一個庫,可用于運行多周期任務。這是一個簡單的庫,它使用decorator模式在線程中運行標記函數。

示例代碼:

import time

from timeloop import Timeloop

from datetime import timedelta

tl = Timeloop()

@tl.job(interval=timedelta(seconds=2))

def sample_job_every_2s():

print “2s job current time : {}”.format(time.ctime())

@tl.job(interval=timedelta(seconds=5))

def sample_job_every_5s():

print “5s job current time : {}”.format(time.ctime())

@tl.job(interval=timedelta(seconds=10))

def sample_job_every_10s():

print “10s job current time : {}”.format(time.ctime())

利用threading.Timer實現定時任務threading 模塊中的 Timer 是一個非阻塞函數,比 sleep 稍好一點,timer最基本理解就是定時器,我們可以啟動多個定時任務,這些定時器任務是異步執行,所以不存在等待順序執行問題。

Timer(interval, function, args=[ ], kwargs={ })

interval: 指定的時間

function: 要執行的方法

args/kwargs: 方法的參數

代碼示例:

備注:Timer只能執行一次,這里需要循環調用,否則只能執行一次

利用內置模塊sched實現定時任務sched模塊實現了一個通用事件調度器,在調度器類使用一個延遲函數等待特定的時間,執行任務。同時支持多線程應用程序,在每個任務執行后會立刻調用延時函數,以確保其他線程也能執行。

class sched.scheduler(timefunc, delayfunc)這個類定義了調度事件的通用接口,它需要外部傳入兩個參數,timefunc是一個沒有參數的返回時間類型數字的函數(常用使用的如time模塊里面的time),delayfunc應該是一個需要一個參數來調用、與timefunc的輸出兼容、并且作用為延遲多個時間單位的函數(常用的如time模塊的sleep)。

代碼示例:

import datetime

import time

import sched

def time_printer():

now = datetime.datetime.now()

ts = now.strftime(‘%Y-%m-%d %H:%M:%S’)

print(‘do func time :’, ts)

loop_monitor()

def loop_monitor():

s = sched.scheduler(time.time, time.sleep) # 生成調度器

s.enter(5, 1, time_printer, ())

s.run()

if __name__ == “__main__”:

loop_monitor()

scheduler對象主要方法:

enter(delay, priority, action, argument),安排一個事件來延遲delay個時間單位。

cancel(event):從隊列中刪除事件。如果事件不是當前隊列中的事件,則該方法將跑出一個ValueError。

run():運行所有預定的事件。這個函數將等待(使用傳遞給構造函數的delayfunc()函數),然后執行事件,直到不再有預定的事件。

個人點評:比threading.Timer更好,不需要循環調用。

利用調度模塊schedule實現定時任務schedule是一個第三方輕量級的任務調度模塊,可以按照秒,分,小時,日期或者自定義事件執行時間。schedule允許用戶使用簡單、人性化的語法以預定的時間間隔定期運行Python函數(或其它可調用函數)。

先來看代碼,是不是不看文檔就能明白什么意思?

import schedule

import time

def job():

print(“I‘m working.。.”)

schedule.every(10).seconds.do(job)

schedule.every(10).minutes.do(job)

schedule.every().hour.do(job)

schedule.every().day.at(“10:30”).do(job)

schedule.every(5).to(10).minutes.do(job)

schedule.every().monday.do(job)

schedule.every().wednesday.at(“13:15”).do(job)

schedule.every().minute.at(“:17”).do(job)

while True:

schedule.run_pending()

time.sleep(1)

裝飾器:通過 @repeat() 裝飾靜態方法

import time

from schedule import every, repeat, run_pending

@repeat(every().second)

def job():

print(’working.。.‘)

while True:

run_pending()

time.sleep(1)

傳遞參數:

import schedule

def greet(name):

print(’Hello‘, name)

schedule.every(2).seconds.do(greet, name=’Alice‘)

schedule.every(4).seconds.do(greet, name=’Bob‘)

while True:

schedule.run_pending()

裝飾器同樣能傳遞參數:

from schedule import every, repeat, run_pending

@repeat(every().second, ’World‘)

@repeat(every().minute, ’Mars‘)

def hello(planet):

print(’Hello‘, planet)

while True:

run_pending()

取消任務:

import schedule

i = 0

def some_task():

global i

i += 1

print(i)

if i == 10:

schedule.cancel_job(job)

print(’cancel job‘)

exit(0)

job = schedule.every().second.do(some_task)

while True:

schedule.run_pending()

運行一次任務:

import time

import schedule

def job_that_executes_once():

print(’Hello‘)

return schedule.CancelJob

schedule.every().minute.at(’:34‘).do(job_that_executes_once)

while True:

schedule.run_pending()

time.sleep(1)

根據標簽檢索任務:

# 檢索所有任務:schedule.get_jobs()

import schedule

def greet(name):

print(’Hello {}‘.format(name))

schedule.every().day.do(greet, ’Andrea‘).tag(’daily-tasks‘, ’friend‘)

schedule.every().hour.do(greet, ’John‘).tag(’hourly-tasks‘, ’friend‘)

schedule.every().hour.do(greet, ’Monica‘).tag(’hourly-tasks‘, ’customer‘)

schedule.every().day.do(greet, ’Derek‘).tag(’daily-tasks‘, ’guest‘)

friends = schedule.get_jobs(’friend‘)

print(friends)

根據標簽取消任務:

# 取消所有任務:schedule.clear()

import schedule

def greet(name):

print(’Hello {}‘.format(name))

if name == ’Cancel‘:

schedule.clear(’second-tasks‘)

print(’cancel second-tasks‘)

schedule.every().second.do(greet, ’Andrea‘).tag(’second-tasks‘, ’friend‘)

schedule.every().second.do(greet, ’John‘).tag(’second-tasks‘, ’friend‘)

schedule.every().hour.do(greet, ’Monica‘).tag(’hourly-tasks‘, ’customer‘)

schedule.every(5).seconds.do(greet, ’Cancel‘).tag(’daily-tasks‘, ’guest‘)

while True:

schedule.run_pending()

運行任務到某時間:

import schedule

from datetime import datetime, timedelta, time

def job():

print(’working.。.‘)

schedule.every().second.until(’23:59‘).do(job) # 今天23:59停止

schedule.every().second.until(’2030-01-01 18:30‘).do(job) # 2030-01-01 18:30停止

schedule.every().second.until(timedelta(hours=8)).do(job) # 8小時后停止

schedule.every().second.until(time(23, 59, 59)).do(job) # 今天2359停止

schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止

while True:

schedule.run_pending()

馬上運行所有任務(主要用于測試):

import schedule

def job():

print(’working.。.‘)

def job1():

print(’Hello.。.‘)

schedule.every().monday.at(’12:40‘).do(job)

schedule.every().tuesday.at(’16:40‘).do(job1)

schedule.run_all()

schedule.run_all(delay_seconds=3) # 任務間延遲3秒

并行運行:使用 Python 內置隊列實現:

import threading

import time

import schedule

def job1():

print(“I’m running on thread %s” % threading.current_thread())

def job2():

print(“I‘m running on thread %s” % threading.current_thread())

def job3():

print(“I’m running on thread %s” % threading.current_thread())

def run_threaded(job_func):

job_thread = threading.Thread(target=job_func)

job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)

schedule.every(10).seconds.do(run_threaded, job2)

schedule.every(10).seconds.do(run_threaded, job3)

while True:

schedule.run_pending()

time.sleep(1)

利用任務框架APScheduler實現定時任務APScheduler(advanceded python scheduler)基于Quartz的一個Python定時任務框架,實現了Quartz的所有功能,使用起來十分方便。提供了基于日期、固定時間間隔以及crontab類型的任務,并且可以持久化任務?;谶@些功能,我們可以很方便的實現一個Python定時任務系統。

它有以下三個特點:

類似于 Liunx Cron 的調度程序(可選的開始/結束時間)

基于時間間隔的執行調度(周期性調度,可選的開始/結束時間)

一次性執行任務(在設定的日期/時間運行一次任務)

APScheduler有四種組成部分:

觸發器(trigger) 包含調度邏輯,每一個作業有它自己的觸發器,用于決定接下來哪一個作業會運行。除了他們自己初始配置意外,觸發器完全是無狀態的。

作業存儲(job store) 存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據講在保存在持久化作業存儲時被序列化,并在加載時被反序列化。調度器不能分享同一個作業存儲。

執行器(executor) 處理作業的運行,他們通常通過在作業中提交制定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。

調度器(scheduler) 是其他的組成部分。你通常在應用只有一個調度器,應用的開發者通常不會直接處理作業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業。通過配置executor、jobstore、trigger,使用線程池(ThreadPoolExecutor默認值20)或進程池(ProcessPoolExecutor 默認值5)并且默認最多3個(max_instances)任務實例同時運行,實現對job的增刪改查等調度控制

示例代碼:

from apscheduler.schedulers.blocking import BlockingScheduler

from datetime import datetime

# 輸出時間

def job():

print(datetime.now().strftime(“%Y-%m-%d %H:%M:%S”))

# BlockingScheduler

sched = BlockingScheduler()

sched.add_job(my_job, ‘interval’, seconds=5, id=‘my_job_id’)

sched.start()

APScheduler中的重要概念

Job 作業

Job作為APScheduler最小執行單位。創建Job時指定執行的函數,函數中所需參數,Job執行時的一些設置信息。

構建說明:

id:指定作業的唯一ID

name:指定作業的名字

trigger:apscheduler定義的觸發器,用于確定Job的執行時間,根據設置的trigger規則,計算得到下次執行此job的時間, 滿足時將會執行

executor:apscheduler定義的執行器,job創建時設置執行器的名字,根據字符串你名字到scheduler獲取到執行此job的 執行器,執行job指定的函數

max_instances:執行此job的最大實例數,executor執行job時,根據job的id來計算執行次數,根據設置的最大實例數來確定是否可執行

next_run_time:Job下次的執行時間,創建Job時可以指定一個時間[datetime],不指定的話則默認根據trigger獲取觸發時間

misfire_grace_time:Job的延遲執行時間,例如Job的計劃執行時間是2100,但因服務重啟或其他原因導致2131才執行,如果設置此key為40,則該job會繼續執行,否則將會丟棄此job

coalesce:Job是否合并執行,是一個bool值。例如scheduler停止20s后重啟啟動,而job的觸發器設置為5s執行一次,因此此job錯過了4個執行時間,如果設置為是,則會合并到一次執行,否則會逐個執行

func:Job執行的函數

args:Job執行函數需要的位置參數

kwargs:Job執行函數需要的關鍵字參數

Trigger 觸發器

Trigger綁定到Job,在scheduler調度篩選Job時,根據觸發器的規則計算出Job的觸發時間,然后與當前時間比較確定此Job是否會被執行,總之就是根據trigger規則計算出下一個執行時間。

目前APScheduler支持觸發器:

指定時間的DateTrigger

指定間隔時間的IntervalTrigger

像Linux的crontab一樣的CronTrigger。

觸發器參數:date

date定時,作業只執行一次。

run_date (datetime|str) – the date/time to run the job at

timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already

sched.add_job(my_job, ‘date’, run_date=date(2009, 11, 6), args=[‘text’])

sched.add_job(my_job, ‘date’, run_date=datetime(2019, 7, 6, 16, 30, 5), args=[‘text’])

觸發器參數:interval

interval間隔調度

weeks (int) – 間隔幾周

days (int) – 間隔幾天

hours (int) – 間隔幾小時

minutes (int) – 間隔幾分鐘

seconds (int) – 間隔多少秒

start_date (datetime|str) – 開始日期

end_date (datetime|str) – 結束日期

timezone (datetime.tzinfo|str) – 時區

sched.add_job(job_function, ‘interval’, hours=2)

觸發器參數:cron

cron調度

(int|str) 表示參數既可以是int類型,也可以是str類型

(datetime | str) 表示參數既可以是datetime類型,也可以是str類型

year (int|str) – 4-digit year -(表示四位數的年份,如2008年)

month (int|str) – month (1-12) -(表示取值范圍為1-12月)

day (int|str) – day of the (1-31) -(表示取值范圍為1-31日)

week (int|str) – ISO week (1-53) -(格里歷2006年12月31日可以寫成2006年-W52-7(擴展形式)或2006W527(緊湊形式))

day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) – (表示一周中的第幾天,既可以用0-6表示也可以用其英語縮寫表示)

hour (int|str) – hour (0-23) – (表示取值范圍為0-23時)

minute (int|str) – minute (0-59) – (表示取值范圍為0-59分)

second (int|str) – second (0-59) – (表示取值范圍為0-59秒)

start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示開始時間)

end_date (datetime|str) – latest possible date/time to trigger on (inclusive) – (表示結束時間)

timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示時區取值)

CronTrigger可用的表達式:

表達式參數類型描述

*所有通配符。例:minutes=*即每分鐘觸發

* / a所有每隔時長a執行一次。例:minutes=”* / 3″ 即每隔3分鐘執行一次

a – b所有a – b的范圍內觸發。例:minutes=“2-5”。即2到5分鐘內每分鐘執行一次

a – b / c所有a – b范圍內,每隔時長c執行一次。

xth y日第幾個星期幾觸發。x為第幾個,y為星期幾

last x日一個月中,最后一個星期的星期幾觸發

last日一個月中的最后一天觸發

x, y, z所有組合表達式,可以組合確定值或上述表達式

# 6-8,11-12月第三個周五 00:00, 01:00, 02:00, 03:00運行

sched.add_job(job_function, ‘cron’, month=‘6-8,11-12’, day=‘3rd fri’, hour=‘0-3’)

# 每周一到周五運行 直到2024-05-30 0000

sched.add_job(job_function, ‘cron’, day_of_week=‘mon-fri’, hour=5, minute=30, end_date=‘2024-05-30’

Executor 執行器

Executor在scheduler中初始化,另外也可通過scheduler的add_executor動態添加Executor。每個executor都會綁定一個alias,這個作為唯一標識綁定到Job,在實際執行時會根據Job綁定的executor找到實際的執行器對象,然后根據執行器對象執行Job。

Executor的種類會根據不同的調度來選擇,如果選擇AsyncIO作為調度的庫,那么選擇AsyncIOExecutor,如果選擇tornado作為調度的庫,選擇TornadoExecutor,如果選擇啟動進程作為調度,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以。

Executor的選擇需要根據實際的scheduler來選擇不同的執行器。目前APScheduler支持的Executor:

executors.asyncio:同步io,阻塞

executors.gevent:io多路復用,非阻塞

executors.pool: 線程ThreadPoolExecutor和進程ProcessPoolExecutor

executors.twisted:基于事件驅動

Jobstore 作業存儲

Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態添加Jobstore。每個jobstore都會綁定一個alias,scheduler在Add Job時,根據指定的jobstore在scheduler中找到相應的jobstore,并將job添加到jobstore中。作業存儲器決定任務的保存方式, 默認存儲在內存中(MemoryJobStore),重啟后就沒有了。APScheduler支持的任務存儲器有:

jobstores.memory:內存

jobstores.mongodb:存儲在mongodb

jobstores.redis:存儲在redis

jobstores.rethinkdb:存儲在rethinkdb

jobstores.sqlalchemy:支持sqlalchemy的數據庫如mysql,sqlite等

jobstores.zookeeper:zookeeper

不同的任務存儲器可以在調度器的配置中進行配置(見調度器)

Event 事件

Event是APScheduler在進行某些操作時觸發相應的事件,用戶可以自定義一些函數來監聽這些事件,當觸發某些Event時,做一些具體的操作。常見的比如。Job執行異常事件 EVENT_JOB_ERROR。Job執行時間錯過事件 EVENT_JOB_MISSED。

目前APScheduler定義的Event:

EVENT_SCHEDULER_STARTED

EVENT_SCHEDULER_START

EVENT_SCHEDULER_SHUTDOWN

EVENT_SCHEDULER_PAUSED

EVENT_SCHEDULER_RESUMED

EVENT_EXECUTOR_ADDED

EVENT_EXECUTOR_REMOVED

EVENT_JOBSTORE_ADDED

EVENT_JOBSTORE_REMOVED

EVENT_ALL_JOBS_REMOVED

EVENT_JOB_ADDED

EVENT_JOB_REMOVED

EVENT_JOB_MODIFIED

EVENT_JOB_EXECUTED

EVENT_JOB_ERROR

EVENT_JOB_MISSED

EVENT_JOB_SUBMITTED

EVENT_JOB_MAX_INSTANCES

Listener表示用戶自定義監聽的一些Event,比如當Job觸發了EVENT_JOB_MISSED事件時可以根據需求做一些其他處理。

調度器

Scheduler是APScheduler的核心,所有相關組件通過其定義。scheduler啟動之后,將開始按照配置的任務進行調度。除了依據所有定義Job的trigger生成的將要調度時間喚醒調度之外。當發生Job信息變更時也會觸發調度。

APScheduler支持的調度器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler

BlockingScheduler:適用于調度程序是進程中唯一運行的進程,調用start函數會阻塞當前線程,不能立即返回。

BackgroundScheduler:適用于調度程序在應用程序的后臺運行,調用start后主線程不會阻塞。

AsyncIOScheduler:適用于使用了asyncio模塊的應用程序。

GeventScheduler:適用于使用gevent模塊的應用程序。

TwistedScheduler:適用于構建Twisted的應用程序。

QtScheduler:適用于構建Qt的應用程序。

Scheduler的工作流程

Scheduler添加job流程:

Scheduler調度流程:

使用分布式消息系統Celery實現定時任務Celery是一個簡單,靈活,可靠的分布式系統,用于處理大量消息,同時為操作提供維護此類系統所需的工具, 也可用于任務調度。Celery 的配置比較麻煩,如果你只是需要一個輕量級的調度工具,Celery 不會是一個好選擇。

Celery 是一個強大的分布式任務隊列,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。異步任務比如是發送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操作 ,定時任務是需要在特定時間執行的任務。

需要注意,celery本身并不具備任務的存儲功能,在調度任務的時候肯定是要把任務存起來的,因此在使用celery的時候還需要搭配一些具備存儲、訪問功能的工具,比如:消息隊列、Redis緩存、數據庫等。官方推薦的是消息隊列RabbitMQ,有些時候使用Redis也是不錯的選擇。

它的架構組成如下圖:

Celery架構,它采用典型的生產者-消費者模式,主要由以下部分組成:

Celery Beat,任務調度器,Beat進程會讀取配置文件的內容,周期性地將配置中到期需要執行的任務發送給任務隊列。

Producer:需要在隊列中進行的任務,一般由用戶、觸發器或其他操作將任務入隊,然后交由workers進行處理。調用了Celery提供的API、函數或者裝飾器而產生任務并交給任務隊列處理的都是任務生產者。

Broker,即消息中間件,在這指任務隊列本身,Celery扮演生產者和消費者的角色,brokers就是生產者和消費者存放/獲取產品的地方(隊列)。

Celery Worker,執行任務的消費者,從隊列中取出任務并執行。通常會在多臺服務器運行多個消費者來提高執行效率。

Result Backend:任務處理完后保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

實際應用中,用戶從Web前端發起一個請求,我們只需要將請求所要處理的任務丟入任務隊列broker中,由空閑的worker去處理任務即可,處理的結果會暫存在后臺數據庫backend中。我們可以在一臺機器或多臺機器上同時起多個worker進程來實現分布式地并行處理任務。

Celery定時任務實例:

Python Celery & RabbitMQ Tutorial

Celery 配置實踐筆記

使用數據流工具Apache Airflow實現定時任務Apache Airflow 是Airbnb開源的一款數據流程工具,目前是Apache孵化項目。以非常靈活的方式來支持數據的ETL過程,同時還支持非常多的插件來完成諸如HDFS監控、郵件通知等功能。Airflow支持單機和分布式兩種模式,支持Master-Slave模式,支持Mesos等資源調度,有非常好的擴展性。被大量公司采用。

Airflow使用Python開發,它通過DAGs(Directed Acyclic Graph, 有向無環圖)來表達一個工作流中所要執行的任務,以及任務之間的關系和依賴。比如,如下的工作流中,任務T1執行完成,T2和T3才能開始執行,T2和T3都執行完成,T4才能開始執行。

Airflow提供了各種Operator實現,可以完成各種任務實現:

BashOperator – 執行 bash 命令或腳本。

SSHOperator – 執行遠程 bash 命令或腳本(原理同 paramiko 模塊)。

PythonOperator – 執行 Python 函數。

EmailOperator – 發送 Email。

HTTPOperator – 發送一個 HTTP 請求。

MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執行 SQL 任務。

DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…

除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務需求。

一些情況下,我們需要根據執行結果執行不同的任務,這樣工作流會產生分支。如:

這種需求可以使用BranchPythonOperator來實現。

Airflow 產生的背景

通常,在一個運維系統,數據分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限于:

時間依賴:任務需要等待某一個時間點觸發。

外部系統依賴:任務依賴外部系統需要調用接口去訪問。

任務間依賴:任務 A 需要在任務 B 完成后啟動,兩個任務互相間會產生影響。

資源環境依賴:任務消耗資源非常多, 或者只能在特定的機器上執行。

crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。Airflow 的核心概念 DAG(有向無環圖)—— 來表現工作流。

Airflow 是一種 WMS,即:它將任務以及它們的依賴看作代碼,按照那些計劃規范任務執行,并在實際工作進程之間分發需執行的任務。

Airflow 提供了一個用于顯示當前活動任務和過去任務狀態的優秀 UI,并允許用戶手動管理任務的執行和狀態。

Airflow 中的工作流是具有方向性依賴的任務集合。

DAG 中的每個節點都是一個任務,DAG 中的邊表示的是任務之間的依賴(強制為有向無環,因此不會出現循環依賴,從而導致無限執行循環)。

Airflow 核心概念

DAGs:即有向無環圖(Directed Acyclic Graph),將所有需要運行的tasks按照依賴關系組織起來,描述的是所有tasks執行順序。

Operators:可以簡單理解為一個class,描述了DAG中某個的task具體要做的事。其中,airflow內置了很多operators,如BashOperator 執行一個bash 命令,PythonOperator 調用任意的Python 函數,EmailOperator 用于發送郵件,HTTPOperator 用于發送HTTP請求, SqlOperator 用于執行SQL命令等等,同時,用戶可以自定義Operator,這給用戶提供了極大的便利性。

Tasks:Task 是 Operator的一個實例,也就是DAGs中的一個node。

Task Instance:task的一次運行。Web 界面中可以看到task instance 有自己的狀態,包括”running”, “success”, “failed”, “skipped”, “up for retry”等。

Task Relationships:DAGs中的不同Tasks之間可以有依賴關系,如 Task1 》》 Task2,表明Task2依賴于Task2了。通過將DAGs和Operators結合起來,用戶就可以創建各種復雜的 工作流(workflow)。

Airflow 的架構

在一個可擴展的生產環境中,Airflow 含有以下組件:

元數據庫:這個數據庫存儲有關任務狀態的信息。

調度器:Scheduler 是一種使用 DAG 定義結合元數據中的任務狀態來決定哪些任務需要被執行以及任務執行優先級的過程。調度器通常作為服務運行。

執行器:Executor 是一個消息隊列進程,它被綁定到調度器中,用于確定實際執行每個任務計劃的工作進程。有不同類型的執行器,每個執行器都使用一個指定工作進程的類來執行任務。例如,LocalExecutor 使用與調度器進程在同一臺機器上運行的并行進程執行任務。其他像 CeleryExecutor 的執行器使用存在于獨立的工作機器集群中的工作進程執行任務。

Workers:這些是實際執行任務邏輯的進程,由正在使用的執行器確定。

Worker的具體實現由配置文件中的executor來指定,airflow支持多種Executor:

SequentialExecutor: 單進程順序執行,一般只用來測試

LocalExecutor: 本地多進程執行

CeleryExecutor: 使用Celery進行分布式任務調度

DaskExecutor:使用Dask進行分布式任務調度

KubernetesExecutor: 1.10.0新增, 創建臨時POD執行每次任務

生產環境一般使用CeleryExecutor和KubernetesExecutor。

使用CeleryExecutor的架構如圖:

其它參考:

Getting started with Apache Airflow

Understanding Apache Airflow’s key concepts

原文鏈接:https://www.biaodianfu.com/python-schedule.html

責任編輯:haq

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • Linux
    +關注

    關注

    87

    文章

    11304

    瀏覽量

    209483
  • python
    +關注

    關注

    56

    文章

    4797

    瀏覽量

    84683

原文標題:Python 實現定時任務的八種方案!

文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    Linux計劃任務介紹

    1.計劃任務定時任務)基本概述 1.什么是crond crond就是計劃任務,類似于我們平時生活中的鬧鐘。定點執行。 2.為什么要使用crond?crond主要是做一些周期性的任務
    的頭像 發表于 11-24 15:49 ?287次閱讀

    定時器技術:Air780E如何革新定時任務管理?

    今天講的是關于Air780E如何革新定時任務管理的內容,希望大家有所收獲。
    的頭像 發表于 11-07 13:50 ?232次閱讀
    <b class='flag-5'>定時</b>器技術:Air780E如何革新<b class='flag-5'>定時任務</b>管理?

    mysql定時備份任務

    在生產環境上,為了避免數據的丟失,通常情況下都會定時的對數據庫進行備份。而Linux的crontab指令則可以幫助我們實現對數據庫定時進行備份。首先我們來簡單了解crontab指令,如果你會了請跳到下一個內容mysql備份。
    的頭像 發表于 10-31 10:07 ?164次閱讀

    linux定時任務的用法總結

    習慣了使用 windows 的計劃任務,使用 linux 中的 crontab 管理定時任務時很不適應。
    的頭像 發表于 08-14 18:16 ?850次閱讀
    linux<b class='flag-5'>定時任務</b>的用法總結

    ESP8266如何實現時間小于3us的定時任務

    實現一個穩定的軟串口,現有的軟串口程序是通過中斷實現的,但中斷好像會被其他中斷打斷,導致數據丟失,定時器按文檔上的說法,只能大于50us,能不能實現時間小于3us的
    發表于 07-19 06:13

    如何實現Python復制文件操作

    Python 中有許多“開蓋即食”的模塊(比如 os,subprocess 和 shutil)以支持文件 I/O 操作。在這篇文章中,你將會看到一些用 Python 實現文件復制的特殊方法。下面我們開始學習這九種不同的方法來
    的頭像 發表于 07-18 14:53 ?422次閱讀

    智能插座“云”時代:定時任務與事件驅動的創新管理

    用戶可以通過云端界面,在任何時間任何地點對插座進行配置和監控,同時收集數據和洞察分析,以促進能效最優化。無論是確保家中的咖啡機在你醒來之前準備好早晨的咖啡,還是遠程調整辦公室的溫度設置以節約能源,智能插座配合云管理打開了便捷與高效的大門。
    的頭像 發表于 07-15 18:16 ?975次閱讀
    智能插座“云”時代:<b class='flag-5'>定時任務</b>與事件驅動的創新管理

    定時器的工作方式介紹

    實現周期性事件的硬件模塊。它可以用于實現各種定時任務,如定時中斷、PWM(脈沖寬度調制)輸出、頻率測量等。定時器通常由一個計數器、一個時鐘
    的頭像 發表于 07-12 10:29 ?952次閱讀

    使用Python進行自然語言處理

    在探討使用Python進行自然語言處理(NLP)的廣闊領域時,我們首先需要理解NLP的基本概念、其重要性、Python在NLP中的優勢,以及如何通過Python實現一些基礎的NLP
    的頭像 發表于 07-04 14:40 ?455次閱讀

    長持續時間定時器電路圖 時間定時器的工作原理和功能

    的處理,都離不開定時器的精確控制。時間定時器通常由硬件和軟件兩部分組成,硬件部分通過計時器芯片或計數器來實現時間的度量和計算,而軟件部分則是通過編程語言提供的函數或類庫來設置和處理定時任務
    的頭像 發表于 06-24 17:34 ?1894次閱讀
    長持續時間<b class='flag-5'>定時</b>器電路圖 時間<b class='flag-5'>定時</b>器的工作原理和功能

    在物通博聯工業智能網關的本地配置界面(WEB)直接配置定時控制任務

    開關,可實現全年定時任務自動執行。在多個任務日期重疊時,可選執行高等級的還是并行執行。設有一個遠程和本地的控制點,默認為本地狀態,網關自己執行設置的任務,當用戶將改控制點切換為遠程時可
    的頭像 發表于 04-24 17:21 ?537次閱讀
    在物通博聯工業智能網關的本地配置界面(WEB)直接配置<b class='flag-5'>定時</b>控制<b class='flag-5'>任務</b>

    java實現多線程的幾種方式

    Java實現多線程的幾種方式 多線程是指程序中包含了兩個或以上的線程,每個線程都可以并行執行不同的任務或操作。Java中的多線程可以提高程序的效率和性能,使得程序可以同時處理多個任務。
    的頭像 發表于 03-14 16:55 ?708次閱讀

    使用TC21x的GPT實現1m計時器執行定時任務,怎么配置GTM和GPT?

    專家們好,我想使用TC21x的GPT實現1m計時器執行定時任務,不知道怎么配置GTM和GPT?
    發表于 02-06 06:47

    鴻蒙原生應用/元服務開發-長時任務

    概述 功能介紹 應用退至后臺后,對于在后臺需要長時間運行用戶可感知的任務,例如播放音樂、導航等。為防止應用進程被掛起,導致對應功能異常,可以申請長時任務,使應用在后臺長時間運行。申請長時任務后,系統
    發表于 01-09 10:52

    任務調度系統設計的核心邏輯

    Redis的讀寫性能極好,分布式鎖也比Quartz數據庫行級鎖更輕量級。當然Redis鎖也可以替換成Zookeeper鎖,也是同樣的機制。 在小型項目中,使用:定時任務框架(Quartz/Spring Schedule)和 分布式鎖(redis/zookeeper)有不錯的效果。
    的頭像 發表于 01-02 15:09 ?884次閱讀
    <b class='flag-5'>任務</b>調度系統設計的核心邏輯
    主站蜘蛛池模板: 高清一级做a爱免费视| 18美女扒开尿口无遮挡| 成年1314在线观看| 亚洲国产视频网| 一区二区三区四区国产精品| 亚洲综合久久久| 日本午夜片| 黄色生活毛片| 国产中文99视频在线观看| 91视频www| 日本最顶级丰满的aⅴ艳星| 国内一级毛片| 爱爱视频天天干| 无遮挡很爽很污很黄很色的网站 | 国模一区二区三区私啪啪| 黄色三级网站| 轻点灬大ji巴太粗太长了爽文| 日本怡红| 免费在线观看a视频| 99久久99| 成人a毛片在线看免费全部播放| 成人a一级毛片免费看| 日本人xxxxxxxxxⅹ69| 在线播放12p| 免费网站日本| 九色视频网| 五月婷婷六月合| 新午夜影院| 黄网站色视频免费观看| 日本久本草精品| 欧美黄又粗暴一进一出抽搐| 亚洲精品福利你懂| 国模私拍视频| 好大好硬好深好爽视频h | 亚洲视频五区| 久久狠狠色噜噜狠狠狠狠97 | 苦瓜se影院在线视频网站| 亚洲色吧| 天天综合色天天综合| xx日本69| 久久香蕉综合色一综合色88|