從 cron 說起
在 Unix-like 操作系統(tǒng)中,有一個(gè)大家都很熟悉的 cli 工具,它能夠來處理定時(shí)任務(wù),周期性任務(wù),這就是: cron。 你只需要簡單的語法控制就能實(shí)現(xiàn)任意【定時(shí)】的語義。用法上可以參考一下這個(gè) Crontab Guru Editor[1],做的非常精巧。
簡單說,每一個(gè)位都代表了一個(gè)時(shí)間維度,* 代表全集,所以,上面的語義是:在每天早上的4點(diǎn)05分觸發(fā)任務(wù)。
但 cron 畢竟只是一個(gè)操作系統(tǒng)級別的工具,如果定時(shí)任務(wù)失敗了,或者壓根沒啟動,cron 是沒法提醒開發(fā)者這一點(diǎn)的。并且,cron 和 正則表達(dá)式都有一種魔力,不知道大家是否感同身受,這里引用同事的一句名言:
這世界上有些語言非常相似: shell腳本, es查詢的那個(gè)dsl語言, 定時(shí)任務(wù)的crontab, 正則表達(dá)式. 他們相似就相似在每次要寫的時(shí)候基本都得重新現(xiàn)學(xué)一遍。
正巧,最近看到了 gron 這個(gè)開源項(xiàng)目,它是用 Golang 實(shí)現(xiàn)一個(gè)并發(fā)安全的定時(shí)任務(wù)庫。實(shí)現(xiàn)非常簡單精巧,代碼量也不多。今天我們就來一起結(jié)合源碼看一下,怎樣基于 Golang 的能力做出來一個(gè)【定時(shí)任務(wù)庫】。
gron
Gron provides a clear syntax for writing and deploying cron jobs.
gron[2] 是一個(gè)泰國小哥在 2016 年開源的作品,它的特點(diǎn)就在于非常簡單和清晰的語義來定義【定時(shí)任務(wù)】,你不用再去記 cron 的語法。我們來看下作為使用者怎樣上手。
首先,我們還是一個(gè) go get 安裝依賴:
$gogetgithub.com/roylee0704/gron
假設(shè)我們期望在【時(shí)機(jī)】到了以后,要做的工作是打印一個(gè)字符串,每一個(gè)小時(shí)執(zhí)行一次,我們就可以這樣:
packagemain
import(
"fmt"
"time"
"github.com/roylee0704/gron"
)
funcmain(){
c:=gron.New()
c.AddFunc(gron.Every(1*time.Hour),func(){
fmt.Println("runseveryhour.")
})
c.Start()
}
非常簡單,而且即便是在 c.Start
之后我們依然可以添加新的定時(shí)任務(wù)進(jìn)去。支持了很好的擴(kuò)展性。
定時(shí)參數(shù)
注意到我們調(diào)用 gron.New().AddFunc()
時(shí)傳入了一個(gè) gron.Every(1*time.Hour)
。
這里其實(shí)你可以傳入任何一個(gè) time.Duration,從而把調(diào)度間隔從 1 小時(shí)調(diào)整到 1 分鐘甚至 1 秒。
除此之外,gron 還很貼心地封裝了一個(gè) xtime
包用來把常見的 time.Duration 封裝起來,這里我們開箱即用。
import"github.com/roylee0704/gron/xtime"
gron.Every(1*xtime.Day)
gron.Every(1*xtime.Week)
很多時(shí)候我們不僅僅某個(gè)任務(wù)在當(dāng)天運(yùn)行,還希望是我們指定的時(shí)刻,而不是依賴程序啟動時(shí)間,機(jī)械地加 24 hour。gron 對此也做了很好的支持:
gron.Every(30*xtime.Day).At("00:00")
gron.Every(1*xtime.Week).At("23:59")
我們只需指定 At("hh:mm") 就可以實(shí)現(xiàn)在指定時(shí)間執(zhí)行。
源碼解析
這一節(jié)我們來看看 gron 的實(shí)現(xiàn)原理。
所謂定時(shí)任務(wù),其實(shí)包含兩個(gè)層面:
-
觸發(fā)器。即我們希望這個(gè)任務(wù)在什么時(shí)間點(diǎn),什么周期被觸發(fā);
-
任務(wù)。即我們在觸發(fā)之后,希望執(zhí)行的任務(wù),類比到我們上面示例的 fmt.Println。
對這兩個(gè)概念的封裝和擴(kuò)展是一個(gè)定時(shí)任務(wù)庫必須考慮的。
而同時(shí),我們是在 Golang 的協(xié)程上跑程序的,意味著這會是一個(gè)長期運(yùn)行的協(xié)程,否則你即便指定了【一個(gè)月后干XXX】這個(gè)任務(wù),程序兩天后掛了,也就無法實(shí)現(xiàn)你的訴求了。
所以,我們還希望有一個(gè) manager 的角色,來管理我們的一組【定時(shí)任務(wù)】,如何調(diào)度,什么時(shí)候啟動,怎么停止,啟動了以后還想加新任務(wù)是否支持。
Cron
在 gron 的體系里,Cron 對象(我們上面通過 gron.New 創(chuàng)建出來的)就是我們的 manager,而底層的一個(gè)個(gè)【定時(shí)任務(wù)】則對應(yīng)到 Cron 對象中的一個(gè)個(gè) Entry:
//Cronprovidesaconvenientinterfaceforschedulingjobsuchastoclean-up
//databaseentryeverymonth.
//
//Cronkeepstrackofanynumberofentries,invokingtheassociatedfuncas
//specifiedbytheschedule.Itmayalsobestarted,stoppedandtheentries
//maybeinspected.
typeCronstruct{
entries[]*Entry
runningbool
addchan*Entry
stopchanstruct{}
}
//NewinstantiatesnewCroninstantc.
funcNew()*Cron{
return&Cron{
stop:make(chanstruct{}),
add:make(chan*Entry),
}
}
-
entries 就是定時(shí)任務(wù)的核心能力,它記錄了一組【定時(shí)任務(wù)】;
-
running 用來標(biāo)識這個(gè) Cron 是否已經(jīng)啟動;
-
add 是一個(gè)channel,用來支持在 Cron 啟動后,新增的【定時(shí)任務(wù)】;
-
stop 同樣是個(gè)channel,注意到是空結(jié)構(gòu)體,用來控制 Cron 的停止。這個(gè)其實(shí)是經(jīng)典寫法了,對日常開發(fā)也有借鑒意義,我們待會兒會好好看一下。
我們觀察到,當(dāng)調(diào)用 gron.New()
方法后,得到的是一個(gè)指向 Cron 對象的指針。此時(shí)只是初始化了 stop 和 add 兩個(gè) channel,沒有啟動調(diào)度。
Entry
重頭戲來了,Cron 里面的 []* Entry 其實(shí)就代表了一組【定時(shí)任務(wù)】,每個(gè)【定時(shí)任務(wù)】可以簡化理解為 <觸發(fā)器,任務(wù)> 組成的一個(gè) tuple。
//Entryconsistsofascheduleandthejobtobeexecutedonthatschedule.
typeEntrystruct{
ScheduleSchedule
JobJob
//thenexttimethejobwillrun.ThisiszerotimeifCronhasnotbeen
//startedorinvalidschedule.
Nexttime.Time
//thelasttimethejobwasrun.Thisiszerotimeifthejobhasnotbeen
//run.
Prevtime.Time
}
//ScheduleistheinterfacethatwrapsthebasicNextmethod.
//
//Nextdeducesnextoccurringtimebasedontandunderlyingstates.
typeScheduleinterface{
Next(ttime.Time)time.Time
}
//JobistheinterfacethatwrapsthebasicRunmethod.
//
//Runexecutestheunderlyingfunc.
typeJobinterface{
Run()
}
-
Schedule 代表了一個(gè)【觸發(fā)器】,或者說一個(gè)定時(shí)策略。它只包含一個(gè)
Next
方法,接受一個(gè)時(shí)間點(diǎn),業(yè)務(wù)要返回下一次觸發(fā)調(diào)動的時(shí)間點(diǎn)。 -
Job 則是對【任務(wù)】的抽象,只需要實(shí)現(xiàn)一個(gè)
Run
方法,沒有入?yún)⒊鰠ⅰ?/p>
除了這兩個(gè)核心依賴外,Entry 結(jié)構(gòu)還包含了【前一次執(zhí)行時(shí)間點(diǎn)】和【下一次執(zhí)行時(shí)間點(diǎn)】,這個(gè)目前可以忽略,只是為了輔助代碼用。
按照時(shí)間排序
//byTimeisahandywrappertochronologicallysortentries.
typebyTime[]*Entry
func(bbyTime)Len()int{returnlen(b)}
func(bbyTime)Swap(i,jint){b[i],b[j]=b[j],b[i]}
//Lessreports`earliest`timeishouldsortbeforej.
//zerotimeisnot`earliest`time.
func(bbyTime)Less(i,jint)bool{
ifb[i].Next.IsZero(){
returnfalse
}
ifb[j].Next.IsZero(){
returntrue
}
returnb[i].Next.Before(b[j].Next)
}
這里是對 Entry 列表的簡單封裝,因?yàn)槲覀兛赡芡瑫r(shí)有多個(gè) Entry 需要調(diào)度,處理的順序很重要。這里實(shí)現(xiàn)了 sort 的接口, 有了 Len()
, Swap()
, Less()
我們就可以用 sort.Sort()
來排序了。
此處的排序策略是按照時(shí)間大小。
新增定時(shí)任務(wù)
我們在示例里面出現(xiàn)過調(diào)用 AddFunc() 來加入一個(gè) gron.Every(xxx) 這樣一個(gè)【定時(shí)任務(wù)】。其實(shí)這是給用戶提供的簡單封裝。
//JobFuncisanadaptertoallowtheuseofordinaryfunctionsasgron.Job
//Iffisafunctionwiththeappropriatesignature,JobFunc(f)isahandler
//thatcallsf.
//
//todo:possiblyfuncwithparams?maybenotneeded.
typeJobFuncfunc()
//Runcallsj()
func(jJobFunc)Run(){
j()
}
//AddFuncregisterstheJobfunctionforthegivenSchedule.
func(c*Cron)AddFunc(sSchedule,jfunc()){
c.Add(s,JobFunc(j))
}
//Addappendsschedule,jobtoentries.
//
//ifcroninstantisnotrunning,addingtoentriesistrivial.
//otherwise,topreventdata-race,addsthroughchannel.
func(c*Cron)Add(sSchedule,jJob){
entry:=&Entry{
Schedule:s,
Job:j,
}
if!c.running{
c.entries=append(c.entries,entry)
return
}
c.add<-?entry
}
JobFunc 實(shí)現(xiàn)了我們上一節(jié)提到的 Job 接口,基于此,我們就可以讓用戶直接傳入一個(gè) func()
就ok,內(nèi)部轉(zhuǎn)成 JobFunc,再利用通用的 Add 方法將其加入到 Cron 中即可。
注意,這里的 Add 方法就是新增定時(shí)任務(wù)的核心能力了,我們需要觸發(fā)器 Schedule,任務(wù) Job。并以此來構(gòu)造出一個(gè)定時(shí)任務(wù) Entry。
若 Cron 實(shí)例還沒啟動,加入到 Cron 的 entries 列表里就ok,隨后啟動的時(shí)候會處理。但如果已經(jīng)啟動了,就直接往 add 這個(gè) channel 中塞,走額外的新增調(diào)度路徑。
啟動和停止
//Startsignalscroninstantctogetupandrunning.
func(c*Cron)Start(){
c.running=true
goc.run()
}
//Stophaltscroninstantcfromrunning.
func(c*Cron)Stop(){
if!c.running{
return
}
c.running=false
c.stop<-?struct{}{}
}
我們先 high level 地看一下一個(gè) Cron 的啟動和停止。
-
Start 方法執(zhí)行的時(shí)候會先將 running 變量置為 true,用來標(biāo)識實(shí)例已經(jīng)啟動(啟動前后加入的定時(shí)任務(wù) Entry 處理策略是不同的,所以這里需要標(biāo)識),然后啟動一個(gè) goroutine 來實(shí)際跑啟動的邏輯。
-
Stop 方法則會將 running 置為 false,然后直接往 stop channel 塞一個(gè)空結(jié)構(gòu)體即可。
ok,有了這個(gè)心里預(yù)期,我們來看看 c.run()
里面干了什么事:
varafter=time.After
//runthescheduler...
//
//Itneedstobeprivateasit'sresponsibleofsynchronizingacritical
//sharedstate:`running`.
func(c*Cron)run(){
vareffectivetime.Time
now:=time.Now().Local()
//tofigurenexttrigtimeforentries,referencedfromnow
for_,e:=rangec.entries{
e.Next=e.Schedule.Next(now)
}
for{
sort.Sort(byTime(c.entries))
iflen(c.entries)>0{
effective=c.entries[0].Next
}else{
effective=now.AddDate(15,0,0)//topreventphantomjobs.
}
select{
casenow=<-after(effective.Sub(now)):
???//entrieswithsametimegetsrun.
for_,entry:=rangec.entries{
ifentry.Next!=effective{
break
}
entry.Prev=now
entry.Next=entry.Schedule.Next(now)
goentry.Job.Run()
}
casee:=<-c.add:
???e.Next?=?e.Schedule.Next(time.Now())
???c.entries?=?append(c.entries,e)
case<-c.stop:
???return//terminatego-routine.
}
}
}
重點(diǎn)來了,看看我們是如何把上面 Cron, Entry, Schedule, Job 串起來的。
- 首先拿到 local 的時(shí)間 now;
- 遍歷所有 Entry,調(diào)用 Next 方法拿到各個(gè)【定時(shí)任務(wù)】下一次運(yùn)行的時(shí)間點(diǎn);
- 對所有 Entry 按照時(shí)間排序(我們上面提過的 byTime);
- 拿到第一個(gè)要到期的時(shí)間點(diǎn),在 select 里面通過 time.After 來監(jiān)聽。到點(diǎn)了就起動新的 goroutine 跑對應(yīng) entry 里的 Job,并回到 for 循環(huán),繼續(xù)重新 sort,再走同樣的流程;
- 若 add channel 里有新的 Entry 被加進(jìn)來,就加入到 Cron 的 entries 里,觸發(fā)新的 sort;
- 若 stop channel 收到了信號,就直接 return,結(jié)束執(zhí)行。
整體實(shí)現(xiàn)還是非常簡潔的,大家可以感受一下。
Schedule
前面其實(shí)我們暫時(shí)將觸發(fā)器的復(fù)雜性封裝在 Schedule 接口中了,但怎么樣實(shí)現(xiàn)一個(gè) Schedule 呢?
尤其是注意,我們還支持 At 操作,也就是指定 Day,和具體的小時(shí),分鐘。回憶一下:
gron.Every(30*xtime.Day).At("00:00")
gron.Every(1*xtime.Week).At("23:59")
這一節(jié)我們就來看看,gron.Every 干了什么事,又是如何支持 At 方法的。
//EveryreturnsaSchedulereoccurseveryperiodp,pmustbeatleast
//time.Second.
funcEvery(ptime.Duration)AtSchedule{
ifp//truncatesuptoseconds
return&periodicSchedule{
period:p,
}
}
gron 的 Every 函數(shù)接受一個(gè) time.Duration,返回了一個(gè) AtSchedule 接口。我待會兒會看,這里注意,Every 里面是會把【秒】級以下給截掉。
我們先來看下,最后返回的這個(gè) periodicSchedule 是什么:
typeperiodicSchedulestruct{
periodtime.Duration
}
//Nextaddstimettounderlyingperiod,truncatesuptounitofseconds.
func(psperiodicSchedule)Next(ttime.Time)time.Time{
returnt.Truncate(time.Second).Add(ps.period)
}
//Atreturnsaschedulewhichreoccurseveryperiodp,attimet(hh:ss).
//
//Note:Atpanicswhenperiodpislessthanxtime.Day,anderrorhh:ssformat.
func(psperiodicSchedule)At(tstring)Schedule{
ifps.periodpanic("periodmustbeatleastindays")
}
//parsetnaively
h,m,err:=parse(t)
iferr!=nil{
panic(err.Error())
}
return&atSchedule{
period:ps.period,
hh:h,
mm:m,
}
}
//parsenaivelytokeniseshoursandminutes.
//
//returnserrorwheninputformatwasincorrect.
funcparse(hhmmstring)(hhint,mmint,errerror){
hh=int(hhmm[0]-'0')*10+int(hhmm[1]-'0')
mm=int(hhmm[3]-'0')*10+int(hhmm[4]-'0')
ifhh0||hh>24{
hh,mm=0,0
err=errors.New("invalidhhformat")
}
ifmm0||mm>59{
hh,mm=0,0
err=errors.New("invalidmmformat")
}
return
}
可以看到,所謂 periodicSchedule 就是一個(gè)【周期性觸發(fā)器】,只維護(hù)一個(gè) time.Duration 作為【周期】。
periodicSchedule 實(shí)現(xiàn) Next 的方式也很簡單,把秒以下的截掉之后,直接 Add(period)
,把周期加到當(dāng)前的 time.Time
上,返回新的時(shí)間點(diǎn)。這個(gè)大家都能想到。
重點(diǎn)在于,對 At 能力的支持。我們來關(guān)注下 func (ps periodicSchedule) At(t string) Schedule
這個(gè)方法
-
若周期連 1 天都不到,不支持 At 能力,因?yàn)?At 本質(zhì)是在選定的一天內(nèi),指定小時(shí),分鐘,作為輔助。連一天都不到的周期,是要精準(zhǔn)處理的;
-
將用戶輸入的形如 "23:59" 時(shí)間字符串解析出來【小時(shí)】和【分鐘】;
-
構(gòu)建出一個(gè) atSchedule 對象,包含了【周期時(shí)長】,【小時(shí)】,【分鐘】。
ok,這一步只是拿到了材料,那具體怎樣處理呢?這個(gè)還是得繼續(xù)往下走,看看 atSchedule 結(jié)構(gòu)干了什么:
typeatSchedulestruct{
periodtime.Duration
hhint
mmint
}
//resetreturnsnewDatebasedontimeinstantt,andreconfigureitshh:ss
//accordingtoatSchedule'shh:ss.
func(asatSchedule)reset(ttime.Time)time.Time{
returntime.Date(t.Year(),t.Month(),t.Day(),as.hh,as.mm,0,0,time.UTC)
}
//Nextreturns**next**time.
//iftpasseditssupposedschedule:reset(t),returnsreset(t)+period,
//elsereturnsreset(t).
func(asatSchedule)Next(ttime.Time)time.Time{
next:=as.reset(t)
ift.After(next){
returnnext.Add(as.period)
}
returnnext
}
其實(shí)只看這個(gè) Next 的實(shí)現(xiàn)即可。我們從 periodSchedule 那里獲取了三個(gè)屬性。
在調(diào)用 Next 方法時(shí),先做 reset,根據(jù)原有 time.Time 的年,月,日,以及用戶輸入的 At 中的小時(shí),分鐘,來構(gòu)建出來一個(gè) time.Time 作為新的時(shí)間點(diǎn)。
此后判斷是在哪個(gè)周期,如果當(dāng)前周期已經(jīng)過了,那就按照下個(gè)周期的時(shí)間點(diǎn)返回。
到這里,一切就都清楚了,如果我們不用 At 能力,直接 gron.Every(xxx)
,那么直接就會調(diào)用
t.Truncate(time.Second).Add(ps.period)
拿到一個(gè)新的時(shí)間點(diǎn)返回。
而如果我們要用 At 能力,指定當(dāng)天的小時(shí),分鐘。那就會走到 periodicSchedule.At
這里,解析出【小時(shí)】和【分鐘】,最后走 Next 返回 reset 之后的時(shí)間點(diǎn)。
這個(gè)和 gron.Every 方法返回的 AtSchedule
接口其實(shí)是完全對應(yīng)的:
//AtScheduleextendsSchedulebyenablingperiodic-interval&time-specificsetup
typeAtScheduleinterface{
At(tstring)Schedule
Schedule
}
直接就有一個(gè) Schedule 可以用,但如果你想針對天級以上的 duration 指定時(shí)間,也可以走 At 方法,也會返回一個(gè) Schedule 供我們使用。
擴(kuò)展性
gron 里面對于所有的依賴也都做成了【依賴接口而不是實(shí)現(xiàn)】。Cron 的 Add 函數(shù)的入?yún)⒁彩莾蓚€(gè)接口,這里可以隨意替換:func (c *Cron) Add(s Schedule, j Job)
。
最核心的兩個(gè)實(shí)體依賴 Schedule, Job 都可以用你自定義的實(shí)現(xiàn)來替換掉。
如實(shí)現(xiàn)一個(gè)新的 Job:
typeReminderstruct{
Msgstring
}
func(rReminder)Run(){
fmt.Println(r.Msg)
}
事實(shí)上,我們上面提到的 periodicSchedule
以及 atSchedule
就是 Schedule 接口的具體實(shí)現(xiàn)。我們也完全可以不用 gron.Every
,而是自己寫一套新的 Schedule 實(shí)現(xiàn)。只要實(shí)現(xiàn) Next(p time.Duration) time.Time
即可。
我們來看一個(gè)完整用法案例:
packagemain
import(
"fmt"
"github.com/roylee0704/gron"
"github.com/roylee0704/gron/xtime"
)
typePrintJobstruct{Msgstring}
func(pPrintJob)Run(){
fmt.Println(p.Msg)
}
funcmain(){
var(
//schedules
daily=gron.Every(1*xtime.Day)
weekly=gron.Every(1*xtime.Week)
monthly=gron.Every(30*xtime.Day)
yearly=gron.Every(365*xtime.Day)
//contrivedjobs
purgeTask=func(){fmt.Println("purgeagedrecords")}
printFoo=printJob{"Foo"}
printBar=printJob{"Bar"}
)
c:=gron.New()
c.Add(daily.At("12:30"),printFoo)
c.AddFunc(weekly,func(){fmt.Println("Everyweek")})
c.Start()
//JobsmayalsobeaddedtoarunningGron
c.Add(monthly,printBar)
c.AddFunc(yearly,purgeTask)
//StopGron(runningjobsarenothalted).
c.Stop()
}
經(jīng)典寫法-控制退出
這里我們還是要聊一下 Cron 里控制退出的經(jīng)典寫法。我們把其他不相關(guān)的部分清理掉,只留下核心代碼:
typeCronstruct{
stopchanstruct{}
}
func(c*Cron)Stop(){
c.stop<-?struct{}{}
}
func(c*Cron)run(){
for{
select{
case<-c.stop:
???return//terminatego-routine.
}
}
}
空結(jié)構(gòu)體能夠最大限度節(jié)省內(nèi)存,畢竟我們只是需要一個(gè)信號。核心邏輯用 for + select 的配合,這樣當(dāng)我們需要結(jié)束時(shí)可以立刻響應(yīng)。非常經(jīng)典,建議大家日常有需要的時(shí)候采用。
結(jié)語
gron 整體代碼其實(shí)只在 cron.go 和 schedule.go 兩個(gè)文件,合起來代碼不過 300 行,非常精巧,基本沒有冗余,擴(kuò)展性很好,是非常好的入門材料。
不過,作為一個(gè) cron 的替代品,其實(shí) gron 還是有自己的問題的。簡單講就是,如果我重啟了一個(gè)EC2實(shí)例,那么我的 cron job 其實(shí)也還會繼續(xù)執(zhí)行,這是落盤的,操作系統(tǒng)級別的支持。
但如果我執(zhí)行 gron 的進(jìn)程掛掉了,不好意思,那就完全涼了。你只有重啟,然后再把所有任務(wù)加回來才行。而我們既然要用 gron,是很有可能定一個(gè)幾天后,幾個(gè)星期后,幾個(gè)月后這樣的觸發(fā)器的。誰能保證進(jìn)程一直活著呢?連機(jī)子本身都可能重啟。
所以,我們需要一定的機(jī)制來保證 gron 任務(wù)的可恢復(fù)性,將任務(wù)落盤,持久化狀態(tài)信息,算是個(gè)思考題,這里大家可以考慮一下怎么做。
審核編輯 :李倩
-
操作系統(tǒng)
+關(guān)注
關(guān)注
37文章
6825瀏覽量
123331 -
代碼
+關(guān)注
關(guān)注
30文章
4788瀏覽量
68612
原文標(biāo)題:解析 Golang 定時(shí)任務(wù)庫 gron 設(shè)計(jì)和原理
文章出處:【微信號:LinuxHub,微信公眾號:Linux愛好者】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論