原文链接:C++ IO多路复用 epoll模型
预备知识
epoll模型前置需要了解的可以参考:
IO控制:fcntl库:IO控制库
多线程:C++ Linux多线程同步通信-信号量
socket:C++ Linux多进程Socket通信
select模型:C++ IO多路复用 select模型
poll模型:C++ IO多路复用 poll模型
epoll模型
特性
原理
epoll是多路IO复用一种相比select和poll更高效的模型,
epoll底层一般是用红黑树和链表等结构实现的,使用红黑树对IO插入查找或删除, 当有活动事件时放到链表中
优点
通过红黑树和链表的结合,epoll 能够在高并发的场景下高效地管理IO. 处理大量连接优于select和poll
缺点
epoll数据结构更复杂,在连接数量少时,在维护epoll本身上的开销较大
库函数<sys/epoll.h>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| # include <sys/epoll.h> #include <errno.h>
int epoll_create(int size) size: 预先期望分配的io数,不是必须的,Linux 2.6.8后该参数没有作用 return >0(对应epoll实例的描述符fd) -1(错误)
// epoll实例创建 int epoll_create1(int flags) flags: 0无标志 EPOLL_CLOEXEC 标志新进程创建时会自动关闭该epoll的fd return >0(对应epoll实例的描述符fd) -1(错误)
epoll_create和epoll_create1都需要主动close,但是epoll_create1能控制在子进程中自动关闭
使用epoll_create1相比epoll_create更安全
// epoll实例管理 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); epfd: epoll_create返回的描述符 op: EPOLL_CTL_ADD(添加监听IO) EPOLL_CTL_MOD(修改监听IO) EPOLL_CTL_DEL(删除监听IO) fd: 对应的IO fd event: epoll_event结构体,保存事件和数据消息 return 0(成功) -1(错误)
struct epoll_event { __uint32_t events; epoll_data_t data; };
EPOLLIN 读事件 EPOLLPRI 紧急数据 EPOLLOUT 写事件 EPOLLHUP IO关闭 EPOLLET 边缘触发模式,事件状态变化时才会提醒,后续不提醒,但是可以后续处理 EPOLLONESHOT 更极端的模式,事件只会通知一次(需要ctl重新注册添加),当前epoll轮次未处理,下轮不会返回 EPOLLRDHUP IO半关闭,该IO不会再发送数据,但是还能给其写数据(TCP的半关闭状态) EPOLLERR IO错误
typedef union epoll_data { void* ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t;
用户数据在复杂的通信设计里面很有效,例如作为一个指针,指向存放该IO的相关信息如ID
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); epfd: create的fd events: 注册事件的数组 maxevents: 最大监听事件数量 timeout: -1(阻塞) 0(非阻塞) >0(时间上限) return >0(事件数) 0(超时) -1(错误)
|
实例: epoll实现一个并发测试服务器: 测试(IO复用+线程池)Reactor模型的性能
通信数据结构设计
在这种高并发响应场景中,消息的类型实际上会影响实际处理程序. 为了简化项目,我们统一数据大小16B,包含一个long型和double型数据.
测试端 开m个进程,每个线程池一直发送n次+1数据,最终得到两个和.m(1+n)n/2,服务器完成所有请求后销毁线程池,并验证结构是否正确.
1 2 3 4
| struct MessageBodyData{ long c1; double c2; }
|
线程池
参考 C++实现一个线程池
程序
感觉写的很垃圾,没体现epoll优势,只是在使用线程池,不得不说线程池是真好用.
- 很难在测试场景中发起大量的连接,测试也只用了一个连接.
- 出现过异常:get result:LongSum=1729343879082347468 DoubleSum=5.0015e+07(correct=50005000 real recv=10001, should recv=10000) 数据包多收了.
- 出现过异常:Connection reset by peer,说是并发量太大了,连接被主动断开了,但是后面测试不出来,不知道什么问题.
正常结果:
1 2 3
| send finish, error counts=0
get result:LongSum=50005000 DoubleSum=5.0005e+07(correct=50005000 real recv=10000, should recv=10000)
|
server.cpp: 使用线程池实现一个echo服务器,测试服务器响应能力和稳定性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
| #include<thread> #include<functional> #include<future> #include<condition_variable> #include<memory> #include<iostream> #include<mutex> #include<atomic> #include<vector> #include<queue> #include<type_traits> #include<cstring>
#include<sys/epoll.h> #include<sys/socket.h> #include<sys/socket.h> #include<arpa/inet.h> #include<climits> #include<unistd.h>
#include<chrono>
#define N_REQUESTS 10000 #define M_PROCESSES 1 #define MAX_CONNS 1 #define TOTAL_REQUESES N_REQUESTS*M_PROCESSES
struct MessageData{ long c1; double c2; };
long LongSum=0; double DoubleSum=0;
bool finish=false;
class ThreadPools{ public: ThreadPools(int n_threads): n_threads(n_threads),stop(false){ for(int i=0;i<n_threads;i++){ workers.emplace_back([this](){ while(true){ std::packaged_task<void()> task; { std::unique_lock<std::mutex> lock(this->mtx); this->cv.wait(lock,[this](){return this->stop || !this->tasks.empty();}); if(this->stop && this->tasks.empty()) return ; task=std::move(this->tasks.front()); this->tasks.pop(); task(); } } }); } } ~ThreadPools(){ { std::unique_lock<std::mutex> lock(mtx); stop=true; } cv.notify_all(); for(std::thread &worker : workers){ worker.join(); } } template<typename F> auto enqueue(F&& f)->std::future<typename std::result_of<F()>::type>; private: int n_threads; std::atomic<bool> stop; std::vector<std::thread> workers; std::queue<std::packaged_task<void()>> tasks; std::condition_variable cv; std::mutex mtx; };
template<typename F> auto ThreadPools::enqueue(F&& f)->std::future<typename std::result_of<F()>::type>{ using return_type=typename std::result_of<F()>::type; auto task=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f)); std::future<return_type> res=task->get_future(); { std::unique_lock<std::mutex> lock(mtx); if(stop){ throw std::runtime_error("enqueue on stopped ThreadPool"); } tasks.emplace([task](){(*task)();}); } cv.notify_one(); return res; }
int main(){ int epoll_fd=epoll_create1(EPOLL_CLOEXEC); if(epoll_fd==-1){ close(epoll_fd); perror("epoll_create1"); } std::string server_ip="127.0.0.1"; uint16_t server_port=8001; int server_fd=socket(AF_INET,SOCK_STREAM,0); if(server_fd==-1){ close(epoll_fd); close(server_fd); perror("socket");exit(EXIT_FAILURE); } struct sockaddr_in server_addr; server_addr.sin_family=AF_INET; server_addr.sin_port=htons(server_port); inet_pton(AF_INET,server_ip.c_str(),(struct sockaddr*)&server_addr.sin_addr.s_addr); if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){ close(epoll_fd); close(server_fd); perror("bind");exit(EXIT_FAILURE); } if(listen(server_fd,10)==-1){ close(epoll_fd); close(server_fd); perror("listen");exit(EXIT_FAILURE); } ThreadPools tp(5); struct epoll_event epoll_fds[MAX_CONNS+1]; struct epoll_event ev; ev.events=EPOLLIN|EPOLLPRI; ev.data.fd=server_fd; if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,server_fd,&ev)==-1){ close(epoll_fd); close(server_fd); perror("epoll_ctl");exit(EXIT_FAILURE); } int cnt=0; auto start = std::chrono::high_resolution_clock::now(); bool wait=false; while(!finish){ if(wait&&std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-start).count()>10000){ perror("test out off 1000ms, terminating"); finish=true; } if(cnt>=TOTAL_REQUESES){ finish=true; } int nevents=epoll_wait(epoll_fd,epoll_fds,MAX_CONNS+1,0); for(int i=0;i<nevents;i++){ if(epoll_fds[i].data.fd==server_fd){ int new_fd=accept(server_fd,nullptr,nullptr); if(new_fd==-1){ perror("accept"); }else{ ev.events = EPOLLIN; ev.data.fd = new_fd; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_fd, &ev); } }else{ struct MessageData msg; ssize_t size=recv(epoll_fds[i].data.fd,&msg,sizeof(msg),0); if(size<0){ perror("recv"); }else{ wait=true; cnt++; auto lam=std::bind([](struct MessageData msg){LongSum+=msg.c1;DoubleSum+=msg.c2; },msg); tp.enqueue(lam); } } } } close(epoll_fd); close(server_fd); std::cout<<"get result:"<<"LongSum="<<LongSum<<" DoubleSum="<<DoubleSum<< "(correct="<<M_PROCESSES*N_REQUESTS*(N_REQUESTS+1)/2<<" real recv="<<cnt<<", should recv="<<M_PROCESSES*N_REQUESTS<<")\n"; }
|
client.cpp: 客户端:20个线程循环发送10000条消息请求.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| #include<thread> #include<functional> #include<future> #include<condition_variable> #include<memory> #include<iostream> #include<mutex> #include<atomic> #include<vector> #include<queue> #include<type_traits>
#include<sys/socket.h> #include<arpa/inet.h> #include<climits> #include<unistd.h> #include<fcntl.h> #include<chrono>
#define N_REQUESTS 1000000 #define M_PROCESSES 1
struct MessageData{ long c1; double c2; };
class ThreadPools{ public: ThreadPools(int n_threads): n_threads(n_threads),stop(false){ for(int i=0;i<n_threads;i++){ workers.emplace_back([this](){ while(true){ std::packaged_task<void()> task; { std::unique_lock<std::mutex> lock(this->mtx); this->cv.wait(lock,[this](){return this->stop || !this->tasks.empty();}); if(this->stop && this->tasks.empty()) return ; task=std::move(this->tasks.front()); this->tasks.pop(); task(); } } }); } } ~ThreadPools(){ { std::unique_lock<std::mutex> lock(mtx); stop=true; } cv.notify_all(); for(std::thread &worker : workers){ worker.join(); } } template<typename F> auto enqueue(F&& f)->std::future<typename std::result_of<F()>::type>; private: int n_threads; std::atomic<bool> stop; std::vector<std::thread> workers; std::queue<std::packaged_task<void()>> tasks; std::condition_variable cv; std::mutex mtx; };
template<typename F> auto ThreadPools::enqueue(F&& f)->std::future<typename std::result_of<F()>::type>{ using return_type=typename std::result_of<F()>::type; auto task=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f)); std::future<return_type> res=task->get_future(); { std::unique_lock<std::mutex> lock(mtx); if(stop){ throw std::runtime_error("enqueue on stopped ThreadPool"); } tasks.emplace([task](){(*task)();}); } cv.notify_one(); return res; }
int main(){ std::string server_ip="127.0.0.1"; uint16_t server_port=8001; int server_fd=socket(AF_INET,SOCK_STREAM,0); if(server_fd==-1){ close(server_fd); perror("socket");exit(EXIT_FAILURE); } struct sockaddr_in server_addr; server_addr.sin_family=AF_INET; server_addr.sin_port=htons(server_port); inet_pton(AF_INET,server_ip.c_str(),(struct sockaddr*)&server_addr.sin_addr.s_addr); if(connect(server_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){ perror("connect"); exit(EXIT_FAILURE); } int flags = fcntl(server_fd, F_GETFL, 0); fcntl(server_fd, F_SETFL, flags | O_NONBLOCK);
ThreadPools tp(20); int cnt=0; struct MessageData msg; for(int i=1;i<N_REQUESTS+1;i++){ msg.c1=(long)i;msg.c2=double(i); auto req=std::bind([](int i,int fd,int& cnt,struct MessageData msg){ ssize_t size=send(fd,&msg,sizeof(msg),0); if(size<=0) cnt++; },i,server_fd,std::ref(cnt),msg); tp.enqueue(req); } std::cout<<"send finish, error counts="<<cnt<<"\n"; }
|