C++20 has been out for a while now. One of its headline features is that it finally supports coroutines (C++, the heavyweight of the industry, did not get coroutine support until C++20; I can only shake my head at how many people waited how many years for this). Still, they finally arrived, and the standard committee's caution shaped what C++20 coroutines are: excellent performance, great flexibility, and an entry barrier high enough to scare people off.
Then again, C++ was born to carry the performance burden. It is not a language meant only for the application layer (for that you can use Python, Java, Ruby, Lua and the like); it is a language you can build other languages with. So it chases performance and flexibility and gives up on a cozy low barrier to entry; after all, C++ was never designed to be a language for everyone.
I had used Python's coroutines before and found them easy to use, so when C++20 arrived I wanted to try coroutines in C++. Once I actually dug in I found the problem: what C++20 ships is only the infrastructure. All the machinery of stackless coroutines is there, but none of it is wrapped into concrete application-level facilities in the standard library (STL). At that point most people can only stare at it: the coroutine machinery is complex, and without standard library support, saying you will "just use coroutines" is a joke. The consensus online is that C++20 coroutines are a half-finished product, and that you have to wait for the coroutine library support planned for C++23 before you can really use them.
In the spirit of "no risk, no reward" (and because only knowing how to call a library without understanding the underlying machinery is not the C++ way), I spent half a month digging into C++20 coroutines and wrote a simple coroutine library, and in the process got a much deeper understanding of the complex C++ coroutine machinery. Yes, asio and cppcoro already support C++20 coroutines, but I find them large and complex; if you hope to learn C++ coroutines by reading their source code, forget it: without understanding how coroutines work you will not even be able to follow the source. Some will say I am bluffing ("you can't even read code that is handed to you?"). I am not: C++ coroutine syntax will upend some of your intuitions. Let's look at an example:
A func(int a) {
    co_await std::suspend_always{};
    co_yield a;
    co_return 0;
}
int main() {
    auto co = func(1);
    co.hd.resume();
    int a = co.hd.resume();
    int b = co.hd.resume();
}
Some will say: func is a coroutine function, running func in main returns 0, so co is an int variable with value 0.
If you read it as ordinary code, that would be right. But in the world of C++ coroutines, that is not at all what happens.
What actually happens is this: func here is a coroutine generator (this concept matters; it is not a function in the ordinary sense). The return value co is a wrapper object around the control handle of the coroutine instance created by associating the coroutine management class A with this particular coroutine body. To be clear, co is not the coroutine instance (the coroutine frame); it is a wrapper around the control handle of that instance. Executing func(1) only dynamically creates a coroutine instance and hands the control handle back to the caller. At that point the coroutine is suspended; the {} body has not executed at all, so there is no return value yet. This is genuinely confusing (and it gets worse further on).
Only after the three co.hd.resume() calls has the coroutine run to completion, at which point a=1 and b=0.
The return value is stored in the coroutine instance (the coroutine frame) and is managed through the control functions inside the coroutine management class A (A's promise_type defines all of the coroutine's control behaviour).
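To make that concrete, here is a minimal sketch (assumed code, not the library presented later) of what such a coroutine management class A could look like for the example above. Its promise_type stores the value handed over by co_yield / co_return; note that std::coroutine_handle::resume() itself returns void, so with this sketch the caller would read the value through co.hd.promise().value, which is what the shorthand `int a = co.hd.resume();` above is really standing in for.
#include <coroutine>
#include <exception>

struct A {
    struct promise_type {
        int value = 0;                                                   // last value handed over by co_yield / co_return
        A get_return_object() { return A{ std::coroutine_handle<promise_type>::from_promise(*this) }; }
        std::suspend_always initial_suspend() { return {}; }            // created suspended: the body has not run yet
        std::suspend_always final_suspend() noexcept { return {}; }
        std::suspend_always yield_value(int v) { value = v; return {}; } // co_yield a; stores a here
        void return_value(int v) { value = v; }                          // co_return 0; stores 0 here
        void unhandled_exception() { std::terminate(); }
    };
    std::coroutine_handle<promise_type> hd;                              // control handle to the coroutine frame
};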
A few points to summarize (important; do not mix these up):
1. The coroutine management class A is the class that defines the coroutine's control behaviour. A itself is not the coroutine; something of the form A func(int a, ...) { ... } is a complete coroutine definition. So A func1(){}, A func2(){}, A func3(){} can all be bound to the same control class A, yet they are three different coroutine definitions that merely share A as their control behaviour. The benefit is that you can keep these three different coroutines in a single std::vector<A>, even though their bodies (the actual work) are entirely different. For A to be a coroutine management class it must contain a struct promise_type{} definition and a control-handle member std::coroutine_handle<promise_type> hd;. Notably, A does not have to implement the await_xxx interface; it need not be an awaitable.
2. If a code block contains the co_await, co_yield or co_return keywords, it is a coroutine body. When execution reaches one of these keywords the coroutine is suspended. The caller, which has been blocked inside resume(), gets the execution right back: resume() returns and the caller continues.
3. In particular:
co_await can be combined with an awaitable object to build more elaborate suspension behaviour. Asynchronous IO is typically done as co_await plus an IO awaitable: the coroutine starts the asynchronous operation and suspends, and once the asynchronous IO completes, a "scheduler" resumes the coroutine. That is where the value of async lies: IO complexity is converted into CPU scheduling work. So coroutines solve "asynchrony", not "parallelism". For true parallelism you still need multiple threads or processes; what a coroutine does is squeeze the maximum out of a single thread's CPU instead of wasting it blocked on IO. And if you combine coroutines with multithreading or multiprocessing, then congratulations, the world is yours.
co_yield performs a plain suspension: it immediately gives up the execution right, returns to the caller, and can be resumed later (less common in asynchronous scenarios, mostly used for loop-style generators).
co_return performs the final suspension: it immediately gives up the execution right, returns to the caller, and the coroutine can no longer be resumed afterwards (used for coroutine exit).
4. An awaitable type looks like struct B { await_ready(); await_suspend(); await_resume(); }: a class B implementing the three await_xxx interfaces is an awaitable type, and its instances are awaitable objects. Note that await_suspend() runs after (not before) the current coroutine has been suspended. And to be precise, it is not the awaitable object that is suspended; it is the coroutine that co_awaits the awaitable object that gets suspended. Do not confuse the two; this is exactly where I lost a lot of time because the concepts were fuzzy. (A minimal sketch of such an awaitable follows this list.)
5. The coroutine management class A and the awaitable type B are not directly related; they are two different things. The awaitable B controls local behaviour at a single suspension point, while the coroutine control class A governs the coroutine as a whole: creation, running, suspension, destruction and exception handling. A coroutine has exactly one control class A, but it can suspend many times internally, and each suspension corresponds to one awaitable object.
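Since point 4 trips up a lot of people, here is a minimal sketch, under assumed names, of what such an awaitable B looks like. It is illustration only and not part of the library below: await_suspend runs after the co_awaiting coroutine has already been suspended, and here it just stashes that coroutine's handle so some external code (a scheduler, a callback) could resume it later.
#include <coroutine>

struct B {
    std::coroutine_handle<> waiter;                                   // the coroutine that co_awaits this object
    bool await_ready() const noexcept { return false; }               // false: always suspend the awaiting coroutine
    void await_suspend(std::coroutine_handle<> h) noexcept { waiter = h; }  // called with the already-suspended coroutine's handle
    int await_resume() const noexcept { return 42; }                  // the value produced by `co_await B{}` once resumed
};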
Library development
The focus of this article is hands-on library development. The three central concepts of the coroutine framework, the coroutine definition class with its promise_type{}, the awaitable, and the coroutine control handle std::coroutine_handle<>, are not introduced here; please read up on them yourself.
But I do want to walk through the logic of coroutine scheduling, because it makes the library code much easier to follow. With threads this is handled by the kernel and we rarely think about it; with coroutines, if you are writing your own library, you have to implement the scheduling algorithm and the event-loop pattern yourself.
Here is an analogy:
A family has five sons with different abilities (the worker coroutines) and one mother (the scheduler coroutine). There is only one computer (a single thread's time slice). At any moment the mother can hand the computer to only one of the sons (coroutine preemption). One son gets the computer first and starts working (coroutine resumed) while the others can only wait (coroutines in the waiting state). After working for a while he sends out an email (an awaitable object) but must wait for the reply before he can continue (waiting for IO completion). Since the others are still waiting for the computer and he cannot make progress right now, he sensibly gives up the computer and hands it back to his mother (the coroutine suspends and returns execution to the caller), waiting for her to give it to him again later. Once the mother has the computer (the scheduler coroutine resumes) she checks whether any reply emails have arrived (the scheduler checking for completion events, i.e. the iocp/epoll event loop). If one has, she checks which son the reply is addressed to, calls that son over (coroutine scheduling), and hands him the computer (coroutine resumed). With the computer back, that son opens the reply and reads the result (await_resume() returning the asynchronous IO result) and keeps working, and so on, round and round. That is one complete coroutine scheduling cycle.
To build a coroutine library you need a few things:
1. Awaitables implementing the concrete asynchronous operations (the "send an email" step in the analogy: they define whether to hand the computer back, how to open the reply and read the result, and so on);
2. A coroutine control class A (this is a coroutine task). A's promise_type should record the coroutine's state, in particular a pointer to the awaitable object at the current suspension point (this matters). The awaitable also acts as the medium through which the task and the scheduling coroutine exchange information: the awaitable's pointer is passed to the task's promise inside await_suspend() and stored there, and when the asynchronous operation completes, the scheduling coroutine uses that pointer to hand the result back to the waiting task.
3. As the summary and the analogy suggest, most importantly you need a "coroutine scheduler". First, it has a main scheduling coroutine with a set of scheduling algorithms; its job is to watch for IO completion events and hand the execution right to tasks. Second, it maintains a queue of task coroutines (there are many ways to implement it) recording the handles of all coroutine instances; this queue exists purely for scheduling. (A toy round-robin sketch follows this list.)
(Note: this is precisely why C++20 cannot be used out of the box. There are no ready-made libraries for the three components above. In the name of maximum flexibility, C++ expects users to implement them themselves, which is painful for those of us used to finished libraries, so everyone keeps waiting for the C++23 standard library. But even C++23 will not cover every need; for special requirements you will still have to write your own.)
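To make the scheduler idea concrete before the real implementation, here is a tiny self-contained round-robin sketch under assumed names (ToyTask / ToyScheduler). It only demonstrates the "queue of task handles, resume whoever is next" idea; it has no IO and no completion events, unlike the IOCP-driven scheduler in CLCoroutine.cpp further down.
#include <coroutine>
#include <deque>
#include <exception>

struct ToyTask {
    struct promise_type {
        ToyTask get_return_object() { return { std::coroutine_handle<promise_type>::from_promise(*this) }; }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        std::suspend_always yield_value(int) { return {}; }   // co_yield: give the CPU back to the scheduler
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
    std::coroutine_handle<promise_type> hd;
};

struct ToyScheduler {
    std::deque<std::coroutine_handle<ToyTask::promise_type>> queue;  // the task queue
    void add(ToyTask t) { queue.push_back(t.hd); }
    void run() {                                   // equal-weight round robin
        while (!queue.empty()) {
            auto h = queue.front(); queue.pop_front();
            h.resume();                            // run this task until its next co_yield / co_return
            if (h.done()) h.destroy();             // finished tasks leave the queue
            else queue.push_back(h);               // the rest go to the back of the line
        }
    }
};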
Implementation approach
Scheduler:
1. The event loop inside the scheduling coroutine uses the IOCP model on Windows in this example (on Linux you can use epoll; the port is straightforward and the principle is the same);
2. The scheduling algorithm is simple equal-weight scheduling: the tasks hung on the coroutine queue are scheduled in turn, and every task gets the same chance to run;
3. Marking completion events and resuming tasks are kept separate, so that tasks suspended via a plain co_yield also get a chance to run again (co_yield never produces a completion event later);
4. The scheduler keeps the handles of the first and last task in the coroutine queue, for the scheduling coroutine's use;
Awaitables:
1. Asynchronous file read and write operations;
2. Network sockets: asynchronous listen, accept, send and recv over TCP;
3. Network sockets: asynchronous sendto and recvfrom over UDP;
4. Coroutine sleep: sleepFor and sleepForEx, giving millisecond- and microsecond-level sleeps for coroutine tasks;
5. Under the IOCP model all of the APIs above use overlapped IO. When an API successfully starts an overlapped operation, the pointer to the corresponding awaitable is recorded in the current coroutine's state (a variable in its promise_type). Once the completion event arrives, the scheduling coroutine sets that awaitable's completion flag to true. The scheduling coroutine then simply walks the tasks in its polling loop, checks the awaitable pointer each task has stored, and looks at the completion flag: if it is true it resumes that coroutine, if it is false it skips it and keeps polling the event loop;
Task definition (the task coroutine):
1. The task coroutine's promise_type defines three pieces of state (sketched right after this list):
2. a pointer to the awaitable it is currently suspended on; if the coroutine is not suspended on IO, or not suspended at all, this pointer should be null;
3. a pointer to the Scheduler that owns this coroutine;
4. the handles of the previous and the next task in the coroutine queue at this moment;
5. When a task's awaitable has its completion flag set to true, the scheduling coroutine links that task's predecessor and successor to each other, moves the task's handle to the tail of the queue, and resumes the task, completing one round of scheduling and resumption;
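A sketch of that bookkeeping, with illustrative names and the usual promise_type interface functions omitted (the real fields are sc, before, behind and pAwaitableFile in CoTask::promise_type further down):
#include <coroutine>

struct SketchScheduler;                               // stand-in for the scheduler type

struct SketchPromise {
    SketchScheduler*        scheduler = nullptr;      // 3. the Scheduler that owns this task
    std::coroutine_handle<> prev, next;               // 4. neighbours in the scheduler's task queue
    void*                   awaiting  = nullptr;      // 2. awaitable currently suspended on, or null
};

struct SketchIoAwaitable {
    bool completed = false;                           // set by the scheduler when the IO completes
    bool await_ready() const noexcept { return completed; }
    void await_suspend(std::coroutine_handle<SketchPromise> h) noexcept {
        h.promise().awaiting = this;                  // record the pointer so the scheduler can poll it (point 2)
    }
    void await_resume() const noexcept {}             // the async result would be handed back here
};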
Starting coroutine scheduling:
1. Instantiate the scheduler CoScheduler;
2. Define task coroutines as lambdas and add them to the scheduler's coroutine queue;
3. Start the scheduler with its run method, which drives all the coroutine tasks;
4. A task coroutine can itself dynamically spawn new task coroutines and add them to the scheduling queue. (A minimal usage sketch follows.)
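Before the test output, here is a minimal usage sketch based on the API shown in the code below (CoScheduler, CoTask, gather, run and sleepFor are the library's own names); error handling is omitted and the snippet assumes CLCoroutine.h has been included.
CoScheduler sc;                          // 1. instantiate the scheduler
sc.gather([&]() -> CoTask {              // 2. define a task coroutine as a lambda and queue it
    for (int i = 0; i < 3; ++i) {
        co_await sleepFor(1000);         //    suspend this task for roughly one second
        // a task may also call sc.gather(...) here to spawn new tasks dynamically (point 4)
    }
    co_return;
});
sc.run();                                // 3. start scheduling; returns once all tasks are done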
Let's look at the test output first (the full source code comes later):
Case 1: TCP server/client model test
Besides the scheduling coroutine, four tasks end up in the coroutine queue: a server listener task, a client generator task, a server task and a client task.
Main coro scheduler: Iocp loop started ... //0 scheduling coroutine runs
Iocp: New task arrived and first run, tid=26064
Tcp server coro started ... //1 listener task runs
Server listener accept wait ... --> suspends on async accept
Iocp: New task arrived and first run, tid=26064 //0 scheduling coroutine runs (event-loop phase)
Clients creater coro started. //2 client generator task runs
Clients creater make client task 1. --> dynamically creates a client task and adds it to the queue
Clients creater yield run privilege to coro scheduler. --> returns to the scheduling coroutine via co_yield
Iocp: New task arrived and first run, tid=26064 //0 scheduling coroutine runs
Iocp: New task arrived and first run, tid=26064 --> schedules the newly arrived task
Client[1] connect wait ... //3 client task runs, suspends on async connect
Iocp: IoFileAwaitable[TConnect] completed 0 bytes, tid=26064 //0 scheduling coroutine runs, detects the connect completion event
Clients creater fetch run privilege again. //2 client generator task runs
Clients creater yield run privilege to coro scheduler.
Client[1] send wait ...
Iocp: IoFileAwaitable[TAccept] completed 47 bytes, tid=26064 //0 scheduling coroutine runs, detects the accept completion event
Server listener accept wait ... //1 server listener task runs, suspends on async accept
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 //0 scheduling coroutine runs
Clients creater fetch run privilege again. //2 client generator task runs
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=26064 //0 scheduling coroutine runs
Server[1] send wait ... //4 server task runs
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 //0 scheduling coroutine runs, detects the send completion event
Client[1] recv wait ... //3 client task runs
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 //0 scheduling coroutine runs, detects the recv completion event
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv wait ... //4 server task runs, suspends on async recv
Client[1] recv server msg = //3 client task runs
Hello client. this is server 1. 1st response. --> prints the message received from the server
Client[1] send wait ... --> suspends on async send
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 //0 scheduling coroutine runs, detects the recv completion event
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 --> detects the send completion event
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv client msg = //4 server task runs
Helle server, this is client 1: 2st request. --> prints the message received from the client
Server[1] send wait ...
The asynchronous interleaving of multiple coroutine tasks works like this: whenever a coroutine reaches a suspendable asynchronous operation such as connect, accept, send or recv, it hands the execution right back to the scheduler; when the completion event arrives, the scheduler hands execution back to the task. Execution keeps bouncing back and forth between the scheduler and the tasks, which is how a single CPU is multiplexed across many tasks.
Case 2: UDP broadcast model test
Main coro scheduler: Iocp loop started ...
Iocp: New task arrived and first run, tid=31188
Servers creater coro started.
Servers creater make server task 1.
Servers creater make server task 2.
Servers creater make server task 3.
Servers creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Clients creater coro started.
Clients creater make broadcastor client task 1.
Clients creater make broadcastor client task 2.
Clients creater make broadcastor client task 3.
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Iocp: New task arrived and first run, tid=31188
Udp server[1] coro started bind port = 33000...
Udp server[1] recvfrom wait ... //server 1: async receive
Iocp: New task arrived and first run, tid=31188
Udp server[2] coro started bind port = 33001...
Udp server[2] recvfrom wait ... //server 2: async receive
Iocp: New task arrived and first run, tid=31188
Udp server[3] coro started bind port = 33002...
Udp server[3] recvfrom wait ... //server 3: async receive
Iocp: New task arrived and first run, tid=31188
Broadcastor[1] send wait ... //client 1: async send
Iocp: New task arrived and first run, tid=31188
Broadcastor[2] send wait ... //client 2: async send
Iocp: New task arrived and first run, tid=31188
Broadcastor[3] send wait ... //client 3: async send
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188 //scheduler: recvfrom completion event
Servers creater fetch run privilege again.
Servers creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188 //scheduler: sendto completion event
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Udp server[2] recvfrom 1st broadcast 75 bytes data, msg = //server 2 receives and prints the message
Helle server, this is broadcastor 1: 1st randon broadcast to port=33001.
Udp server[2] recvfrom wait ... --> suspends on async recvfrom
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Udp server[3] recvfrom 1st broadcast 75 bytes data, msg =
Helle server, this is broadcastor 2: 1st randon broadcast to port=33002.
Udp server[3] recvfrom wait ...
Broadcastor[1] sendto server msg =
Helle server, this is broadcastor 1: 1st randon broadcast to port=33001.
Broadcastor[1] send wait ...
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Broadcastor[2] sendto server msg =
Helle server, this is broadcastor 2: 1st randon broadcast to port=33002.
Broadcastor[2] send wait ...
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Broadcastor[3] sendto server msg =
Helle server, this is broadcastor 3: 1st randon broadcast to port=33001.
Now let's look at the performance tests:
Using the same models as Case 1 and Case 2, but this time TCP runs 100 server/client pairs (200 tasks in total) and UDP runs 20 broadcaster/receiver pairs (40 tasks in total) to test concurrency and gather some statistics. The results are below.
Clients creater coro started.
Clients creater make client task 1.
...
Clients creater make client task 100.
Summary coro count 203: total handle 92752 times (spend time 3.06413s), 30902.3 times/per-second.
Summary coro count 203: total handle 185852 times (spend time 6.06633s), 31010.6 times/per-second.
Summary coro count 203: total handle 278601 times (spend time 9.06766s), 30902.6 times/per-second.
Summary coro count 203: total handle 371901 times (spend time 12.0696s), 31080.1 times/per-second.
Summary coro count 203: total handle 466752 times (spend time 15.0719s), 31592 times/per-second.
Counting one full send-plus-recv round on both the server and the client, i.e. four TCP transfers, as one effective communication (1 "time"),
the results show that with coro=200, a single thread completes on average about 30,000 effective communications per second (yes, it is talking to itself, but the coroutine machinery is fully exercised and the performance is respectable).
Servers creater make server task 1.
...
Servers creater make server task 20.
Clients creater coro started.
Clients creater make broadcastor client task 1.
...
Clients creater make broadcastor client task 20.
Udp server[1] coro started bind port = 33000...
...
Udp server[20] coro started bind port = 33019...
Summary coro count 43: total handle 541730 times (spend time 3.02587s), 180571 times/per-second.
Summary coro count 43: total handle 1082377 times (spend time 6.02621s), 180196 times/per-second.
Summary coro count 43: total handle 1623102 times (spend time 9.02651s), 180223 times/per-second.
Summary coro count 43: total handle 2165716 times (spend time 12.0268s), 180853 times/per-second.
Summary coro count 43: total handle 2731919 times (spend time 15.0271s), 188714 times/per-second.
Since UDP is a one-way, connectionless protocol it is much faster than TCP. Counting one sendto plus one recvfrom as one effective communication, with coro=40 a single thread reaches about 180,000 effective communications per second.
In closing
Once you understand C++ coroutines they are not that hard, and as long as an API offers an asynchronous variant you can wrap it into a coroutine library. Asynchronous operations for mysql, redis and the like can be wrapped the same way later, so further C++ coroutine library development goes quickly.
This library exists mainly to record my journey of learning C++ coroutines, and a lot of functionality still needs polishing. It currently supports asynchronous file, socket and sleep operations on Windows; support for Linux's epoll model can be added later.
Code
Header file CLCoroutine.h. The functions void test_coroutine_tcp_server() and void test_coroutine_udp_random_broadcast() in it are the test code for Case 1 and Case 2.
#ifndef __CL_COROUTINE__
#define __CL_COROUTINE__
#if (defined(__cplusplus) && __cplusplus >= 202002L) || (defined(_HAS_CXX20) && _HAS_CXX20)
#ifndef CLUseCorotine
#define CLUseCorotine 1
#endif
#endif
#if (defined(CLUseCorotine) && CLUseCorotine)
#include <coroutine>
#include <thread>
#include <atomic>
#include <chrono>
#include <system_error>
#include "../_cl_common/CLCommon.h"
#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN //trim how much the Windows headers pull in
#define WIN32_LEAN_AND_MEAN
#endif // !WIN32_LEAN_AND_MEAN
#include "Windows.h"
#include "Winsock2.h"
#include "WS2tcpip.h"
#include "MSWSock.h"
#pragma comment(lib, "ws2_32.lib")
#else
#endif
struct CoScheduler;
//(Coroutine) task unit
struct CoTask {
using return_type = void;
struct promise_type;
using handle = std::coroutine_handle<promise_type>;
struct promise_type {
CoTask get_return_object() { return { handle::from_promise(*this) }; }
void unhandled_exception() { std::terminate(); }
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() { }
template<class U>
std::suspend_always yield_value(const U& val) {
pAwaitableFile = nullptr;
return {};
}
CoScheduler* sc = 0;
handle before = 0, behind = 0;
void* pAwaitableFile = 0;
};
bool resume();
handle hd;
};
//(Coroutine) task scheduler: contains the main scheduling coroutine and the event loop, and maintains the queue of suspended (coroutine) tasks
struct CoScheduler {
struct MainCoro {
using return_type = void;
struct promise_type;
using handle = std::coroutine_handle<promise_type>;
struct promise_type {
MainCoro get_return_object() { return { handle::from_promise(*this) }; }
void unhandled_exception() { std::terminate(); }
std::suspend_always initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() { }
CoScheduler* sc = 0;
};
constexpr bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<>) { }
auto await_resume() const { }
handle hd;
};
CoScheduler()
: m_curTid(std::this_thread::get_id())
, m_hIocp(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0))
{
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
}
bool registe(HANDLE hFile) {
if (!hFile || hFile == INVALID_HANDLE_VALUE || !m_hIocp || ::CreateIoCompletionPort(hFile, m_hIocp, 0, 0) != m_hIocp)
return false;
else
return true;
}
bool registe(SOCKET sock) {
return registe((HANDLE)sock);
}
// Create a task and leave it waiting for later scheduling
template<class F, class ...Args>
void gather(F&& func, Args&& ...args) {
if (m_curTid != std::this_thread::get_id())
throw std::logic_error("Scheduler thread is not match.");
CoTask coro = func(std::forward<Args>(args)...);
appendNewTaskToEnd(coro);
::PostQueuedCompletionStatus(m_hIocp, 0, (ULONG_PTR)coro.hd.address(), 0);
}
// Create a task and schedule it immediately
template<class F, class ...Args>
void createTask(F&& func, Args&& ...args) {
if (m_curTid != std::this_thread::get_id())
throw std::logic_error("Scheduler thread is not match.");
CoTask coro = func(std::forward<Args>(args)...);
appendNewTaskToEnd(coro);
coro.resume();
}
size_t taskCount() const { return m_taskCount; }
// Run coroutine scheduling
void run();
private:
void appendNewTaskToEnd(CoTask& cur) {
auto& cprm = cur.hd.promise();
cprm.sc = this;
if (m_end.hd) {
cprm.before = m_end.hd;
cprm.behind = 0;
m_end.hd.promise().behind = cur.hd;
}
m_end.hd = cur.hd;
++m_taskCount;
if (m_begin.hd == 0) {
m_begin.hd = cur.hd;
cprm.before = 0;
}
}
void moveTaskToEnd(CoTask::handle h) {
if (removeDoneTask())
return;
if (!h)
return;
auto& cprm = h.promise();
if (h == m_begin.hd) {
m_begin.hd = cprm.behind;
if (m_begin.hd)
m_begin.hd.promise().before = 0;
if (m_end.hd)
m_end.hd.promise().behind = h;
cprm.behind = 0;
cprm.before = m_end.hd;
m_end.hd = h;
}
else if (h == m_end.hd) {
}
else {
cprm.behind.promise().before = cprm.before;
cprm.before.promise().behind = cprm.behind;
if (m_end.hd)
m_end.hd.promise().behind = h;
cprm.behind = 0;
cprm.before = m_end.hd;
m_end.hd = h;
}
}
bool removeDoneTask() {
bool ret = false;
while (m_begin.hd && m_begin.hd.done()) {
auto h = m_begin.hd;
m_begin.hd = h.promise().behind;
if (m_begin.hd)
m_begin.hd.promise().before = 0;
h.destroy();
--m_taskCount;
ret = true;
}
return ret;
}
HANDLE m_hIocp;
const std::thread::id m_curTid;
MainCoro m_main;
CoTask m_begin, m_end;
std::atomic<size_t> m_taskCount = 0;
};
// IO file operation types
enum IoFileType :int {
TUnknown = 0,
TRead,
TWrite,
TListen,
TAccept,
TConnect,
TSend,
TRecv,
TSendto,
TRecvfrom,
TSleep,
};
// IO file scheduling priority
enum IoFilePriority : int {
WaitingForPolling = 0, // wait for round-robin polling
DispatchImmediately, // dispatch immediately
};
// Awaitable file object supporting asynchronous suspension (base class)
template<class Ret = long long>
struct IoFileAwaitable : OVERLAPPED {
operator HANDLE() const { return m_hFile; }
operator SOCKET() const { return (SOCKET)m_hFile; }
bool isRegisted() const { return m_isRegisted; }
bool isCompleted() const { return m_isCompleted; }
void setCompleted() { m_isCompleted = true; }
void resetCompleted() {
memset(this, 0, sizeof(OVERLAPPED));
m_isCompleted = 0;
}
void setReturn(Ret ret) { m_ret = ret; }
Ret getReturn() const { return m_ret; }
IoFileType& type() { return m_fileType; }
const char* typeName() const {
#define _TypeNameItem( tp ) case tp: return #tp;
switch (m_fileType)
{
_TypeNameItem(TUnknown);
_TypeNameItem(TRead);
_TypeNameItem(TWrite);
_TypeNameItem(TListen);
_TypeNameItem(TAccept);
_TypeNameItem(TConnect);
_TypeNameItem(TSend);
_TypeNameItem(TRecv);
_TypeNameItem(TSendto);
_TypeNameItem(TRecvfrom);
_TypeNameItem(TSleep);
default:
return "TUnknown";
}
}
void* getTransferredBytesCountBuffer() const {
return m_transferredBytesCount;
}
void setTransferredBytesCountRecvBuffer(void* countBuf) {
m_transferredBytesCount = countBuf;
}
bool close() {
if (m_hFile) {
return CloseHandle(detach());
}
return true;
}
HANDLE detach() {
HANDLE ret = *this;
m_hFile = 0;
m_isRegisted = 0;
return ret;
}
HANDLE attach(CoScheduler& sc, HANDLE s) {
HANDLE ret = *this;
m_hFile = s;
m_isRegisted = sc.registe(m_hFile);
return ret;
}
int getLastError() const { return m_lastError; }
void setLastError(int err) { m_lastError = err; }
CoTask::handle& onwer() { return m_owner; }
auto getPriority() const { return m_priority; }
void setPriority(IoFilePriority priority) { m_priority = priority; }
// awaitable methods
bool await_ready() const { return isCompleted(); }
void await_suspend(CoTask::handle h) {
h.promise().pAwaitableFile = this;
m_owner = h;
}
Ret await_resume() {
setTransferredBytesCountRecvBuffer(nullptr);
return getReturn();
}
protected:
IoFileAwaitable()
: m_hFile((HANDLE)0)
, m_isRegisted(false)
{
resetCompleted();
}
IoFileAwaitable(CoScheduler& sc, HANDLE hFile)
: m_hFile(hFile)
, m_isRegisted(sc.registe(m_hFile))
{
resetCompleted();
}
IoFileAwaitable(CoScheduler& sc, SOCKET sock)
: m_hFile((HANDLE)sock)
, m_isRegisted(sc.registe(sock))
{
resetCompleted();
}
HANDLE m_hFile;
bool m_isRegisted;
bool m_isCompleted;
IoFileType m_fileType = IoFileType::TUnknown;
void* m_transferredBytesCount = nullptr;
int m_lastError = ERROR_SUCCESS;
IoFilePriority m_priority = IoFilePriority::WaitingForPolling;
CoTask::handle m_owner;
Ret m_ret = 0;
};
// Socket supporting asynchronous suspension (base class)
template<class Ret>
struct AsyncSocket :public IoFileAwaitable<Ret> {
using base = IoFileAwaitable<Ret>;
~AsyncSocket() { close(); }
sockaddr_in localAddress() const { return m_local; }
sockaddr_in remoteAddress() const { return m_remote; }
sockaddr_in* localAddress() { return &m_local; }
sockaddr_in* remoteAddress() { return &m_remote; }
int close() {
int ret = 0;
if (base::m_hFile) {
if (base::m_hFile != (HANDLE)INVALID_SOCKET) {
ret = closesocket(detach());
}
else {
base::m_hFile = 0;
base::m_isRegisted = 0;
}
}
return ret;
}
SOCKET detach() {
return (SOCKET)base::detach();
}
SOCKET attach(CoScheduler& sc, SOCKET s) {
return (SOCKET)base::attach(sc, (HANDLE)s);
}
protected:
AsyncSocket(CoScheduler& sc, SOCKET sock)
:base(sc, sock)
{ }
sockaddr_in m_local = { 0 };
sockaddr_in m_remote = { 0 };
};
struct AsyncAcceptor;
// Server-side listener supporting asynchronous suspension: a TCP listening socket waiting for incoming connections
struct AsyncListener :public AsyncSocket<bool> {
AsyncListener(CoScheduler& sc, unsigned long addr, unsigned short port, int backlog = SOMAXCONN)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
type() = IoFileType::TListen;
m_local.sin_family = AF_INET;
m_local.sin_addr.s_addr = addr;
m_local.sin_port = htons(port);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
}
if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
|| SOCKET_ERROR == ::listen(*this, backlog)
)
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
}
}
AsyncListener(CoScheduler& sc, const char* ip, unsigned short port, int backlog = SOMAXCONN)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
type() = IoFileType::TListen;
m_local.sin_family = AF_INET;
m_local.sin_port = htons(port);
InetPton(AF_INET, ip, &m_local.sin_addr);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
}
if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
|| SOCKET_ERROR == ::listen(*this, backlog)
)
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
}
}
sockaddr_in listenAddress() const { return localAddress(); }
// Returns true on success, false on failure
AsyncAcceptor& accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
};
// TCP connection supporting asynchronous suspension (base class)
struct AsyncTcp :public AsyncSocket<int> {
// Returns 0 on success, SOCKET_ERROR on failure
AsyncTcp& send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend);
// Returns 0 on success, SOCKET_ERROR on failure
AsyncTcp& recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv);
protected:
AsyncTcp(CoScheduler& sc)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{ }
};
// Server-side acceptor supporting asynchronous suspension: the accepting TCP socket
struct AsyncAcceptor : public AsyncTcp {
AsyncAcceptor(CoScheduler& sc)
: AsyncTcp(sc)
{
type() = IoFileType::TAccept;
}
// Parse the address information of the incoming connection and store it in the internal address members
void perseAddress(void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer) {
if (lpAcceptBuffer == 0 || nNumberOfBytesAcceptBuffer == 0)
throw std::logic_error("perseAddress parm is invalid.");
static LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptSockAddrs = 0;
if (!lpfnGetAcceptSockAddrs) {
GUID GuidGetAcceptexSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
*this,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptexSockAddrs,
sizeof(GuidGetAcceptexSockAddrs),
&lpfnGetAcceptSockAddrs,
sizeof(lpfnGetAcceptSockAddrs),
&dwBytes, NULL, NULL))
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener GetAcceptexSockAddrs error.");
}
}
int localLen = 0, remoteLen = 0;
lpfnGetAcceptSockAddrs(
lpAcceptBuffer,
nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
(LPSOCKADDR*)localAddress(),
&localLen,
(LPSOCKADDR*)remoteAddress(),
&remoteLen
);
}
// Returns true on success, false on failure
AsyncAcceptor& accept(AsyncListener& sockListener, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
int await_resume() {
setPriority(IoFilePriority::WaitingForPolling);
return AsyncTcp::await_resume();
}
};
// Client-side connector supporting asynchronous suspension: the initiating TCP socket
struct AsyncConnector : public AsyncTcp {
AsyncConnector(CoScheduler& sc)
: AsyncTcp(sc)
{
type() = IoFileType::TConnect;
}
AsyncConnector(CoScheduler& sc, const char* ip, unsigned short port)
: AsyncTcp(sc)
{
type() = IoFileType::TConnect;
setConnectRemoteAddress(ip, port);
bindConnectLocalPort(0);
}
void setConnectRemoteAddress(const char* ip, unsigned short port) {
memset(&m_remote, 0, sizeof(m_remote));
m_remote.sin_family = AF_INET;
m_remote.sin_port = htons(port);
InetPton(AF_INET, ip, &m_remote.sin_addr);
}
int bindConnectLocalPort(unsigned short port = 0) {
memset(&m_local, 0, sizeof(m_local));
m_local.sin_family = AF_INET;
m_local.sin_addr.s_addr = INADDR_ANY;
m_local.sin_port = htons(port);
return ::bind(*this, (const sockaddr*)&m_local, sizeof(m_local));
}
// Returns true on success, false on failure
AsyncConnector& connect(const sockaddr* name, int namelen, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
// Returns true on success, false on failure
AsyncConnector& connect(const char* ip, unsigned short port
, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
// Returns true on success, false on failure
AsyncConnector& connect(void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
};
// As the server-side acceptor, its completion events get the dispatch-immediately priority to keep throughput up
// Returns true on success, false on failure
inline
AsyncAcceptor&
accept(AsyncListener& sockListener, AsyncAcceptor& sockAccept, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted) {
static LPFN_ACCEPTEX lpfnAcceptEx = 0;
sockListener.type() = IoFileType::TListen;
sockAccept.type() = IoFileType::TAccept;
sockAccept.resetCompleted();
sockAccept.setTransferredBytesCountRecvBuffer(lpNumberOfBytesAccepted);
sockAccept.setPriority(IoFilePriority::DispatchImmediately);//set dispatch-immediately priority
if (lpNumberOfBytesAccepted)
*lpNumberOfBytesAccepted = 0;
if (!lpfnAcceptEx) {
GUID GuidAcceptEx = WSAID_ACCEPTEX; // GUID needed to look up the AcceptEx extension function
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
sockListener,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&lpfnAcceptEx,
sizeof(lpfnAcceptEx),
&dwBytes, NULL, NULL))
{
lpfnAcceptEx = 0;
throw std::system_error(WSAGetLastError(), std::system_category(), "Accept get AcceptEx function address error.");
}
}
bool ret = lpfnAcceptEx(
sockListener,
sockAccept,
(char*)lpAcceptBuffer,
nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
lpNumberOfBytesAccepted,
&sockAccept
);
if (ret == false) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = true;
}
if (ret) {
sockAccept.setReturn(ret);
return sockAccept;
}
sockAccept.setReturn(false);
sockAccept.setCompleted();
sockAccept.setPriority(IoFilePriority::WaitingForPolling);
return sockAccept;
}
// Returns true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const sockaddr* name, int namelen, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
static LPFN_CONNECTEX lpfnConnectEx = 0;
sockCon.type() = IoFileType::TConnect;
sockCon.resetCompleted();
if (lpdwBytesSent)
*lpdwBytesSent = 0;
if (!lpfnConnectEx) {
GUID GuidConnectEx = WSAID_CONNECTEX; // GUID needed to look up the ConnectEx extension function
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
sockCon,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidConnectEx,
sizeof(GuidConnectEx),
&lpfnConnectEx,
sizeof(lpfnConnectEx),
&dwBytes, NULL, NULL))
{
lpfnConnectEx = 0;
throw std::system_error(WSAGetLastError(), std::system_category(), "Connect get ConnectEx function address error.");
}
}
sockCon.setTransferredBytesCountRecvBuffer(lpdwBytesSent);
bool ret = lpfnConnectEx(
sockCon,
name,
namelen,
lpSendBuffer,
dwSendDataLength,
lpdwBytesSent,
&sockCon
);
if (ret == false) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = true;
}
if (ret) {
sockCon.setReturn(ret);
return sockCon;
}
sockCon.setReturn(false);
sockCon.setCompleted();
return sockCon;
}
// Returns true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const char* ip, unsigned short port
, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
sockCon.setConnectRemoteAddress(ip, port);
sockCon.bindConnectLocalPort(0);
return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
// Returns true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncTcp&
send(AsyncTcp& sock, const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend) {
sock.type() = IoFileType::TSend;
sock.resetCompleted();
if (lpNumberOfBytesSend)
*lpNumberOfBytesSend = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSend);
WSABUF wsaBuf{ nNumberOfBytesSendBuffer , (char*)lpSendBuffer };
auto ret = WSASend(sock, &wsaBuf, 1, lpNumberOfBytesSend, 0, &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncTcp&
recv(AsyncTcp& sock, void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv) {
sock.type() = IoFileType::TRecv;
sock.resetCompleted();
if (lpNumberOfBytesRecv)
*lpNumberOfBytesRecv = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecv);
WSABUF wsaBuf{ nNumberOfBytesRecvBuffer , (char*)lpRecvBuffer };
unsigned long dwFlag = 0;
auto ret = WSARecv(sock, &wsaBuf, 1, NULL, &dwFlag, &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
// UDP (connectionless) socket supporting asynchronous suspension
struct AsyncUdp : public AsyncSocket<int> {
// Returns -1 if setup failed; 1 means broadcast mode (client side), 0 means receiver mode (server side)
int status() const { return m_isBroadCast; }
int* remoteLen() { return &m_remoteLen; }
protected:
//isBroadCast = true: sender-side UDP (client), uses sendTo; the broadcast destination can be chosen dynamically per sendTo call
//isBroadCast = false: receiver-side UDP (server), uses recvFrom; the broadcast receive address to bind must be given at construction
AsyncUdp(CoScheduler& sc, bool isBroadCast = true, const char* ip = 0, unsigned short port = 0)
: AsyncSocket(sc, WSASocketW(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
setBroadCast(isBroadCast, ip, port);
}
// Returns -1 if setup failed; 1 means broadcast mode (client side), 0 means receiver mode (server side)
int setBroadCast(bool isBroadCast, const char* ip, unsigned short port) {
if (*this && *this != INVALID_SOCKET)
{
m_isBroadCast = isBroadCast;
if (::setsockopt(*this, SOL_SOCKET, SO_BROADCAST, (char*)&m_isBroadCast, sizeof(m_isBroadCast)) == 0) {
if (isBroadCast) {
setBindAddress(0, 0);
setBroadcastAddress(ip, port);
}
else {
setBindAddress(ip, port);
}
return m_isBroadCast;
}
}
return m_isBroadCast = -1;
}
// Set the local listening address the receiver binds to
bool setBindAddress(const char* ip, unsigned short port)
{
memset(&m_local, 0, sizeof(m_local));
m_local.sin_family = AF_INET;
m_local.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
m_local.sin_port = htons(port);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp set reuse address error.");
}
if (::bind(*this, (const sockaddr*)&m_local, sizeof(sockaddr_in)))
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp bind address error.");
}
return true;
}
// Set the destination (remote) address that broadcasts are sent to
void setBroadcastAddress(const char* ip, unsigned short port)
{
memset(&m_remote, 0, sizeof(m_remote));
m_remote.sin_family = AF_INET;
m_remote.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
m_remote.sin_port = htons(port);
}
int m_remoteLen = 0;
int m_isBroadCast = -1;
};
// UDP broadcaster socket supporting asynchronous suspension (sender / client side)
struct AsyncBroadcastor :public AsyncUdp {
AsyncBroadcastor(CoScheduler& sc, const char* ip = 0, unsigned short port = 0)
:AsyncUdp(sc, true, ip, port)
{
type() = IoFileType::TSendto;
}
// Sender-side UDP (client) sends data to the broadcast address stored internally (fails if none was set)
// Returns 0 on success, SOCKET_ERROR on failure
AsyncBroadcastor& sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
// Sender-side UDP (client) sends data to a broadcast address given at call time
// Returns 0 on success, SOCKET_ERROR on failure
AsyncBroadcastor& sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
bool isValidBroadcastor() const { return status() == 1; }
using AsyncUdp::setBroadcastAddress;
};
// UDP receiver socket supporting asynchronous suspension (receiver / server side)
struct AsyncReceiver :public AsyncUdp {
AsyncReceiver(CoScheduler& sc, const char* ip, unsigned short port)
:AsyncUdp(sc, false, ip, port)
{
type() = IoFileType::TRecvfrom;
}
// Receiver-side UDP (server) reads broadcast data arriving at the bound local address
// Returns 0 on success, SOCKET_ERROR on failure
AsyncReceiver& recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd);
bool isValidReceiver() const { return status() == 0; }
using AsyncUdp::setBindAddress;
};
// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
sock.type() = IoFileType::TSendto;
sock.resetCompleted();
if (lpNumberOfBytesSent)
*lpNumberOfBytesSent = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSent);
WSABUF wsaBuf{ nNumberOfBytesSentBuffer , (char*)lpSentBuffer };
auto ret = WSASendTo(sock, &wsaBuf, 1, lpNumberOfBytesSent, 0,
(const sockaddr*)sock.remoteAddress(), (int)sizeof(sockaddr_in), &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const char* ip, unsigned short port,
const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
sock.setBroadcastAddress(ip, port);
return ::sendTo(sock, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);;
}
// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncReceiver&
recvFrom(AsyncReceiver& sock, void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd) {
sock.type() = IoFileType::TRecvfrom;
sock.resetCompleted();
if (lpNumberOfBytesRecvd)
*lpNumberOfBytesRecvd = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecvd);
WSABUF wsaBuf{ nNumberOfBytesRecvdBuffer , (char*)lpRecvdBuffer };
DWORD dwFlag = 0;
*sock.remoteLen() = sizeof(sockaddr_in);
auto ret = WSARecvFrom(sock, &wsaBuf, 1, NULL, &dwFlag,
(sockaddr*)sock.remoteAddress(), sock.remoteLen(), &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
struct AsyncFile : public IoFileAwaitable<bool> {
AsyncFile(CoScheduler& sc, const char* filename,
unsigned long dwDesiredAccess,
unsigned long dwShareMode,
LPSECURITY_ATTRIBUTES lpSecurityAttributes,
unsigned long dwCreationDisposition,
unsigned long dwFlagsAndAttributes,
HANDLE hTemplateFile
)
:IoFileAwaitable(sc, CreateFileA(filename, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
{
}
AsyncFile(CoScheduler& sc, const wchar_t* filename,
unsigned long dwDesiredAccess,
unsigned long dwShareMode,
LPSECURITY_ATTRIBUTES lpSecurityAttributes,
unsigned long dwCreationDisposition,
unsigned long dwFlagsAndAttributes,
HANDLE hTemplateFile
)
:IoFileAwaitable(sc, CreateFileW(filename, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
{
}
~AsyncFile() { close(); }
AsyncFile& read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead);
AsyncFile& write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten);
};
// Returns true on success, false on failure
inline
AsyncFile&
read(AsyncFile& file, void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead) {
file.type() = IoFileType::TRead;
file.resetCompleted();
if (lpNumberOfBytesRead)
*lpNumberOfBytesRead = 0;
file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRead);
auto ret = ReadFile(file, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead, &file);
if (ret == false) {
auto lr = GetLastError();
if (lr == ERROR_IO_PENDING)
ret = true;
else
file.setCompleted();
}
file.setReturn(ret);
return file;
}
// Returns true on success, false on failure
inline
AsyncFile&
write(AsyncFile& file, const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten) {
file.type() = IoFileType::TWrite;
file.resetCompleted();
if (lpNumberOfBytesWritten)
*lpNumberOfBytesWritten = 0;
file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesWritten);
auto ret = WriteFile(file, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten, &file);
if (ret == false) {
auto lr = GetLastError();
if (lr == ERROR_IO_PENDING)
ret = true;
else
file.setCompleted();
}
file.setReturn(ret);
return file;
}
struct AsyncSleepor :public IoFileAwaitable<long long> {
AsyncSleepor(long long microOrMilliSeconds = 0, bool useMicroSeconds = false)
: microOrMilliSeconds(microOrMilliSeconds)
, useMicroSeconds(useMicroSeconds)
{
type() = IoFileType::TSleep;
start();
}
void start()
{
tp = std::chrono::steady_clock::now();
}
auto getSpendMicroSeconds() const {
constexpr auto div = std::nano::den / std::micro::den;
std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
return delta.count() / div;
}
auto getSpendMilliSeconds() const {
constexpr auto div = std::nano::den / std::milli::den;
std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
return delta.count() / div;
}
bool isCompleted() {
setReturn(useMicroSeconds ? getSpendMicroSeconds() : getSpendMilliSeconds());
return (m_isCompleted = getReturn() >= microOrMilliSeconds);
}
protected:
long long microOrMilliSeconds;
bool useMicroSeconds;
std::chrono::steady_clock::time_point tp;
};
//Millisecond-level sleep; returns the number of milliseconds actually slept
inline
AsyncSleepor
sleepFor(long long milliSeconds) {
return AsyncSleepor{ milliSeconds };
}
//Microsecond-level sleep; returns the number of microseconds actually slept
inline
AsyncSleepor
sleepForEx(long long microSeconds) {
return AsyncSleepor{ microSeconds, true };
}
void test_coroutine_tcp_server(unsigned short serverPort = 33100, int totalClientCount = 100, bool dumpTestInfo = 0);
void test_coroutine_udp_random_broadcast(unsigned short broadCastPort = 33000, int totalClientBroadcastCount = 20, bool dumpTestInfo = 0);
#endif
#endif
Implementation file
CLCoroutine.cpp
#if (defined(CLUseCorotine) && CLUseCorotine)
#include "../_cl_common/CLCommon.h"
#include "../_cl_string/CLString.h"
#include "../_cl_logger/CLLogger.h"
void CoScheduler::run() {
auto coro = [this]() ->MainCoro {
//logger.debug("nMain coro scheduler started ...");
#ifdef _WIN32
if (m_hIocp) {
CLString err;
DWORD dwMilliseconds = 0;
//logger.debug("nMain coro scheduler: Iocp loop started ...");
while (1) {
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* pOverlapped = 0;
while (GetQueuedCompletionStatus(m_hIocp, &numberOfBytesTransferred, &completionKey, &pOverlapped, dwMilliseconds))
{
if (pOverlapped) { //IO completion event
auto pFile = (IoFileAwaitable<>*)pOverlapped;
pFile->setCompleted();
pFile->setLastError(ERROR_SUCCESS);
auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
if (saveBuf) *saveBuf = numberOfBytesTransferred;
// Depending on the awaitable's priority, either dispatch immediately or round-robin so every task gets equal weight
switch (pFile->getPriority())
{
case IoFilePriority::DispatchImmediately:
moveTaskToEnd(pFile->onwer()); //dispatch immediately
break;
default:
moveTaskToEnd(m_begin.hd); //round-robin dispatch
break;
}
m_end.resume();
}
else { //a new task has arrived: schedule it immediately
if (numberOfBytesTransferred == 0 && completionKey) {
auto h = CoTask::handle::from_address((void*)completionKey);
moveTaskToEnd(h);
h.resume();
}
else {
auto lr = GetLastError();
logger.warning("Iocp: get status in event loop: ",err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
}
}
}
auto lr = GetLastError();
if (lr == WSA_WAIT_TIMEOUT) {
moveTaskToEnd(m_begin.hd); //round-robin dispatch
m_end.resume(); //resume here: tasks whose IO is still pending will not run, but coroutines that handed over control via co_yield get a turn;
}
else if(pOverlapped) {
auto pFile = (IoFileAwaitable<>*)pOverlapped;
pFile->setCompleted();
pFile->setLastError(lr);
auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
if (saveBuf) *saveBuf = 0;
IoFileType fileType = pFile->type();
switch (fileType)
{
case TUnknown:
break;
case TRead:
case TWrite:
case TListen:
case TAccept:
case TConnect:
pFile->setReturn(false);
break;
case TSend:
case TRecv:
case TSendto:
case TRecvfrom:
pFile->setReturn(SOCKET_ERROR);
break;
case TSleep:
break;
default:
break;
}
switch (lr)
{
case ERROR_NETNAME_DELETED: //64: the specified network name is no longer available
break;
case ERROR_SEM_TIMEOUT://121: the semaphore timeout period has expired
break;
default:
logger.error("Iocp: get status out event loop: ", err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
break;
}
// Depending on the awaitable's priority, either dispatch immediately or round-robin so every task gets equal weight
switch (pFile->getPriority())
{
case IoFilePriority::DispatchImmediately:
moveTaskToEnd(pFile->onwer()); //dispatch immediately
break;
default:
moveTaskToEnd(m_begin.hd); //round-robin dispatch
break;
}
m_end.resume();
}
else {
logger.error("Iocp: get status out event loop no completed: ", err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
}
if (taskCount() == 0)
break;
}
CloseHandle(m_hIocp);
m_hIocp = 0;
//logger.debug("nMain coro scheduler: Iocp loop has done ...");
}
#endif
//logger.debug("nMain coro scheduler quit ...");
co_return;
};
m_main = coro();
m_main.hd.promise().sc = this;
m_main.hd.resume();
m_main.hd.destroy();
}
bool CoTask::resume() {
if (!hd)
return true;
else if (hd.done()) {
return false;
}
else {
auto pFile = (IoFileAwaitable<>*) hd.promise().pAwaitableFile;
if (!pFile) //first-time scheduling, or a coroutine that suspended via co_yield
hd.resume();
else {
if (pFile->type() == IoFileType::TSleep) { //sleep scheduling
if (((AsyncSleepor*)pFile)->isCompleted()) {
hd.promise().pAwaitableFile = nullptr;
hd.resume();
}
}
else if (pFile->isCompleted()) { //IO-completed scheduling
hd.promise().pAwaitableFile = nullptr;
hd.resume();
}
}
return true;
}
}
#ifdef _WIN32
#else // Windows
#endif // Linux
AsyncAcceptor& AsyncListener::accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
return ::accept(* this, sockAccept, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}
AsyncAcceptor& AsyncAcceptor::accept(AsyncListener& sListen, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
return ::accept(sListen, *this, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}
AsyncConnector& AsyncConnector::connect(const sockaddr* name, int namelen, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
AsyncConnector& AsyncConnector::connect(const char* ip, unsigned short port, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, ip, port, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
AsyncConnector& AsyncConnector::connect(void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
AsyncTcp& AsyncTcp::send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend)
{
return ::send(*this, lpSendBuffer, nNumberOfBytesSendBuffer, lpNumberOfBytesSend);
}
AsyncTcp& AsyncTcp::recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv)
{
return ::recv(*this, lpRecvBuffer, nNumberOfBytesRecvBuffer, lpNumberOfBytesRecv);
}
AsyncBroadcastor& AsyncBroadcastor::sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
return ::sendTo(*this, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}
AsyncBroadcastor& AsyncBroadcastor::sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
return ::sendTo(*this, ip, port, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}
AsyncReceiver& AsyncReceiver::recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd)
{
return ::recvFrom(*this, lpRecvdBuffer, nNumberOfBytesRecvdBuffer, lpNumberOfBytesRecvd);
}
AsyncFile& AsyncFile::read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead)
{
return ::read(*this, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead);
}
AsyncFile& AsyncFile::write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten)
{
return ::write(*this, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten);
}
#include <vector>
#include <cstdio>
#include <cstring>
#include <ctime>
void test_coroutine_tcp_server(unsigned short serverPort, int totalClientCount, bool dumpTestInfo)
{
logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
logger.openHeadInfoFlag(false);
CoScheduler sc;
int servRun = 0;
int totals = 0;
CLTick tk;
// server listener task
sc.gather([&]()->CoTask {
logger.info("\nTcp server coro started ...");
AsyncListener listener(sc, ADDR_ANY, serverPort);
// loop accept
std::vector<char> acceptbuf(260);
AsyncAcceptor* pAcceptor = 0;
int servId = 0;
while (true)
{
AsyncAcceptor& acceptor = pAcceptor ? *pAcceptor : *(pAcceptor = new AsyncAcceptor(sc));
DWORD nValidAccept;
logger.debug("\nServer listener accept wait ...");
bool ret = co_await listener.accept(acceptor, acceptbuf.data(), acceptbuf.size(), &nValidAccept);
if (ret) {
//create server task
acceptor.perseAddress(acceptbuf.data(), acceptbuf.size());
servRun++;
// server task
sc.gather([&](AsyncAcceptor* pAcceptor, int idx) ->CoTask {
AsyncAcceptor& acp = *pAcceptor;
std::vector<char> bufSend(260), bufRecv(260);
DWORD nbytesSend, nbytesRecv;
int total = 1;
while (1) {
std::sprintf(bufSend.data(), "\nHello client. this is server %d. %dst response.", idx, total);
logger.debug("\nServer[%d] send wait ...", idx);
int ret = co_await acp.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
logger.debug("\nServer[%d] recv wait ...", idx);
ret = co_await acp.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
if (nbytesRecv == 0)
break;
logger.debug("\nServer[%d] recv client msg = %s", idx, bufRecv.data());
total++;
totals++;
}
logger.debug("\nServer[%d] recv client close msg", idx);
delete pAcceptor;
servRun--;
}, pAcceptor, ++servId);
pAcceptor = 0;
}
}
logger.info("\nTcp server coro quit.%d", GetCurrentThreadId());
});
// client generator task
sc.gather([&]()->CoTask {
logger.info("\nClients creater coro started.");
int nClient = 0;
for (int i = 0; 1; )
{
if (nClient < totalClientCount) {
i++;
logger.info("\nClients creater make client task %d.", i);
nClient++;
// client task
sc.gather([&](int idx)->CoTask {
AsyncConnector con(sc);
logger.debug("\nClient[%d] connect wait ...", idx);
auto ret = co_await con.connect("127.0.0.1", serverPort);
if (!ret) {
logger.debug("\nClient[%d] connect server fail, %s", idx, CLString().getLastErrorString(GetLastError()));
co_return;
}
std::vector<char> bufSend(260), bufRecv(260);
DWORD nbytesSend, nbytesRecv;
int total = 1;
while (1) {
std::snprintf(bufSend.data(), bufSend.size(), "\nHelle server, this is client %d: %dst request.", idx, total);
logger.debug("\nClient[%d] send wait ...", idx);
auto ret = co_await con.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
if (!(ret == SOCKET_ERROR || nbytesSend == 0)) {
logger.debug("\nClient[%d] recv wait ...", idx);
ret = co_await con.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
if (ret == SOCKET_ERROR || nbytesRecv == 0)
break;
logger.debug("\nClient[%d] recv server msg = %s", idx, bufRecv.data());
}
total++;
}
logger.debug("\nClient[%d] get server close msg and shutdown.", idx);
nClient--;
}, i);
}
else {
logger.debug("\nClients creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("\nClients creater fetch run privilege again.");
}
}
logger.debug("\nClients creater coro quit.");
});
// statistics task
sc.gather([&]()->CoTask {
auto last = totals;
auto lastTime = tk.getSpendTime();
while (1)
{
co_await sleepFor(3000);
auto time = tk.getSpendTime();
logger.info("\nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
sc.taskCount(), totals, time, (totals - last) / (time - lastTime));
last = totals;
lastTime = time;
}
});
sc.run();
}
void test_coroutine_udp_random_broadcast(unsigned short broadCastPort, int totalClientBroadcastCount, bool dumpTestInfo)
{
logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
logger.openHeadInfoFlag(false);
srand(time(0));
CoScheduler sc;
int servRun = 0;
int totalsRecvd = 0;
int totalsSendto = 0;
CLTick tk;
std::vector<unsigned short> portList(totalClientBroadcastCount);
for (int i = 0; i < totalClientBroadcastCount; i++)portList[i] = broadCastPort + i;
// server generator task
sc.gather([&]()->CoTask {
logger.info("\nServers creater coro started.");
int nServer = 0;
for (int i = 0; 1; )
{
if (nServer < totalClientBroadcastCount) {
i++;
logger.info("\nServers creater make server task %d.", i);
nServer++;
// server task (broadcast receiver)
sc.gather([&](int i)->CoTask {
logger.info("\nUdp server[%d] coro started bind port = %d...", i, portList[i - 1]);
AsyncReceiver serv(sc, "127.0.0.1", portList[i - 1]);
// recv
std::vector<char> recv(260);
int servId = 0;
int total = 1;
while (true)
{
DWORD nbytesRecv;
logger.debug("\nUdp server[%d] recvfrom wait ...", i);
int ret = co_await serv.recvFrom(recv.data(), recv.size(), &nbytesRecv);
if (ret == SOCKET_ERROR || nbytesRecv == 0) {
CLString().getLastErrorMessageBoxExceptSucceed(WSAGetLastError());
break;
}
logger.debug("\nUdp server[%d] recvfrom %dst broadcast %u bytes data, msg = %s", i, total, nbytesRecv, recv.data());
total++;
totalsRecvd++;
}
logger.info("\nUdp server[%d] coro quit.%d", i, GetCurrentThreadId());
nServer--;
}, i);
}
else {
logger.debug("\nServers creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("\nServers creater fetch run privilege again.");
}
}
logger.debug("\nServers creater coro quit.");
});
// client generator task
sc.gather([&]()->CoTask {
logger.info("\nClients creater coro started.");
int nClient = 0;
for (int i = 0; 1; )
{
if (nClient < totalClientBroadcastCount) {
i++;
logger.info("\nClients creater make broadcastor client task %d.", i);
nClient++;
// client task (broadcast sender)
sc.gather([&](int idx)->CoTask {
AsyncBroadcastor broadcast(sc);
std::vector<char> bufSent(260);
DWORD nbytesSent;
int total = 1;
while (1) {
auto randPort = portList[rand() % totalClientBroadcastCount];
std::snprintf(bufSent.data(), bufSent.size(),
"\nHelle server, this is broadcastor %d: %dst randon broadcast to port=%d."
, idx, total, randPort);
logger.debug("\nBroadcastor[%d] send wait ...", idx);
auto ret = co_await broadcast.sendTo("127.0.0.1", randPort,
bufSent.data(), std::strlen(bufSent.data()) + 1, &nbytesSent);
if (ret == SOCKET_ERROR || nbytesSent == 0) {
break;
}
logger.debug("\nBroadcastor[%d] sendto server msg = %s", idx, bufSent.data());
total++;
totalsSendto++;
}
logger.debug("\nBroadcastor[%d] send 0 bytes and shutdown.", idx);
nClient--;
}, i);
}
else {
logger.debug("\nClients creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("\nClients creater fetch run privilege again.");
}
}
logger.debug("\nClients creater coro quit.");
});
// statistics task
sc.gather([&]()->CoTask {
auto last = totalsRecvd + totalsSendto;
auto lastTime = tk.getSpendTime();
while (1)
{
co_await sleepFor(3000); // suspend this coroutine for 3000 ms
auto time = tk.getSpendTime();
logger.info("\nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
sc.taskCount(), totalsRecvd + totalsSendto, time, (totalsRecvd + totalsSendto - last) / (time - lastTime));
last = totalsRecvd + totalsSendto;
lastTime = time;
}
});
sc.run();
}
#endif