workflow是搜狗開源的一個開發框架。可以滿足絕大多數日常服務器開發,性能優異,給上層業務提供了易于開發的接口,卻只用了少量的代碼,舉重若輕,而且代碼整潔干凈易讀。
搜狗官方宣傳強調,workflow是一個異步任務調度編程范式,封裝了6種異步資源:CPU計算、GPU計算、網絡、磁盤I/O、定時器、計數器,以回調函數模式提供給用戶使用,概括起來實際上主要是兩個功能:1、屏蔽阻塞調用的影響,使阻塞調用的開發接口變為異步的,充分利用計算資源;2、框架管理線程池,使開發者迅速構建并行計算程序。
往往單臺機器要服務于千千萬萬終端,我們最希望服務器資源都能充分利用,然而計算資源和I/O資源天然的效率不對等,使我們不得不采用一些其他技術手段實現基礎資源充分利用。所謂I/O資源包括文件I/O和網絡I/O,此外很多時候我們需要定時執行某段邏輯,同樣不希望等待時間阻塞計算資源的使用。
所以框架最基礎的功能,是要為上層開發人員屏蔽底層資源的不對稱,使我們可以方便的開發業務邏輯而不需要把很多精力放在底層。
如何擬合計算資源和io資源
我們希望io等待或其他阻塞的時間,cpu還能充分利用,執行一些任務。這要求發起io的線程不能調用阻塞接口原地等待,而是要切出去,往往采用I/O多路復用或者異步I/O的方式,分別對應reactor模型和proactor模型
對于網絡I/O,linux系統下缺乏對異步I/O的支持,即使近兩年有了iouring,支持了異步io,但性能上相對epoll未必會有多少提升,而且一切都交給系統調度,可控性上大大降低;另外開發難度也更大。反觀epoll,無論系統的支持還是相關設計模型都非常成熟了,所以近一二十年底層大都采用epoll,以reactor模式實現,reactor統一處理請求,將就緒的任務轉給下游的處理器。根據業務不同,又有幾種不同實現方式,有的就單線程之內調度,單線程循環處理(如redis),適合業務邏輯不復雜的場景;有的會單reactor處理請求,并通過消息隊列把請求轉發給下游多線程業務邏輯處理器處理;有的多線程多reactor處理請求,并通過消息隊列將任務分發給下游handler,單reactor模式可以認為是這種模式的特例,workflow便以這種方式實現。
對于文件I/O,linux下有兩種異步I/O的支持,posix aio(glibcaio)和linux 原生 aio,其中前者是一個通過多線程的異步,模擬的異步io,性能極差;linux 原生 aio是真正的aio,但是要求fd只能以O_DIRECT方式打開,所以只適用于文件I/O,workflow中支持了這種方式處理文件I/O。
對于定時器,常見的方式,有的通過epoll每次阻塞設置阻塞時間,用戶態管理定時器(如redis);而epoll也支持時間事件,有的直接使用時間事件,workflow便采用這種方式。
提供給用戶的接口
計算資源得以充分利用,還需要考慮給用戶提供什么樣的接口,讓上層開發者能減少心智負擔,比如,以協程的方式,讓用戶像開發串行程序一樣開發異步程序,順序的寫邏輯;亦或者是提供讓用戶注冊回調的方式開發異步程序。workflow中提出了子任務的概念,以任務的方式提供給用戶。
子任務定義了一種管理回調的方式,用串行并行來組織子任務調度。用戶可以把邏輯寫在任務里,交給框架去調度。
把阻塞的任務交給epoll去異步調用,計算任務交給線程池去異步執行,以至于所有的任務都是異步調起的,這種設計思想,就是workflow被稱為“異步任務調度框架”的原因。
代碼分析
根據上面的分析,對一般服務器框架結構已經有了一個整體認識。下面按這個順序,底層基礎數據結構——》純計算任務和Reactor層——》任務組織調度層——》用戶接口層,分四個層次逐步分析一下workflow的實現。
基礎數據結構
workflow使用到的基礎數據結構:鏈表、紅黑樹、消息隊列、線程池,workflow中這四個結構的設計都非常的精致。
鏈表(見文件 list.h)
workflow中的鏈表貌似引自linux內核,實現了一種非常非常靈活的鏈表,甚至鏈表串起的不同節點之間可以是不同的數據結構
一般來說一個普通的鏈表節點如下:
{
ListNode * prev_ = nullptr;
ListNode * next_ = nullptr;
void * p_value_ = nullptr;
};
定義節點時定義好數據段p_value_,這樣的話數據結構的實現就會與業務邏輯結合在一起。
這里不使用模板也實現了預定義獨立于業務邏輯的鏈表數據結構。
鏈表的節點:// 這是一個雙鏈表
list_head *next, *prev;
};
可以把鏈表嵌入到任何一個數據結構中,
那如何通過鏈表節點拿到當前所在結構呢?
通過一個宏來實現:
((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
簡單解釋下這個宏:ptr表示鏈表節點指針,type是當前節點數據結構類型名,member是鏈表節點在數據結構中的成員名
&((type *)0)->member)把指向地址空間起點的指針(空指針)轉化成指向節點數據結構的指針,然后取鏈表節點成員名,再取地址,就可以取到鏈表節點在這個數據結構中的偏移量。
ptr是鏈表節點指針,按(char *)減去偏移量,就可以回退到結構起始位置。再把這個位置轉化成(type *).就取到了指向當前數據結構的指針。
看接口甚至可以發現,當我想把當前數據結構從鏈表里刪除的時候,甚至不需要拿到鏈表,而是直接通過list_del(list_head * current_node)函數傳入當前節點就可以刪除,靈活的一塌糊涂。
并且提供了遍歷鏈表的接口宏:
for (pos = (head)->next; pos != (head); pos = pos->next)
每一行代碼都極其簡潔干凈,妙到毫巔!
其他鏈表基礎知識不多贅述。
紅黑樹(見rbtree.h/.c)
與鏈表類似,紅黑樹也使用了內核紅黑樹。
相同的風格,每個節點只有鏈接指針和節點顏色字段,而沒有數據。
{
struct rb_node *rb_parent;
struct rb_node *rb_right;
struct rb_node *rb_left;
char rb_color;
#define RB_RED 0
#define RB_BLACK 1
};
當把紅黑樹node嵌入數據結構中之后,使用同樣原理的宏,來獲取節點所在結構的指針:
((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
比較特別的是,由于節點不包含數據,數據結構不知道節點之間如何比較大小,所以需要用戶自己定義查找、插入函數,但給出了例子。
消息隊列(見msgqueue.h/.c)
這里實現了一個消息隊列,也是正常的提供一個put接口,供生產者reactor生產數據插入消息,一個get接口,傳遞給下游handler消費,消息隊列有消息上限,并提供阻塞和非阻塞兩種模式,阻塞模式下,當消息超過上限生產線成阻塞,等待消息小于上限了再插入。通過條件變量使沒有待處理的消息時,阻塞消費線程,于內核態等待消息出現。這里的生產者和消費者都是多線程的,所以需要考慮線程安全,消息隊列的常見實現是一個數據存儲段,一個鎖,一個條件變量,而workflow中的消息隊列的高妙之處就在于,他有兩個鎖,兩個條件變量,兩個數據空間,雙倍快樂。
{
size_t msg_max;
size_t msg_cnt;
int linkoff;
int nonblock;
void *head1;
void *head2;
void **get_head;
void **put_head;
void **put_tail;
pthread_mutex_t get_mutex;
pthread_mutex_t put_mutex;
pthread_cond_t get_cond;
pthread_cond_t put_cond;
};
這里使用了一個小技巧,大幅提升消息隊列性能,兩個數據段一個專門用來get,一個專門用來put,兩把鎖兩個條件變量,分別put時候和get時候使用。這樣的好處就是get和put操作之間幾乎互不干擾。put操作不會鎖消費線程。get操作絕大多數情況下不會鎖生產線程。
只有當get鏈表為空時,才會把put和get全鎖住,對兩個鏈表頭進行交換,極大的減少了生產線程和消費線程之間爭奪鎖產生的相互影響。
這里還有一個點就是消息隊列要求節點是自帶鏈表字段的,并指定鏈接節點相對于結構頭的偏移量(linkoff)。所以插進來的節點msg的結構是poller_result但是實際結構是poller_node強轉過來的,再對比這兩個結構體,發現前三個成員是一致的,而第四個成員就是鏈接節點。
{
int state;
int error;
struct poller_data data;
};
struct __poller_node
{
int state;
int error;
struct poller_data data;
#pragma pack(1)
union
{
struct list_head list;
struct rb_node rb;
};
#pragma pack()
...
};
線程池(見thrdpool.h/.c)
線程池實現的功能往往是創建一系列工作線程,工作線程執行線程回調函數,從消息隊列中取任務并執行,當消息隊列中沒有任務時,等待任務出現。
workflow中的線程池就是這樣一個很標準的線程池,同時很靈活的讓邏輯脫離于線程池,線程回調函數并非實際要執行的邏輯,而是從消息隊列里get出的task,是一個包含了要執行的回調和上下文的task,線程回調函數執行了這個task。
{
void (*routine)(void *);
void *context;
};
這樣實現一個效果,就是可以運行時才動態決定要執行什么邏輯,即每個task可以是不同的任務,靈活度大大提升。
基礎數據結構主要就這四種,這里只分析了其設計中比較可圈可點的部分,而沒有仔細講一些簡單的基礎細節。
純計算任務和Reactor調度層
把阻塞的任務交給epoll去異步調用,計算任務交給線程池去異步執行,實現所有任務的異步調度,下面分別看看計算任務和reactor。
純計算任務
WorkFlow由框架統一管理原始任務線程池,單例__ExecManager內有一個單的封裝,優雅的實現對線程池的管理。
這一層有三個新概念:
ExecQueue是一個有鎖鏈表隊列;
ExecSession的execute()接口由派生出來的任務自己去定義需要執行的邏輯。
Executor類,創建并管理線程池,提供request()方法,request方法把對應任務放入到線程池去執行。request的參數有兩個,分別是當前session和所在的ExecQueue,如果queue里面只有這一個session,則把這個session放入Executor管理的線程池里里執行,如果不是首個任務,則只要放入隊列里就行了,線程routine會調度當前隊列中所有的任務進入線程池執行,并用ExecQueue中的鎖保持隊列中任務調度的同步性。
Executor::executor_thread_routine是線程執行routine,一共做了兩件事:
第一步會遞歸的調度所有當前Queue中的任務進線程池,并用ExecQueue中的鎖保持隊列中任務調度的同步性;
第二步是執行當前session,并由session自己保持數據同步。
Reactor:
這里主要涉及四個文件poller.h/.c mpoller.h/.c Communicator.h/.cc CommScheduler.h/.cc
其中poller是對epoll的封裝,mpoller又集成多個poller線程;Communicator顧名思義,就是通信器,封裝了mpoller和線程池;CommScheduler是對Communicator的封裝,全局唯一,最后創建在__CommManager中,通過WFGlobal暴露出來。
這一層主要完成了右圖所示的工作,poller線程把epoll事件做初加工處理,生成一個poller_result,設置需要handle的類型,然后把處理結果put()進消息隊列,給工作線程去處理。handler線程等待任務,當隊列里有任務時,根據任務的operation類型做相應處理。
poller
poller.h/.c提供了poller的創建、啟動、stop、poller_add、poller_del、poller_mod和add_timer的接口。
poller_create創建了poller數據結構,分配了poller_node的指針數組nodes,這里的nodes是一個以fd為下標的數組,這時候只有一個指針數組,node還沒有創建,node是在poller_add的時候創建的,創建node的時候會檢查監聽的操作是否需要result,需要的話同時分配result空間。但這時候poller線程還沒有跑起來,執行poller_start時將poller線程跑起來;poller_add、poller_del、poller_mod分別是epoll的增加節點、刪除節點、改變監聽事件 三種操作的簡單封裝;add_timer增加時間事件,
前面說過消息隊列里面裝的是poller_result(poller_node),poller_result里面都會有一個poller_data。
#define PD_OP_WRITE 2
#define PD_OP_LISTEN 3
#define PD_OP_CONNECT 4
#define PD_OP_SSL_READ PD_OP_READ
#define PD_OP_SSL_WRITE PD_OP_WRITE
#define PD_OP_SSL_ACCEPT 5
#define PD_OP_SSL_CONNECT 6
#define PD_OP_SSL_SHUTDOWN 7
#define PD_OP_EVENT 8
#define PD_OP_NOTIFY 9
#define PD_OP_TIMER 10
struct poller_data
{
short operation;
unsigned short iovcnt;
int fd;
union
{
SSL *ssl;
void *(*accept)(const struct sockaddr *, socklen_t, int, void *);
void *(*event)(void *);
void *(*notify)(void *, void *);
};
void *context;//CommService或CommConnEntry
union
{
poller_message_t *message;
struct iovec *write_iov;
void *result;
};
};
poller_data封裝了需要處理的fd、對應的操作(operation)、上下文(可能是CommService或CommConnEntry)。
poller的核心是poller_thread,poller_start的時候啟動了是一個poller_thread,poller_thread處理的是epoll_event,主流程是一個經典的雙循環,外層循環epoll_wait,每次最多處理256個fd,epoll返回后,再根據每個epoll_event事件的類型,循環處理每個類型的事件,從枚舉可以看到對當前node的操作有讀、寫、listen、connect、timer等等,不管是什么類型的epoll事件,poller_thread處理的結果會生成一個.poller_result,并把這個結果插入到消息隊列中。
具體的操作非常的多了,不適合靜態分析,后面再動態分析請求的全流程。
poller的操作都是線程安全的,mpoller啟動多個線程的時候也可以直接使用。
mpoller
可以看到實際上使用的并不是poller而是mpoller,mpoller是對多線程poller的封裝,一個mpoller包括至少一個poller,實際配幾個線程就創建幾個poller,并統一分配poller_node,所有poller共享poller_node數組。實際使用的時候可以根據運算核心數和業務邏輯的復雜程度調整poller_thread和handler_thread的配比。mpoller的add、del、mod接口會對傳入的fd對線程數求模,將fd均勻的分配到各個poller。
關于數據同步
可以看到對fd的[]操作并沒有加鎖,以mpoller_add為例
mpoller_t *mpoller)
{
unsigned int index = (unsigned int)data->fd % mpoller->nthreads;
return poller_add(data, timeout, mpoller->poller[index]);
}
第4行計算index,fd和nthreads都是不會發生變化,不會修改的,線程之間無沖突,所以不需要加鎖。
第5行由poller_add來保證線程安全,每個poller中都有一個鎖,poller_add、poller_del、poller_mod的操作都是加鎖的,因為這三種操作都可能發生在不同的線程。
Communicator
Communicator是通訊器,是底層和業務層的樞紐,創建了mpoller和handler線程池,初始化時候啟動兩個線程池,bind的時候會把服務綁到communicator上,把服務創建的listen_fd放入到poller中開始監聽。handler_thread就是在Communicator中啟動的,handler_thread從消息隊列里拿到的是poller_result,handler_thread做的是拿到任務以后根據poller_result::poller_data::operation類型做相應處理。
相關的結構有:
鏈接:
class WFConnection : public CommConnection 創建的鏈接
對端:
CommTarget通訊目標,封裝了對端的地址、port、超時時間
消息:
{
int (*append)(const void *, size_t *, poller_message_t *);
char data[0]; // 柔性數組
};
class CommMessageIn : private __poller_message
{
private:
virtual int append(const void *buf, size_t *size) = 0;
struct CommConnEntry *entry;
};
class CommMessageOut
{
private:
virtual int encode(struct iovec vectors[], int max) = 0;
};
很明顯CommMessageIn是一次通信中的輸入消息,CommMessageOut是返回的消息的基類,輸入消息的基類是__poller_message,這里又使用了一個c程序員常用的小技巧,成員char data[0]是一個柔性數組,把__poller_message變成了一個變長結構體。
結構體中末尾成員是一個長度為0的char數組,這樣聲明看起來和char *data是一樣的,但是這樣寫相對于char指針有一些優勢。
對比如下結構,考慮__poller_message_test和__poller_message有什么區別
{
int (*append)(const void *, size_t *, poller_message_t *);
char *data; // char指針
};
首先,數組長度是0,說明沒分配空間。所以64位系統中,sizeof(struct __poller_message_test) == 16 而 sizeof(struct __poller_message) == 8。其次,如果使用一個char指針,需要為指針分配內存。而使用data[0]則不需要二次給指針分配內存,直接為結構分配適量大小內存即可,成員data會自動指向結構尾部的下一個字節。
輸入消息有一個append的虛方法,子類自己去定義如何反序列化,輸出消息有一個encode的虛方法,子類消息自己去定義序列化發送消息。基類__poller_message中的函數指針會被賦值為Communicator::append(const void *buf, size_t *size, poller_message_t *msg),實際運行時由函數指針append去調用各子類消息的virtual int append(const void *buf, size_t *size)對消息進行反序列化。
框架內已經定義好一些常用協議了:
會話:CommSession
CommSession封裝了一次會話所有組成單位,包括輸入/輸出消息、CommConnection、CommTarget
定義了消息的生產方式
服務器:CommService
類圖:
class WFServerBase : protected CommService 服務器的抽象。封裝了服務器地址、監聽套接字、活躍鏈接和連接數、服務器參數。
基類定義了newsession、newconnect接口。WFServerBase類中實現了服務啟動start()、停止stop()、創建/刪除鏈接newconnect()。
WFServer是一個模板類,模板參數是輸入輸出消息類型,可以實例化為各種類型的服務器,不同類型的服務器就是消息類型不同的服務器實例化,因為不同類型服務器實例消息類型不同,處理消息方式也不同,WFServer中保存了處理消息方式的回調——processer,并在服務創建的時候初始化。在WFServer中定義session創建方式new_session()的時候,會用processer來創建task,process實際上是task的處理方式。
服務Start()的時候會被bind()到全局的Communicator上,包括創建fd、bind、listen、放入epoll監聽,成為epoll監聽的第一個fd。服務實際上是交給Communicator創建的handler_thread線程池來驅動起來的。
Entry:CommConnEntry
打包了所有一次會話需要的上下文,包括poller、servide、session、target、socket等,處理accept事件(handle_listen_result)的時候由Communicator::accept_conn創建,創建后放在poller_data中,mpoller_add監聽
Communicator:
有了上面這些基礎結構,Communicator就是一個完全體了,Communicator初始化的時候,啟動了poller_thread、handler_thread驅動服務進行消息處理。
以示例代碼的hello_world程序為例,觀察一次網絡請求過程,看看poller_thread和handler_thread分別都做了什么。
從hello_world啟服到線程工作:
這里特別看一下poller_add的時候創建了poller_node實體,poller_node中有一個成員struct __poller_node *res,__poller_data_get_event()的時候會返回一個bool值,表示是否需要創建res。可以看到操作類型為listen的情況。是需要res的。
經過這個過程,服務器就啟動開始接受請求了,service創建listen_fd交由poller管理,當監聽到有客戶端鏈接時,accept+read。下面分析接收到一個請求時,poller_thread和handler_thread分別做了什么。
poller_thread知道listenfd可讀,則accept一個readfd,創建了對端target,把這個poller_result(poller_node)放進消息隊列。
handler_thread拿到這個poller_result之后,主要是創建了完整的CommConnEntry,并把負責read的poller_node放入epoll監聽,等待內核緩沖區有數據可讀。
這里有個細節,readfd是無阻塞模式,因為使用了epoll的邊緣觸發模式,即每個fd的狀態變化只通知一次,這樣的話需要把readfd上的數據全讀完,所以readfd必須設置成無阻塞模式,否則循環讀到最后肯定會被阻塞。
如果遇到errorno==EAGAIN則直接返回,因為對于fd阻塞調用eagain表示提示重試,對于非阻塞fd,errorno==EAGAIN則表示緩沖區已經寫滿,直接return本次處理結束。
readfd放入epoll之后,readfd上有數據到來后會被操作系統拷進內核緩沖區,然后epoll提示readfd可讀。poller_thread會進入處理可讀事件(handle_read)。
poller_thread對可讀事件的處理主要是把字節流讀出來,并反序列化,放入隊列提供給handler_thread,handler_thread調service處理業務邏輯。
handler對收到的消息的處理分兩種情況,如果是服務端,當做請求處理,如果是客戶端,當回復處理,所以hello_world程序進入請求處理流程。
服務器對請求的處理是創建服務對應類型的CommRequest,helloworld中實際是執行了一個WFHttpServerTask。
繼承關系:WFHttpServerTask——>WFServerTask——>WFNetworkTask——>CommRequest——>SubTask,CommSession。
SubTask和CommSession后面再仔細分析,這里先從字面理解,SubTask就是任務,就是處理自定義邏輯的過程,CommSession是會話。那handle的時候會先調用當前Task的processor.dispatch()執行任務,任務執行完自動subtask_done()的時候會調用scheduler->reply(),將結果返回 Send_message()。可以看到Send_message是先嘗試同步寫,如果同步寫失敗了,再嘗試異步寫,異步寫的過程就是先把文件描述符加入epoll監聽,等待可寫信號出現后,再寫入。寫的時候使用iovec,聚集寫盡量減少拷貝次數。
至此poller事件各種operation的處理,已經分析過PD_OP_READ、PD_OP_WRITE、PD_OP_LISTEN,再通過wget看一下PD_OP_CONNECT。
connect主要是處理客戶端鏈接服務端時,服務端無法立刻建立鏈接時的等待,異步等待屏蔽等待時間。
request的時候會優先檢查目標上有沒有idle鏈接,如果有的話直接復用,如果沒有會創建connect,conn_fd是非阻塞的,operation設置為PD_OP_CONNECT,放在epoll中管理,等待fd可用。
可以看到,是一個簡單的發送請求,等待結果的過程。
poller事件共有10種operation,這里分析過讀、寫、connect、listen四種流程,PD_OP_SSL_ACCEPT、PD_OP_SSL_CONNECT、PD_OP_SSL_SHUTDOWN三個只是使用openssl庫時的創建和關閉鏈接。還有另外兩種事件:PD_OP_EVENT、PD_OP_NOTIFY,這兩種分別是linux和mac環境下處理異步文件I/O用的。
異步文件I/O:
TODO
任務組織調度層
下面分析任務線程是如何執行任務的邏輯。這個層次有兩個核心基礎概念,一個是任務的抽象,一個是會話(session)的抽象,二者是所有執行邏輯的祖爺爺和祖奶奶。
任務:
前面看到對于請求的處理,實際是執行了CommRequest,CommRequest既是一個SubTask又是一個CommSession,最后是通過執行的是SubTask的接口dispatch()執行起來的,這里最重要的概念——子任務。workflow里面所有的邏輯,最后都是通過子任務執行起來的;子任務又可以通過各種組合關系,串并聯的組織起來。
這里有四個重要的基本元素:
1,SubTask——子任務,是一切任務的祖先。
2、ParallelTask——并行任務,并行任務里面管理SubTask數組,啟動時會把自己管理的SubTask一個一個全部dispatch一遍。
3、SeriesWork——串聯工作組,里面管理了一個數組的子任務,逐個執行。
4、ParallelWork——并聯工作組,里面管理了一個SeriesWork數組,其本身的祖先是一個SubTask,所以他可以被SeriesWork管理。
這樣就實現了任務的串并聯執行甚至以DAG的形式復合。
下面逐一分析:
class SubTask{
public:
virtual void dispatch() = 0;
private:
virtual SubTask *done() = 0;
protected:
void subtask_done();
private:
ParallelTask *parent;
SubTask **entry;
void *pointer;
};
SubTask是一切執行任務的祖先,不同的任務實現,實現不同的dispatch()和done()接口,提供兩個接口留給用戶自定義:
1、dispatch()接口 就是執行任務,用戶任務自定義執行邏輯,而在執行結束后,必須調用subtask_done()。
2、done()接口 在任務邏輯執行結束后,由subtask_done()調起done(),這個接口是用戶自定義的結束回調,在done()接口里面回收資源,銷毀任務。done()函數還會返回一個子任務的指針,當當前任務執行完還要執行下一個任務的時候,返回下一個任務,如果沒有下一個任務,則返回nullptr。為什么這么約定呢?這需要看一下subtask_done()函數的工作方式。
需要知道成員變量的意思才能明白調度方式:
pointer 一般指向當前所在SeriesWork,SubWork最后也是放在SeriesWork之中啟動起來的;
parent 當一個子任務被ParallelTask任務管理的時候,parent指向被管理的并行任務。
entry 指向待執行任務數組的首位。
subtask_done():仔細解讀一下subtask_done()的工作方式:
{
SubTask *cur = this;
ParallelTask *parent;
SubTask **entry;
while (1){
parent = cur->parent;
entry = cur->entry;
cur = cur->done();
if (cur){
cur->parent = parent;
cur->entry = entry;
if (parent)
*entry = cur;
cur->dispatch();
}
else if (parent) {
if (__sync_sub_and_fetch(&parent->nleft, 1) == 0) {
cur = parent;
continue;
}
}
break;
}
}
可以看到先保存了當前任務的parent和entry,然后直接調用了當前任務的done()接口。如果又返回了一個子任務,則調用新任務的dispatch(),使其運行起來,dispatch()到最后必然又會調用新任務的subtask_done();從而遞歸執行這條線上所有任務,直至done()不會再返回任務;當不再返回任務時,說明parent的孩子都執行完,就可以繼續再往上執行(parent也是一個SubTask),直至根任務執行完。
ParallelTask:
ParallelTask是SubTask的兒子,結構很簡單,管理了一個SubTask數組,ParallelTask::dispatch()的時候會把數組內管理的所有SubTask逐一dispatch()一遍,這樣的話就實現了同級任務的并列執行,特別注意并列執行不一定是并行,是否并行取決于調度。任務本身是順序dispatch()的,如果dispatch調度的時候把任務放入線程池執行任務就是并行的。
SeriesWork:
SeriesWork是一個有鎖的線程安全隊列,隊列中存儲了需要按順序執行的SubTask,預分配4個空間,如果入隊時隊列已滿,則像vector一樣拓展二倍空間。
SubTask都是放到SeriesWork中執行的。SeriesWork是怎么調度執行任務的?啟動函數Start(),會從第一個SubTask開始dispatch(),可以看到多數任務Task的done()的實現都是返回return series->pop();意思就是當前任務執行完了,返回當前所在的SeriesWork中的下一個任務,繼續執行,直至所有任務執行完。
注意SeriesWork本身不是一個SubTask,所以無法被SeriesWork管理。
ParallelWork:
ParallelWork稍微復雜一點
繼承關系:ParallelWork——>ParallelTask——>SubTask
可見:1、ParallelWork是一個SubTask,所以可以被SeriesWork管理;2、ParallelWork同時也是一個ParallelTask,管理了一個數組的SubTask;3、ParallelWork管理了一個SeriesWork數組,這個數組的長度和SubTask數組的長度相同。并且讓SubTask指向同索引SeriesWork的首個SubTask。
ParallelWork是怎樣啟動和調度任務的:
ParallelWork本身是一個SubTask,所以啟動時把他放入一個SeriesWork,作為SeriesWork的firsttask被調起dispatch();然后ParallelWork本身是一個ParallelTask,dispatch的時候會把其下管理的所有的SubTask逐個啟動dispatch();如圖,SubTask指向的實際是管理的SeriesWork的first Task,所以實際上相當于啟動了管理的所有SeriesWork。
這四個結構就是整個任務調度的基石,所有的邏輯都是作為任務執行起來的。并行任務管理串行任務,串行任務管理SubTask(并行任務也是SubTask),這套設定使任務可以自由復合DAG復合。
這時可以明白這個框架名字所謂WorkFlow,其核心就是組織任務的執行流,所有的執行邏輯都是任務。
會話(session):
想要執行的邏輯,通過成為SubTask可以啟動起來,并按一定的順序調度,那具體做的事,則被抽象為會話。
基礎session有四種:CommSession、ExecSession、IOSession、SleepSession,分別代表網絡操作、運算操作、I/O操作、睡眠操作,session都需要實現handle()接口,所有最后執行的任務都是這四種操作派生出來的。
SubTask這個大渣男分別和四種session結合生成了CommRequest、ExecRequest、SleepRequest、IORequest,使得所有的request都可以被作為子任務調度,都有state和error。
四種request分別派生出了WFNetWorkTask、WFThreadTask、WFTimerTask、WFFileTask。其中WFNetWorkTask和WFThreadTask都是兩個參數的模板類。對通信任務來說,參數是請求消息和回復消息,對于計算任務來說參數是輸入和輸出,WFReduceTask、WFSortTask、WFMergeTask是不用參數的的實例化,WFHttpTask、WFRedisTask、WFMysqlTask、WFKafkaTask只不過是不同協議的WFNetWorkTask的實例化。
CommRequest派生了WFNetworkTask;ExecRequest派生了WFThreadTask,二者都加入了輸入輸出模板參數,和一些控制參數,提供了方便的啟動多線程任務和網絡任務的方式。更有WFMultiThreadTask任務,批量管理多線程任務。
這里還有一個WFTimerTask,實現了不占線程的定時功能.。
WFTimerTask:
WFTimerTask可以讓任務休眠一定時長后執行,不占線程,達到時長之后返回執行回調,就是定時任務。
如果一個WFTimerTask被直接start(),則創建一個SeriesWork,并dispatch()起來,如果是串在其他的SeriesWork,當執行到這個task的時候直接dispatch()。
當SleepRequest被dispatch()時候,實際是調用當前scheduler(即communicator)的sleep(),實際是取出當前WFTimerTask的休眠時間,然后創建一個定時任務mpoller_add_timer交給epoll管理,等epoll提示時間到了,再切回來執行。
層次結構:
借用一張官圖非常清楚的表達清楚任務之間的層次關系。
用戶接口
至此,底層支持都分析過了,下面看看通過這些底層結構可以組織出什么花樣。
其他Tasks
WFCounterTask:
CounterTask是一個計數器Task,任務里保存了一個原子的unsigned用來計數,初始化時候傳入需要記的個數,每次任務被dispatch()的時候,計數器減一,直到計數器為0時,執行回調,配合一個阻塞信號量,可以實現一批并行任務的統一等待,如:WaitGroup。
可能是覺得手動創建CounterTask不夠優雅,框架還創建了CounterTask管理器,用一個紅黑樹以名字為key統一管理CounterTask,可以通過名字全局操作CounterTask。
WaitGroup
既然說到了就順便說一下WaitGroup。
WaitGroup實現了阻塞等待多個任務完成的效果。
WaitGroup由一個原子的等待個數,一個WFCounterTask和一個std::future組成。構造時創建一個std::promise,并綁定到future上;創建一個計數1的CounterTask并注冊回調,回調中時給promise->setvalue()。
每次調用done會給剩余個數減一,當減完時,counter->done(),這時回調會告訴futrue,所有任務都完成了,阻塞結束。
WFGraphNode和WFGraphTask:
WFGraphTask實現了將任務迅速的組織成有向無環圖的方法,一個WFGraphTask管理了一張由多個WFGraphNode組成。
WFGraphNode是一個WFCounterTask,并加入了一個WFGraphNode*列表:follower,follower表達了鄰接關系,保存的就是依賴當前任務的下游節點。因為是counter任務,所以具有計數的功能,記的數就是當前Node的入度。在當前任務執行完之后,會把所有下游節點都dispatch(計數)一次,當計數減少到0時,說明當前Node所有依賴已經完成了,就把當前graphNode上掛的SeriesWork執行起來。
依賴處理:當一個node1依賴Node2時候,Node2的下游節點列表里加入Node1,Node1的入度自增。
執行處理:當Node2執行完,Node1的入度減一。
框架的重載了GraphNode的自增運算符和大于號、小于號,自增運算符返回Node本身。大于號、小于號運算符調用依賴關系函數。從而很形象的可以通過如下語法表達節點之間的依賴關系:
a-->c;
b-->d;
c-->d;
是不是很秀?簡直妙不可言
再說一個細節:DAG建立起來了,但是Node上是怎么掛的任務呢?
答:創建WFGraphNode通過統一接口:WFGraphNode& WFGraphTask::create_graph_node(SubTask *task),創建的時候傳入你想要執行的任務,然后把要執行的任務和當前Counter任務串在一個Series里面。當當前Node計數器第一次變0的時候,會調到Done(),看一下關鍵的done()實現:
{
SeriesWork *series = series_of(this);
if (!this->user_data)//首次done會進這里
{
this->value = 1;//value=1使該任務再執行一次就可以達到結束狀態
this->user_data = (void *)1;//下次再進來就不進這個分支了,而是直接delete this;
}
else
delete this;
return series->pop();
}
首次done()的時候不析構,并將狀態置為下次進來析構(value賦1&&user_data非空)。
然后將本series里面要執行的用戶任務執行起來。當用戶任務執行完,會再次執行到GraphNode->Done();這時侯,Node析構,并將所有follower->dispatch()起來。這就是圖任務的整體執行路徑。
WFRepeaterTask:
這是一個遞歸Task,繼承自GenericTask,也就是說啟動時,會創建一個Series,并把Series啟動起來。創建的時候傳入創建任務的回調Create,在dispatch()得時候,往當前Series里傳入兩個任務,一個是Create回調創建出來的新任務,一個是當前任務。這樣的話,順序任務的調度就變成:執行任務—》創建任務—》執行任務。。。
WFConditional:
WFConditional是條件任務包裝器,可以把其他任務包裝成條件任務,通過一個atomic變量實現。新增加一個signal接口,當dispatch和signal都執行后,任務會被執行。原理:當任務被dispatch或者signal時,都會去設置原子bool的值,并檢查狀態,如果設置過狀態,就調起任務,可見第一次不會調起,第二次才會調起任務。
為了避免發送signal者持有條件任務的裸指針,框架還提供了全局的命名的條件任務,發送者可以根據名字給conditional發signal,內部是一個觀察者模式,以cond的名字為key構建了一個紅黑樹管理,當signal某個key的時候,找到對應的條件任務發送signal()。
WFModuleTask:
WFModuleTask提供了一個模塊級的封裝,可以把一系列任務封裝到一個模塊里,可以注冊一個模塊的回調函數。WFModuleTask本質上還是一個SeriesWork,把一系列任務封裝在一起,降低功能任務之間的耦合程度。
服務
基于workflow框架我們可以迅速的構建http服務器,只需要幾行代碼:
{
WFHttpServer server([](WFHttpTask *task) {
task->get_resp()->append_output_body("Hello World!");
});
if (server.start(8888) == 0) { // start server on port 8888
getchar(); // press "Enter" to end.
server.stop();
}
return 0;
}
可以看到構造一個WFHttpServer,只要傳入一個處理WFHttpTask的回調函數即可。
下面分別看 WFHttpServer 、WFServerTask
WFHttpServer
首先WFHttpServer是WFServer的http消息時的特化版本。WFServer在BaseServer的基礎上增加了輸入輸出模板參數,并增加了一個可以處理WFNetworkTask的回調函數,同時重寫了new_session方法;
poller在create_message的時候會調到new_session,創建WFServerTask;
Communicator并不知道Service是什么類型的service,在create_message的時候不管是什么類型的service,都調用service對應的new_session接口去生產session交給Poller去生成任務交由線程池執行。
WFServerTask
WFServerTask繼承自WFHttpTask,WFServerTask內定義了兩個局部類,Processor和Series。
前者Processor保存著服務初始化時傳入的回調和當前WFServerTask的指針,dispatch時執行回調處理當前任務。
后者Series本質上是一個SeriesWork,把Processor和當前任務串起來,并先執行Processor,最后執行當前WFServerTask,當前任務負責reply。同時負責引用計數,讓service知道有多少任務在引用。
服務小結
session是被動產生的,服務是靜態定義的,服務定義了自己的服務類型、和產生任務的方法、處理任務的回調等等,然后在服務啟動的時候綁定地址創建fd,把自己綁定到Communicator上,交給Reactor去調度。
-
服務器
+關注
關注
12文章
9218瀏覽量
85586 -
編程
+關注
關注
88文章
3623瀏覽量
93797 -
代碼
+關注
關注
30文章
4797瀏覽量
68710
發布評論請先 登錄
相關推薦
評論