在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

介紹reactor的四種模型

科技綠洲 ? 來源:Linux開發架構之路 ? 作者:Linux開發架構之路 ? 2023-11-08 15:29 ? 次閱讀

前言

本文將由淺入深的介紹reactor,深入淺出的封裝epoll,一步步變成reactor模型,并在文末介紹reactor的四種模型。

reactor是什么?

reactor是一種高并發服務器模型,是一種框架,一個概念,所以reactor沒有一個固定的代碼,可以有很多變種,后續會介紹到。

reactor中的IO使用的是select,poll,epoll這些IO多路復用,使用IO多路復用系統不必創建維護大量線程,只使用一個線程、一個選擇器就可同時處理成千上萬連接,大大減少了系統開銷。

reactor中文譯為反應堆,將epoll中的IO變成事件驅動,比如讀事件,寫事件。來了個讀事件,立馬進行反應,執行提前注冊好的事件回調函數。

回想一下普通函數調用的機制:程序調用某函數,函數執行,程序等待,函數將結果和控制權返回給程序,程序繼續處理。reactor反應堆,是一種事件驅動機制,和普通函數調用的不同之處在于:應用程序不是主動的調用某個 API 完成處理,而是恰恰相反,reactor逆置了事件處理流程,應用程序需要提供相應的接口并注冊到 reactor上,如果相應的事件發生,reactor將主動調用應用程序注冊的接口,這些接口又稱為“回調函數”。

說白了,reactor就是對epoll進行封裝,進行網絡IO與業務的解耦,將epoll管理IO變成管理事件,整個程序由事件進行驅動執行。就像下圖一樣,有就緒事件返回,reactor:由事件驅動執行對應的回調函數;epoll:需要自己判斷。

圖片

reactor模型三個重要組件與流程分析

reactor是處理并發 I/O 比較常見的一種模式,用于同步 I/O,中心思想是將所有要處理的 I/O 事件注冊到一個中心 I/O 多路復用器(epoll)上,同時主線程/進程阻塞在多路復用器上;一旦有 I/O 事件到來或是準備就緒(文件描述符或 socket 可讀、寫),多路復用器返回并將事先注冊的相應 I/O 事件分發到對應的處理器中。

組件

reactor模型有三個重要的組件

多路復用器:由操作系統提供,在 linux 上一般是 select, poll, epoll 等系統調用。

圖片

事件分發器:將多路復用器中返回的就緒事件分到對應的處理函數中。

圖片

事件處理器:負責處理特定事件的處理函數。

圖片

流程

具體流程:

  1. 注冊相應的事件處理器(剛開始listenfd注冊都就緒事件)
  2. 多路復用器等待事件
  3. 事件到來,激活事件分發器,分發器調用事件到對應的處理器
  4. 事件處理器處理事件,然后注冊新的事件(比如讀事件,完成讀操作后,根據業務處理數據,注冊寫事件,寫事件根據業務響應請求;比如listen讀事件,肯定要給新的連接注冊讀事件)

將epoll封裝成reactor事件驅動

封裝每一個連接sockfd變成ntyevent

我們知道一個連接對應一個文件描述符fd,對于這個連接(fd)來說,它有自己的事件(讀,寫)。我們將fd都設置成非阻塞的,所以這里我們需要添加兩個buffer,至于大小就是看業務需求了。

struct ntyevent {
int fd;//socket fd
int events;//事件

char sbuffer[BUFFER_LENGTH];//寫緩沖buffer
int slength;
char rbuffer[BUFFER_LENGTH];//讀緩沖buffer
int rlength;
// typedef int (*NtyCallBack)(int, int, void *);
NtyCallBack callback;//回調函數

void *arg;
int status;//1MOD 0 null
};

封裝epfd和ntyevent變成ntyreactor

我們知道socket fd已經被封裝成了ntyevent,那么有多少個ntyevent呢?這里demo初始化reactor的時候其實是將*events指向了一個1024的ntyevent數組(按照道理來說客戶端連接可以一直連,不止1024個客戶端,后續文章有解決方案,這里從簡)。epfd肯定要封裝進行,不用多說。

struct ntyreactor {
int epfd;
struct ntyevent *events;
//struct ntyevent events[1024];
};

封裝讀、寫、接收連接等事件對應的操作變成callback

前面已經說了,把事件寫成回調函數,這里的參數fd肯定要知道自己的哪個連接,events是什么事件的意思,arg傳的是ntyreactor (考慮到后續多線程多進程,如果將ntyreactor設為全局感覺不太好 )

typedef int (*NtyCallBack)(int, int, void *);

int recv_cb(int fd, int events, void *arg);

int send_cb(int fd, int events, void *arg);

int accept_cb(int fd, int events, void *arg);

給每個客戶端的ntyevent設置屬性

具兩個例子,我們知道第一個socket一定是listenfd,用來監聽用的,那么首先肯定是設置ntyevent的各項屬性。 本來是讀事件,讀完后要改成寫事件,那么必然要把原來的讀回調函數設置成寫事件回調。

void nty_event_set(struct ntyevent *ev, int fd, NtyCallBack callback, void *arg) {
ev->fd = fd;
ev->callback = callback;
ev->events = 0;
ev->arg = arg;
}

將ntyevent加入到epoll中由內核監聽

int nty_event_add(int epfd, int events, struct ntyevent *ntyev) {
struct epoll_event ev = {0, {0}};
ev.data.ptr = ntyev;
ev.events = ntyev->events = events;
int op;
if (ntyev->status == 1) {
op = EPOLL_CTL_MOD;
}
else {
op = EPOLL_CTL_ADD;
ntyev->status = 1;
}

if (epoll_ctl(epfd, op, ntyev->fd, &ev) < 0) {
printf("event add failed [fd=%d], events[%d],err:%s,err:%dn", ntyev->fd, events, strerror(errno), errno);
return -1;
}
return 0;
}

將ntyevent從epoll中去除

int nty_event_del(int epfd, struct ntyevent *ev) {
struct epoll_event ep_ev = {0, {0}};
if (ev->status != 1) {
return -1;
}
ep_ev.data.ptr = ev;
ev->status = 0;
epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
//epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, NULL);
return 0;
}

讀事件回調函數

這里就是被觸發的回調函數,具體代碼要與業務結合,這里的參考意義不大(這里就是讀一次,改成寫事件)

int recv_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor *) arg;
struct ntyevent *ntyev = &reactor->events[fd];

int len = recv(fd, ntyev->buffer, BUFFER_LENGTH, 0);
nty_event_del(reactor->epfd, ntyev);
if (len > 0) {
ntyev->length = len;
ntyev->buffer[len] = '?';
printf("C[%d]:%sn", fd, ntyev->buffer);
nty_event_set(ntyev, fd, send_cb, reactor);
nty_event_add(reactor->epfd, EPOLLOUT, ntyev);
}
else if (len == 0) {
close(ntyev->fd);
printf("[fd=%d] pos[%ld], closedn", fd, ntyev - reactor->events);
}
else {
close(ntyev->fd);
printf("recv[fd=%d] error[%d]:%sn", fd, errno, strerror(errno));
}
return len;
}

寫事件回調函數

這里就是被觸發的回調函數,具體代碼要與業務結合,這里的參考意義不大(將讀事件讀的數據寫回,再改成讀事件,相當于echo)

int send_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor *) arg;
struct ntyevent *ntyev = &reactor->events[fd];

int len = send(fd, ntyev->buffer, ntyev->length, 0);
if (len > 0) {
printf("send[fd=%d], [%d]%sn", fd, len, ntyev->buffer);
nty_event_del(reactor->epfd, ntyev);
nty_event_set(ntyev, fd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, ntyev);
}
else {
close(ntyev->fd);
nty_event_del(reactor->epfd, ntyev);
printf("send[fd=%d] error %sn", fd, strerror(errno));
}
return len;
}

接受新連接事件回調函數

本質上就是accept,然后將其加入到epoll監聽

int accept_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor *) arg;
if (reactor == NULL) return -1;
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int clientfd;

if ((clientfd = accept(fd, (struct sockaddr *) &client_addr, &len)) == -1) {
printf("accept: %sn", strerror(errno));
return -1;
}
printf("client fd = %dn", clientfd);
if ((fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
printf("%s: fcntl nonblocking failed, %dn", __func__, MAX_EPOLL_EVENTS);
return -1;
}
nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);

printf("new connect [%s:%d][time:%ld], pos[%d]n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port),
reactor->events[clientfd].last_active, clientfd);

return 0;
}

reactor運行

就是將原來的epoll_wait從main函數中封裝到ntyreactor_run函數中

int ntyreactor_run(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
if (reactor->epfd < 0) return -1;
if (reactor->events == NULL) return -1;

struct epoll_event events[MAX_EPOLL_EVENTS];
int checkpos = 0, i;

while (1) {
int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
if (nready < 0) {
printf("epoll_wait error, exitn");
continue;
}
for (i = 0; i < nready; i++) {
struct ntyevent *ev = (struct ntyevent *) events[i].data.ptr;
ev->callback(ev->fd, events[i].events, ev->arg);
}
}
}

reactor簡單版代碼與測試

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include


#define BUFFER_LENGTH 4096
#define MAX_EPOLL_EVENTS 1024
#define SERVER_PORT 8082

typedef int (*NtyCallBack)(int, int, void *);

struct ntyevent {
int fd;
int events;
void *arg;

NtyCallBack callback;

int status;//1MOD 0 null
char buffer[BUFFER_LENGTH];
int length;
long last_active;
};


struct ntyreactor {
int epfd;
struct ntyevent *events;
};


int recv_cb(int fd, int events, void *arg);

int send_cb(int fd, int events, void *arg);

int accept_cb(int fd, int events, void *arg);

void nty_event_set(struct ntyevent *ev, int fd, NtyCallBack callback, void *arg) {
ev->fd = fd;
ev->callback = callback;
ev->events = 0;
ev->arg = arg;
ev->last_active = time(NULL);
}

int nty_event_add(int epfd, int events, struct ntyevent *ntyev) {
struct epoll_event ev = {0, {0}};
ev.data.ptr = ntyev;
ev.events = ntyev->events = events;
int op;
if (ntyev->status == 1) {
op = EPOLL_CTL_MOD;
}
else {
op = EPOLL_CTL_ADD;
ntyev->status = 1;
}

if (epoll_ctl(epfd, op, ntyev->fd, &ev) < 0) {
printf("event add failed [fd=%d], events[%d],err:%s,err:%dn", ntyev->fd, events, strerror(errno), errno);
return -1;
}
return 0;
}

int nty_event_del(int epfd, struct ntyevent *ev) {
struct epoll_event ep_ev = {0, {0}};
if (ev->status != 1) {
return -1;
}
ep_ev.data.ptr = ev;
ev->status = 0;
epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
//epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, NULL);
return 0;
}

int recv_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor *) arg;
struct ntyevent *ntyev = &reactor->events[fd];

int len = recv(fd, ntyev->buffer, BUFFER_LENGTH, 0);
nty_event_del(reactor->epfd, ntyev);
if (len > 0) {
ntyev->length = len;
ntyev->buffer[len] = '?';
printf("C[%d]:%sn", fd, ntyev->buffer);
nty_event_set(ntyev, fd, send_cb, reactor);
nty_event_add(reactor->epfd, EPOLLOUT, ntyev);
}
else if (len == 0) {
close(ntyev->fd);
printf("[fd=%d] pos[%ld], closedn", fd, ntyev - reactor->events);
}
else {
close(ntyev->fd);
printf("recv[fd=%d] error[%d]:%sn", fd, errno, strerror(errno));
}
return len;
}

int send_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor *) arg;
struct ntyevent *ntyev = &reactor->events[fd];

int len = send(fd, ntyev->buffer, ntyev->length, 0);
if (len > 0) {
printf("send[fd=%d], [%d]%sn", fd, len, ntyev->buffer);
nty_event_del(reactor->epfd, ntyev);
nty_event_set(ntyev, fd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, ntyev);
}
else {
close(ntyev->fd);
nty_event_del(reactor->epfd, ntyev);
printf("send[fd=%d] error %sn", fd, strerror(errno));
}
return len;
}

int accept_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor *) arg;
if (reactor == NULL) return -1;
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int clientfd;

if ((clientfd = accept(fd, (struct sockaddr *) &client_addr, &len)) == -1) {
printf("accept: %sn", strerror(errno));
return -1;
}
printf("client fd = %dn", clientfd);
if ((fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
printf("%s: fcntl nonblocking failed, %dn", __func__, MAX_EPOLL_EVENTS);
return -1;
}
nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);

printf("new connect [%s:%d][time:%ld], pos[%d]n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port),
reactor->events[clientfd].last_active, clientfd);

return 0;
}

int init_sock(short port) {
int fd = socket(AF_INET, SOCK_STREAM, 0);

struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(port);

bind(fd, (struct sockaddr *) &server_addr, sizeof(server_addr));

if (listen(fd, 20) < 0) {
printf("listen failed : %sn", strerror(errno));
}
return fd;
}


int ntyreactor_init(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
memset(reactor, 0, sizeof(struct ntyreactor));
reactor->epfd = epoll_create(1);
if (reactor->epfd <= 0) {
printf("create epfd in %s err %sn", __func__, strerror(errno));
return -2;
}
reactor->events = (struct ntyevent *) malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
memset(reactor->events, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));

if (reactor->events == NULL) {
printf("create epll events in %s err %sn", __func__, strerror(errno));
close(reactor->epfd);
return -3;
}
return 0;
}

int ntyreactor_destory(struct ntyreactor *reactor) {
close(reactor->epfd);
free(reactor->events);
}

int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NtyCallBack acceptor) {
if (reactor == NULL) return -1;
if (reactor->events == NULL) return -1;
nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);
nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);
return 0;
}

_Noreturn int ntyreactor_run(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
if (reactor->epfd < 0) return -1;
if (reactor->events == NULL) return -1;

struct epoll_event events[MAX_EPOLL_EVENTS];
int checkpos = 0, i;

while (1) {
//心跳包 60s 超時則斷開連接
long now = time(NULL);
for (i = 0; i < 100; i++, checkpos++) {
if (checkpos == MAX_EPOLL_EVENTS) {
checkpos = 0;
}
if (reactor->events[checkpos].status != 1 || checkpos == 3) {
continue;
}
long duration = now - reactor->events[checkpos].last_active;

if (duration >= 60) {
close(reactor->events[checkpos].fd);
printf("[fd=%d] timeoutn", reactor->events[checkpos].fd);
nty_event_del(reactor->epfd, &reactor->events[checkpos]);
}
}


int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
if (nready < 0) {
printf("epoll_wait error, exitn");
continue;
}
for (i = 0; i < nready; i++) {
struct ntyevent *ev = (struct ntyevent *) events[i].data.ptr;
ev->callback(ev->fd, events[i].events, ev->arg);
}
}
}

int main(int argc, char *argv[]) {
int sockfd = init_sock(SERVER_PORT);

struct ntyreactor *reactor = (struct ntyreactor *) malloc(sizeof(struct ntyreactor));
if (ntyreactor_init(reactor) != 0) {
return -1;
}

ntyreactor_addlistener(reactor, sockfd, accept_cb);
ntyreactor_run(reactor);

ntyreactor_destory(reactor);
close(sockfd);
return 0;
}

圖片

reactor優點

reactor模式是編寫高性能網絡服務器的必備技術之一,它具有如下優點:

  • 響應快,不必為單個同步時間所阻塞,雖然 reactor本身依然是同步的
  • 編程相對簡單,可以最大程度的避免復雜的多線程及同步問題,并且避免了多線程/進程的切換開銷
  • 可擴展性,可以方便的通過增加 reactor實例個數來充分利用 CPU 資源
  • 可復用性,reactor 框架本身與具體事件處理邏輯無關,具有很高的復用性

reactor模型開發效率上比起直接使用 IO 復用要高,它通常是單線程的,設計目標是希望單線程使用一顆 CPU 的全部資源,但也有附帶優點,即每個事件處理中很多時候可以不考慮共享資源的互斥訪問。可是缺點也是明顯的,現在的硬件發展,已經不再遵循摩爾定律,CPU 的頻率受制于材料的限制不再有大的提升,而改為是從核數的增加上提升能力,當程序需要使用多核資源時,reactor模型就會悲劇。

如果程序業務很簡單,例如只是簡單的訪問一些提供了并發訪問的服務,就可以直接開啟多個反應堆,每個反應堆對應一顆 CPU 核心,這些反應堆上跑的請求互不相關,這是完全可以利用多核的。例如 Nginx 這樣的 http 靜態服務器。

reactor多種模型

單reactor + 單線程模型

單reactor單線程模型,指的是所有的 IO 操作(讀,寫,建立連接)都在同一個線程上面完成

圖片

缺點:

  1. 由于只有一個線程,因此事件是順序處理的,一個線程同時只能做一件事情,事件的優先級得不到保證
  2. 不能充分利用多核CPU

單reactor + 線程池(Thread Pool)模型

相比于單reactor單線程模型,此模型中收到請求后,不在reactor線程計算,而是使用線程池來計算,這會充分的利用多核CPU。采用此模式時有可能存在多個線程同時計算同一個連接上的多個請求,算出的結果的次序是不確定的, 所以需要網絡框架在設計協議時帶一個id標示,以便以便讓客戶端區分response對應的是哪個request。

圖片

多reactor + 多線程模型

此模式的特點是每個線程一個循環, 有一個main reactor負責accept連接, 然后把該連接掛在某個sub reactor中,這樣該連接的所有操作都在那個sub reactor所處的線程中完成。多個連接可能被分配到多個線程中,充分利用CPU。在應用場景中,reactor的個數可以采用 固定的個數,比如跟CPU數目一致。此模型與單reactor多線程模型相比,減少了進出thread pool兩次上下文切換,小規模的計算可以在當前IO線程完成并且返回結果,降低響應的延遲。并可以有效防止當IO壓力過大時一個reactor處理能力飽和問題。

圖片

多reactor + 線程池(Thread Pool)模型

此模型是上面兩個的混合體,它既使用多個 reactors 來處理 IO,又使用線程池來處理計算。此模式適適合既有突發IO(利用Multiple Reactor分擔),又有突發計算的應用(利用線程池把一個連接上的計算任務分配給多個線程)。

圖片

注意點

注意:前面介紹的四種reactor 模式在具體實現時為了簡應該遵循的原則是:每個文件描述符只由一個線程操作。這樣可以輕輕松松解決消息收發的順序性問題,也避免了關閉文件描述符的各種race condition。一個線程可以操作多個文件描述符,但是一個線程不能操作別的線程擁有的文件描述符。這一點不難做到。epoll也遵循了相同的原則。Linux文檔中并沒有說明,當一個線程證阻塞在epoll_wait時,另一個線程往epoll fd添加一個新的監控fd會發生什么。新fd上的事件會不會在此次epoll_wait調用中返回?為了穩妥起見,我們應該吧對同一個 epoll fd的操作(添加、刪除、修改等等)都放到同一個線程中執行。

reactor完善版代碼

由于fd的數量未知,這里設計ntyreactor 里面包含 eventblock ,eventblock 包含1024個fd。每個fd通過 fd/1024定位到在第幾個eventblock,通過fd%1024定位到在eventblock第幾個位置。

圖片

#include
#include
#include
#include
#include
#include
#include
#include
#include

#define BUFFER_LENGTH 4096
#define MAX_EPOLL_EVENTS 1024
#define SERVER_PORT 8081
#define PORT_COUNT 100

typedef int (*NCALLBACK)(int, int, void *);

struct ntyevent {
int fd;
int events;
void *arg;

NCALLBACK callback;

int status;
char buffer[BUFFER_LENGTH];
int length;
};
struct eventblock {
struct eventblock *next;
struct ntyevent *events;
};

struct ntyreactor {
int epfd;
int blkcnt;
struct eventblock *evblk;
};


int recv_cb(int fd, int events, void *arg);

int send_cb(int fd, int events, void *arg);

struct ntyevent *ntyreactor_find_event_idx(struct ntyreactor *reactor, int sockfd);

void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK *callback, void *arg) {
ev->fd = fd;
ev->callback = callback;
ev->events = 0;
ev->arg = arg;
}

int nty_event_add(int epfd, int events, struct ntyevent *ev) {
struct epoll_event ep_ev = {0, {0}};
ep_ev.data.ptr = ev;
ep_ev.events = ev->events = events;
int op;
if (ev->status == 1) {
op = EPOLL_CTL_MOD;
}
else {
op = EPOLL_CTL_ADD;
ev->status = 1;
}
if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {
printf("event add failed [fd=%d], events[%d]n", ev->fd, events);
return -1;
}
return 0;
}

int nty_event_del(int epfd, struct ntyevent *ev) {
struct epoll_event ep_ev = {0, {0}};
if (ev->status != 1) {
return -1;
}
ep_ev.data.ptr = ev;
ev->status = 0;
epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
return 0;
}

int recv_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor *) arg;
struct ntyevent *ev = ntyreactor_find_event_idx(reactor, fd);
int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0); //
nty_event_del(reactor->epfd, ev);

if (len > 0) {
ev->length = len;
ev->buffer[len] = '?';
// printf("recv[%d]:%sn", fd, ev->buffer);
printf("recv fd=[%dn", fd);

nty_event_set(ev, fd, send_cb, reactor);
nty_event_add(reactor->epfd, EPOLLOUT, ev);
}
else if (len == 0) {
close(ev->fd);
//printf("[fd=%d] pos[%ld], closedn", fd, ev-reactor->events);
}
else {
close(ev->fd);
// printf("recv[fd=%d] error[%d]:%sn", fd, errno, strerror(errno));
}
return len;
}


int send_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor *) arg;
struct ntyevent *ev = ntyreactor_find_event_idx(reactor, fd);

int len = send(fd, ev->buffer, ev->length, 0);
if (len > 0) {
// printf("send[fd=%d], [%d]%sn", fd, len, ev->buffer);
printf("send fd=[%dn]", fd);

nty_event_del(reactor->epfd, ev);
nty_event_set(ev, fd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, ev);
}
else {
nty_event_del(reactor->epfd, ev);
close(ev->fd);
printf("send[fd=%d] error %sn", fd, strerror(errno));
}
return len;
}

int accept_cb(int fd, int events, void *arg) {//非阻塞
struct ntyreactor *reactor = (struct ntyreactor *) arg;
if (reactor == NULL) return -1;

struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);

int clientfd;
if ((clientfd = accept(fd, (struct sockaddr *) &client_addr, &len)) == -1) {
printf("accept: %sn", strerror(errno));
return -1;
}
if ((fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
printf("%s: fcntl nonblocking failed, %dn", __func__, MAX_EPOLL_EVENTS);
return -1;
}
struct ntyevent *event = ntyreactor_find_event_idx(reactor, clientfd);

nty_event_set(event, clientfd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, event);

printf("new connect [%s:%d], pos[%d]n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
return 0;
}

int init_sock(short port) {
int fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(fd, F_SETFL, O_NONBLOCK);
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(port);

bind(fd, (struct sockaddr *) &server_addr, sizeof(server_addr));

if (listen(fd, 20) < 0) {
printf("listen failed : %sn", strerror(errno));
}
return fd;
}


int ntyreactor_alloc(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
if (reactor->evblk == NULL) return -1;

struct eventblock *blk = reactor->evblk;
while (blk->next != NULL) {
blk = blk->next;
}

struct ntyevent *evs = (struct ntyevent *) malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
if (evs == NULL) {
printf("ntyreactor_alloc ntyevents failedn");
return -2;
}
memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));

struct eventblock *block = (struct eventblock *) malloc(sizeof(struct eventblock));
if (block == NULL) {
printf("ntyreactor_alloc eventblock failedn");
return -2;
}
memset(block, 0, sizeof(struct eventblock));

block->events = evs;
block->next = NULL;

blk->next = block;
reactor->blkcnt++; //
return 0;
}

struct ntyevent *ntyreactor_find_event_idx(struct ntyreactor *reactor, int sockfd) {
int blkidx = sockfd / MAX_EPOLL_EVENTS;

while (blkidx >= reactor->blkcnt) {
ntyreactor_alloc(reactor);
}
int i = 0;
struct eventblock *blk = reactor->evblk;
while (i++ < blkidx && blk != NULL) {
blk = blk->next;
}
return &blk->events[sockfd % MAX_EPOLL_EVENTS];
}


int ntyreactor_init(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
memset(reactor, 0, sizeof(struct ntyreactor));

reactor->epfd = epoll_create(1);
if (reactor->epfd <= 0) {
printf("create epfd in %s err %sn", __func__, strerror(errno));
return -2;
}

struct ntyevent *evs = (struct ntyevent *) malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
if (evs == NULL) {
printf("ntyreactor_alloc ntyevents failedn");
return -2;
}
memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));

struct eventblock *block = (struct eventblock *) malloc(sizeof(struct eventblock));
if (block == NULL) {
printf("ntyreactor_alloc eventblock failedn");
return -2;
}
memset(block, 0, sizeof(struct eventblock));

block->events = evs;
block->next = NULL;

reactor->evblk = block;
reactor->blkcnt = 1;
return 0;
}

int ntyreactor_destory(struct ntyreactor *reactor) {
close(reactor->epfd);
//free(reactor->events);

struct eventblock *blk = reactor->evblk;
struct eventblock *blk_next = NULL;

while (blk != NULL) {
blk_next = blk->next;
free(blk->events);
free(blk);
blk = blk_next;
}
return 0;
}


int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {
if (reactor == NULL) return -1;
if (reactor->evblk == NULL) return -1;

struct ntyevent *event = ntyreactor_find_event_idx(reactor, sockfd);

nty_event_set(event, sockfd, acceptor, reactor);
nty_event_add(reactor->epfd, EPOLLIN, event);
return 0;
}


_Noreturn int ntyreactor_run(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
if (reactor->epfd < 0) return -1;
if (reactor->evblk == NULL) return -1;

struct epoll_event events[MAX_EPOLL_EVENTS + 1];

int i;

while (1) {
int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
if (nready < 0) {
printf("epoll_wait error, exitn");
continue;
}
for (i = 0; i < nready; i++) {
struct ntyevent *ev = (struct ntyevent *) events[i].data.ptr;
if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
ev->callback(ev->fd, events[i].events, ev->arg);
}
if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
ev->callback(ev->fd, events[i].events, ev->arg);
}
}
}
}


//
int main(int argc, char *argv[]) {
unsigned short port = SERVER_PORT; // listen 8081
if (argc == 2) {
port = atoi(argv[1]);
}
struct ntyreactor *reactor = (struct ntyreactor *) malloc(sizeof(struct ntyreactor));
ntyreactor_init(reactor);
int i = 0;
int sockfds[PORT_COUNT] = {0};
for (i = 0; i < PORT_COUNT; i++) {
sockfds[i] = init_sock(port + i);
ntyreactor_addlistener(reactor, sockfds[i], accept_cb);
}
ntyreactor_run(reactor);
ntyreactor_destory(reactor);
for (i = 0; i < PORT_COUNT; i++) {
close(sockfds[i]);
}
free(reactor);
return 0;
},>
聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 封裝
    +關注

    關注

    127

    文章

    7938

    瀏覽量

    143089
  • 服務器
    +關注

    關注

    12

    文章

    9225

    瀏覽量

    85616
  • 模型
    +關注

    關注

    1

    文章

    3259

    瀏覽量

    48907
  • 代碼
    +關注

    關注

    30

    文章

    4798

    瀏覽量

    68728
收藏 人收藏

    評論

    相關推薦

    四種模擬輸入信號的保護電路實現方法

    本文介紹四種模擬輸入信號的保護電路的實現方法。
    發表于 03-28 09:55 ?1228次閱讀

    FPGA 設計的四種常用思想與技巧

    FPGA 設計的四種常用思想與技巧FPGA設計的四種常用思想與技巧 討論的四種常用FPGA/CPLD設計思想與技巧:乒乓操作、串并轉換、流水線操作、數據接口同步化,都是FPGA/CPLD 邏輯設計
    發表于 08-11 10:30

    四種無線充電技術簡單原理

    詳細介紹了電場耦合 電磁感應 磁共振無線電波 這四種方式
    發表于 07-28 11:12

    四種不同供電模式的LED拓撲介紹

    本文中,小編將為大家介紹四種在LED供電當中經常使用的四種拓撲結構。感興趣的朋友快來看一看吧。 首先需要從了解轉換器的最小及最大輸出電壓入手。這只是將所有LED正向壓降與傳感電阻器電壓相加的總數
    發表于 10-10 15:07

    介紹UPS電源的四種工作方式

    UPS電源是較為常見的應急電源系統,其在市電正常與市電異常的情況下,工作方式也有所不同,以下介紹UPS電源的四種工作方式:正常運行、電池工作、旁路運行和旁路維護。1、正常運行方式 UPS電源系統
    發表于 11-16 06:19

    介紹AUTOSAR支持的四種功能安全機制

    1、AUTOSAR的四種功能安全機制雖然AUTOSAR不是一個完整的安全解決方案,但它提供了一些安全機制用于支持安全關鍵系統的開發。本文用于介紹AUTOSAR支持的四種功能安全機制:內存分區
    發表于 06-10 17:33

    四種典型瞬態介紹

    ,我將介紹應該注意的幾種典型瞬態,以及TI如何幫助滿足瞬態保護需求。 瀏覽此文章,并查看參考設計:《汽車瞬態和過流保護濾波器參考設計》 典型瞬態在四種常見場景中可能會發生瞬變。圖1所示為第一場景
    發表于 11-07 08:02

    云計算的三服務模式和四種部署模型

    云計算基于3特殊的云計算服務模式,具體架構包括:基礎設施即服務、軟件即服務、平臺即服務。四種部署模型:公有云、私有云、社區云、混合云。
    發表于 01-31 15:10 ?3106次閱讀

    無線充電技術的四種方式及其原理和應用介紹

    本文介紹了無線充電技術的應用范圍及其電磁感應方式等四種充電方式的詳細介紹
    發表于 10-12 16:16 ?27次下載
    無線充電技術的<b class='flag-5'>四種</b>方式及其原理和應用<b class='flag-5'>介紹</b>

    jquery四種選擇器介紹

      本文給大家匯總介紹了jQuery的四種選擇器的使用方法以及示例,非常的簡單實用,希望對大家學習jquery能夠有所幫助。
    發表于 12-01 16:40 ?3044次閱讀
    jquery<b class='flag-5'>四種</b>選擇器<b class='flag-5'>介紹</b>

    四種溫度傳感器的數據介紹

    本文檔的主要內容詳細介紹的是四種溫度傳感器的數據介紹包括了:  球形傳感器,卡箍式傳感器,地面傳感器,侵入式傳感器
    發表于 02-28 17:09 ?9次下載

    四種常見的圖像濾波算法介紹

    作者丨一支程序媛@知乎 來源丨https://zhuanlan.zhihu.com/p/279602383 編輯丨極市平臺 導讀 圖像濾波是一非常重要的圖像處理技術,本文詳細介紹四種常見的圖像
    的頭像 發表于 02-15 09:50 ?1w次閱讀

    四種方式實現led點亮

    四種方式實現led點亮
    發表于 01-04 14:31 ?4次下載

    NoSQL數據庫的四種類型

    在本文中,我們將簡要介紹NoSQL數據庫的四種類型。
    的頭像 發表于 04-25 17:21 ?4460次閱讀

    介紹MCUboot支持的四種升級模式(2)

    介紹MCUboot支持的四種升級模式,分別是Overwrite、Swap、Direct XIP和加載到RAM中執行。由于FSP不支持第四種——加載到RAM中執行,因為我們重點介紹前三
    的頭像 發表于 06-13 10:56 ?965次閱讀
    <b class='flag-5'>介紹</b>MCUboot支持的<b class='flag-5'>四種</b>升級模式(2)
    主站蜘蛛池模板: 特黄毛片| 色婷婷亚洲精品综合影院| 国产精品午夜高清在线观看| 欧美7777kkkk免费看258| 日干夜干天天干| 久久亚洲国产午夜精品理论片 | 精品国产污网站在线观看15| 久久综合狠狠综合久久综合88| 久久综合免费视频| 国产拍拍视频| 欧美大片xxxxbbbb| 黄色福利网| www.淫.com| 天天摸夜夜爽| 亚洲国产午夜看片| 天天射天天爽| 天天综合久久| 狼人狠狠干| 西西人体www303sw大胆高清| 中文字幕在线播放一区| 国色天香精品亚洲精品| 亚洲一区免费在线| 国产精品久久久久久免费播放| 黄 在线| 1024手机看片国产旧版你懂的 | 高清一级做a爱视频免费| 一级片影院| 国产精品嫩草影院人体模特| 一级毛片在线不卡直接观看| 99热精品久久只有精品30| 色婷婷综合网| 日韩欧美亚洲综合一区二区| 免费激情网站| 666精品国产精品亚洲| 欧美福利二区| 亚洲性视频网站| 性欧美视频videos6一9| 免费看很黄很色裸乳视频| 91色在线视频| 午夜国产理论| 天天干夜夜添|