Linux知识点 – 高级IO(二)
文章目录
一、IO多路转接 – poll
1.poll接口
参数:
- fds:传入的struct pollfd结构体的起始地址;
- nfds:传入的结构体的个数;
前两个参数代表所有传入的struct pollfd结构体; - timeout:为0,表示非阻塞方式等待;为-1,表示阻塞等待;大于0 的值,表示等待n毫秒后返回;
- 返回值:大于0,表示有几个文件描述符就绪;等于0,超时timeout;小于0,poll失败;
poll的输入输出参数是分离的,因为struct pollfd结构体的内部成员有很多,可以完成不同的功能;
- fd:文件描述符,一旦设置好,调用和返回时,都不会变;
- events:用户通知内核,需要帮助我关心这个fd的哪些事件;
- revents:内核通知用户,这个fd的哪些事件已经就绪了;
- events和revents都是使用标记位传参,如下图;
2.poll实现
poll和select对fd的处理过程相近,只是具体对多个fd的管理方式有所不同;
Log.hpp
同select;
Sock.hpp
同select;
main.cc
#include "pollServer.hpp" #include<memory> int main() {
// 1. fd_set是一个固定大小位图,直接决定了select能同时关心的fd的个数是有上限的! // std::cout << sizeof(fd_set) * 8 << std::endl; std::unique_ptr<PollServer> svr(new PollServer); svr->Start(); return 0; }
pollServer.hpp
#ifndef __POLL_SVR_H__ #define __POLL_SVR_H__ #include <iostream> #include <string> #include <vector> #include <poll.h> #include <sys/time.h> #include "Log.hpp" #include "Sock.hpp" #define FD_NONE -1 // 文件描述符初始化状态 using namespace std; class PollServer {
public: static const int nfds = 100; public: PollServer(const uint16_t &port = 8080) : _port(port) , _nfds(nfds) {
_listensock = Sock::Socket(); Sock::Bind(_listensock, _port); Sock::Listen(_listensock); logMessage(DEBUG, "%s", "create base socket success"); _fds = new struct pollfd[_nfds]; for(int i = 0; i < _nfds; i++) {
//初始化所有的struct pollfd _fds[i].fd = FD_NONE; _fds[i].events = _fds[i].revents = 0; } // 将listensock加入poll关心 _fds[0].fd = _listensock; _fds[0].events = POLLIN; _timeout = 1000; } void Start() {
while (true) {
int n = poll(_fds, _nfds, _timeout); switch (n) {
case 0: logMessage(DEBUG, "timeout"); break; case -1: logMessage(WARNING, "poll error: %d : %s", errno, strerror(errno)); break; default: // select成功 logMessage(DEBUG, "get a new link event"); HandlerEvent(); // 对就绪的fd进行处理 break; } } } ~PollServer() {
if (_listensock >= 0) {
close(_listensock); } if(_fds) {
delete[] _fds; } } private: void HandlerEvent() {
for (int i = 0; i < _nfds; i++) {
// 1.去掉不合法fd if (_fds[i].fd == FD_NONE) {
continue; } // 2.合法fd也不一定就绪了 if (_fds[i].revents & POLLIN) {
// 指定的fd,读事件就绪 // 读事件就绪:连接事件到来,accept if (_fds[i].fd == _listensock) {
Accepter(); // listensock需要进行accept } else {
Recver(i); // 普通sock进行recv } } } } void Accepter() {
string clientip; uint16_t clientport = 0; // listensock上面的读事件就绪了,表示可以读取了 // 获取新连接了 int sock = Sock::Accept(_listensock, &clientip, &clientport); // 在这里进行accept是不会阻塞的 if (sock < 0) {
logMessage(WARNING, "%s", "accept error"); return; } logMessage(DEBUG, "get a new link success : [%s:%d] : %d", clientip.c_str(), clientport, sock); int pos = 1; for (; pos < _nfds; pos++) {
if (_fds[pos].fd == FD_NONE) // 找出未设置合法fd的位置 {
break; } } if (pos == _nfds) // 数组满了 {
logMessage(WARNING, "%s:%d", "poll server already full,close: %d", sock); close(sock); } else {
_fds[pos].fd = sock; _fds[pos].events = POLLIN; } } void Recver(int pos) {
// 读事件就绪:INPUT事件到来,recv,read logMessage(DEBUG, "message in, get IO event: %d", _fds[pos].fd); // 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞 // 这样读取有bug吗?有的,你怎么保证以读到了一个完整报文呢? char buffer[1024]; int n = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0); if (n > 0) {
buffer[n] = 0; logMessage(DEBUG, "client[%d]# %s", _fds[pos].fd, buffer); } else if (n == 0) // 对端关闭连接 {
logMessage(DEBUG, "client[%d] quit, me too...", _fds[pos].fd); // 1.我们也要关闭不需要的fd close(_fds[pos].fd); // 2.不要让select帮我关心当前的fd了 _fds[pos].fd = FD_NONE; _fds[pos].events = 0; } else {
logMessage(WARNING, "%d sock recv error, %d : %s", _fds[pos].fd, errno, strerror(errno)); // 1.我们也要关闭不需要的fd close(_fds[pos].fd); // 2.不要让select帮我关心当前的fd了 _fds[pos].fd = FD_NONE; _fds[pos].events = 0; } } // void DebugPrint() // {
// cout << "_fd_array[]: "; // for (int i = 0; i < _nfds; i++) // {
// if (_fds[pos].fd == FD_NONE) // continue; // cout << _fds[pos].fd << " "; // } // cout << endl; // } private: uint16_t _port; int _listensock; struct pollfd *_fds; int _nfds; int _timeout; }; #endif
运行结果:
3.poll优缺点
优点:
- 效率高;
- 有大量的连接,但是只有少量的是活跃的,节省资源;
- 输入输出参数分离的,不需要进行大量的重置;
- poll参数级别,没有可以管理的fd的上限;
poll缺点:
5. poll依旧需要不少的遍历,在用户层检测时间就绪,与内核检测fd就绪,都是一样,用户还是要维护数组;
6. pol需要内核到用户的拷贝 – 少不了的;
7. poll的代码也比较复杂 – 比select容易;
二、IO多路转接 – epoll
1.epoll接口
epoll有三个接口:
epoll_create
- 创建epoll模型;
- size参数现在多半是废弃的,一般写成512或256;
- 返回值是一个文件描述符;
epoll_ctl
- epfd:epoll_create返回的fd;
- op:对这个epoll模型进行什么操作(增、删、改);
- fd:需要关心的文件描述符;
- event:关心该fd的什么事件;
event是struct epoll_event*类型的,其实是一个struct epoll_event类型的数组,每个struct epoll_event结构体里面都存储对应fd的信息和事件类型;
struct epoll_event结构体中的events成员可以是以下几个宏的集合:
struct epoll_event结构体中的data成员是epoll_data_t类型的联合体,可以储存fd的信息:
- 返回值:返回0表示调用成功;返回-1表示调用失败;
epoll_wait
- 在epoll模型中获取已经就绪的事件,timeout参数与poll是一样的;
- epfd:epoll_create返回的fd;
- events:分配好的epoll_events结构体数组;
- maxevents:events数组有多大;
- 返回值:为0,表示timeout;为-1,表示wait失败;大于0,表示有几个关心的文件描述符的事件就绪了;
2.epoll的工作原理
回想select和poll的工作流程:
- 无论是select还是poll,都是需要用户自己维护一个数组来进行保存fd,与特定的事件的;
- select or poll都要遍历;
- select or poll工作模式
a.通过select or poll,用户告诉内核,你要帮我关心哪些fd上的哪些event;
b.通过select or poll返回,内核告诉用户,哪些fd上的哪些event已经发生了;
操作系统是通过什么方式得知网卡里面有数据的? – 硬件中断;网卡数据就绪后,会触发硬件中断来通知OS取数据;
epoll工作原理
- 调用epoll_create接口创建一个epoll模型,OS会为用户维护一个红黑树结构;
红黑树节点:用户告诉内核,需要关心哪些fd的哪些事件,等价于poll所维护的数组; - 在epoll中,对于每一个事件,都会建立一个epitem结构体:
- OS还会维护一个就绪队列,用于通知用户哪些事件已经就绪;
当某个fd上的某个事件就绪了,OS会在就绪队列上生成一个节点; - OS可以设定一个回调函数,可以被注册进底层,一旦底层有数据,就会调用回调函数;
有了回调函数就不用OS进行频繁的遍历来查找事件是否就绪了; - 调用epoll_create:构建红黑树,建立底层回调函数,构建就绪队列;
调用epoll_ctl:向特定epoll模型中增加、修改或删除特定fd上的特定事件(修改红黑树);
调用epoll_wait:如果就绪队列不为空,则把发生的事件复制到用户态,同时将事件的数量返回给用户,这个操作的时间复杂度是O(1); - 对于epoll_create的返回值:完成epoll模型中所有任务的一定是一个进程;
指向epoll模型对应的数据结构的指针保存在一个文件中,epoll_create在创建完epoll模型后就会返回该文件描述符;
后面进程在调用epoll模型时,就能够通过该fd找到对应的数据结构;
细节
- 红黑树的时候,是要有key值的,使用文件描述符作为key值;
- 用户只需要设置关系,获取结果即可,不用关心任何对fd与event的管理细节;
- 底层只要有fd就绪了,OS自己会给我构建节点,连入到就绪队列中;
上层只需要不断的从就绪队列中将数据拿走,就完成了获取就绪事件的任务;
这也是一个生产者消费者模型;对于共享资源 – epoll已经保证所有的epoll接口都是线程安全的; - 如果底层没有就绪事件呢? 我们的上层应该怎么办?阻塞等待;
- 在epoll_wait的时候,如果底层就绪的sock非常多,revs承装不下,怎么办?一次拿不完,就下一次再拿;
- 关于epoll_wait的返回值问题:有几个fd上的事件就绪,就返回几,epoll返回的时候,会将所有就绪的event按照顺序放入到revs数组中,一共有返回值个;
- epoll为什么高效?
a. 用户不用管理文件描述符;
b. OS不用浪费精力在文件描述符的事件监测上;
c. 事件就绪后直接放到就绪队列中,用户直接从就绪队列中取走事件就可以了;
3.epoll服务器实现
Sock.hpp
同select
Log.hpp
同select
epoll.hpp
对epoll接口的封装;
#pragma once #include <iostream> #include <sys/epoll.h> #include <unistd.h> class Epoll {
public: static const int gsize = 256; public: static int CreateEpoll() {
int epfd = epoll_create(gsize); if(epfd > 0) {
return epfd; } exit(5); } static bool CtlEpoll(int epfd, int oper, int sock, uint32_t events) {
struct epoll_event ev; ev.events = events; ev.data.fd = sock; // 存储sock int n = epoll_ctl(epfd, oper, sock, &ev); return n == 0; } static int WaitEpoll(int epfd, struct epoll_event revs[], int num, int timeout) {
// 细节1:如果底层就绪的sock非常多,revs承装不下,怎么办??不影响!一次拿不完,就下一次再拿 // 细节2:关于epoll_wait的返回值问题:有几个fd上的事件就绪,就返回几,epoll返回的时候,会将所有 // 就绪的event按照顺序放入到revs数组中!一共有返回值个! return epoll_wait(epfd, revs, num, timeout); } };
epollServer.hpp
epoll服务器
- 成员变量包括一个epoll_event数组的地址,该数组的大小,以及一个回调函数指针;
- 构造时就需要开好数组的空间,创建epoll模型,并在创建好listen套接字后,就将listensock加入epoll模型中进行监测;
- 在有事件就绪后,需要判断是listensock的事件就绪,还是普通sock的事件就绪,两者需要调用不同的方法;
- 回调函数的作用是在接收到数据后,可以由用户自定义数据的处理方式,在写好处理函数后,传入对象内部即可;
#ifndef __EPOLL_SERVER_HPP__ #define __EPOLL_SERVER_HPP__ #include <iostream> #include <string> #include <functional> #include <cassert> #include "Log.hpp" #include "Sock.hpp" #include "epoll.hpp" namespace ns_epoll {
const static int default_port = 8080; const static int gnum = 64; class EpollServer {
using func_t = std::function<void(std::string)>; // 处理数据时时候回调函数处理 public: EpollServer(func_t HandlerRequest, const int& port = default_port) : _port(port) , _revs_num(gnum) , _HandlerRequest(HandlerRequest) {
// 1.申请对应的epoll_event数组的空间 _revs = new struct epoll_event[_revs_num]; // 2.创建listensock _listensock = Sock::Socket(); Sock::Bind(_listensock, _port); Sock::Listen(_listensock); // 3.创建epoll模型 _epfd = Epoll::CreateEpoll(); logMessage(DEBUG, "init success, listensock: %d, epfd: %d", _listensock, _epfd); // 4.将listensock添加到epoll模型中,让它帮忙管理 if(!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, _listensock, EPOLLIN)) {
exit(6); } logMessage(DEBUG, "add listensock to epoll success."); } void Accepter() {
std::string clientip; uint16_t clientport; int sock = Sock::Accept(_listensock, &clientip, &clientport); if(sock < 0) {
logMessage(WARNING, "accept error!"); return; } //不能直接读取,因为不知道底层数据是否就绪 //因此需要将sock再加入epoll模型进行监测 if(!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, sock, EPOLLIN)) {
return; } logMessage(DEBUG, "add new sock : %d to epoll success", sock); } void Recver(int sock) {
//1.读取数据 char buffer[10240]; ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0); if(n > 0) {
//假设读取到了一个完整的报文 buffer[n] = 0; _HandlerRequest(buffer); // 调用回调函数来处理数据,可以有不同的回调函数传进来 } else if(n == 0) // 读取完成 {
// 1. 先在epoll中去掉对sock的关心,因为epoll操作的需要是合法文件描述符,否则会报错 bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0); assert(res); (void)res; //2.再close文件 close(sock); logMessage(NORMAL, "client %d quit, me too...", sock); } else // 读取异常 {
// 1. 先在epoll中去掉对sock的关心,因为epoll操作的需要是合法文件描述符,否则会报错 bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0); assert(res); (void)res; //2.再close文件 close(sock); logMessage(NORMAL, "client recv %d error, close error sock", sock); } } void HandlerEvents(int n) // 只需遍历到已经就绪的fd就好 {
assert(n > 0); for(int i = 0; i < n; i++) {
uint32_t revents = _revs[i].events; int sock = _revs[i].data.fd; //读事件就绪 if(revents & EPOLLIN) {
// 如果是listensock就绪 if(sock == _listensock) {
Accepter(); } else {
Recver(sock); } } } } void LoopOnce(int timeout) //一次循环,epoll的一次等待 {
int n = Epoll::WaitEpoll(_epfd, _revs, _revs_num, timeout); switch(n) {
case 0: logMessage(DEBUG, "timeout..."); break; case -1: logMessage(WARNING, "epoll wait error: %s", strerror(errno)); break; default: //等待成功 logMessage(DEBUG, "get a event"); HandlerEvents(n); // 将已经就绪的事件的数量传入HandlerEvents break; } } void Start() {
int timeout = -1; while(true) {
LoopOnce(timeout); } } ~EpollServer() {
if(_listensock > 0) {
close(_listensock); } if(_epfd > 0) {
close(_epfd); } if(_revs) {
delete[] _revs; } } private: int _listensock; int _epfd; uint16_t _port; struct epoll_event* _revs; // 已经就绪的fd int _revs_num; // 已经就绪的fd的数量 func_t _HandlerRequest; // 处理数据的回调函数 }; } #endif
main.cc
#include "epollServer.hpp" #include<memory> using namespace std; using namespace ns_epoll; void change(std::string request) {
//完成业务逻辑 cout << "change : " << request << endl; } int main() {
// 1. fd_set是一个固定大小位图,直接决定了select能同时关心的fd的个数是有上限的! // std::cout << sizeof(fd_set) * 8 << std::endl; std::unique_ptr<EpollServer> svr(new EpollServer(change)); svr->Start(); return 0; }
运行结果:
4.epoll的优点
- 接口使用方便:虽然拆分成了三个函数,但是反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离开;
- 数据拷贝轻量:只在合适的时候调用EPOLL CTL ADD将文件描述符结构拷贝到内核中,这个操作并不频繁(而select/polI都是每次循环都要进行拷贝);
- 事件回调机制:避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中,epoll_wait返回直接访问就绪队列就知道哪些文件描述符就绪,这个操作时间复杂度0(1);即使文件描述符数目很多,效率也不会受到影响;
- 没有数量限制:文件描述符数目无上限;
5.epoll的工作模式
epoll有两种工作模式:
- 水平触发(LT):如果epoll服务器里面有该文件描述符的数据,就会一直通知该fd(select,poll,epoll的默认模式);
- 当epoll检测到socket上事件就绪的时候可以不立刻进行处理,或者只处理一部分;
- 例如,由于只读了1K数据,缓冲区中还剩1 K数据,在第二次调用epoll wait时,epoll wait仍然会立刻返回并通知socket读事件就绪;
- 直到缓冲区上所有的数据都被处理完,epoll_wait才不会立刻返回;
- 支持阻塞读写和非阻塞读写;
- 边缘触发(ET):如果epoll服务器是首次有该文件描述符的数据,或者是数据变多(变化)的时候,服务器才会通知该fd;
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志,epolli进入ET工作模式;- 当epoll检测到socket上事件就绪时,必须立刻处理;
- 例如,虽然只读了1 K的数据,缓冲区还剩1 K的数据,在第二次调用epoll wait的时候,epoll wait 不会再返回了;
- 也就是说ET模式下文件描述符上的事件就绪后,只有一次处理机会;
- ET的性能比LT性能更高(epoll wait 返回的次数少了很多),Nginx默认采用ET模式使用epoll;
- 只支持非阻塞的读写;
ET模式更加高效:
- 更少的epoll_wait返回次数;
- ET模式会倒逼程序员尽快将接收缓冲区中的数据全部取走,应用层尽快的取走了缓冲区中的数据,那么在单位时间下,该模式下工作的服务器,就可以在一定程度上,给发送方发送一个更大的接收窗口,所以对方就可以有更大的滑动窗口,一次发送更多的数据,提高IO吞吐;
6.Reactor服务器
Reactor服务器是epoll工作在ET模式下的服务器;
Sock.hpp
- 增加对sock设置非阻塞的接口;
- 在Accept接口增加输出参数,输出accept的错误码;
#pragma once #include <iostream> #include <string> #include <cstring> #include <cerrno> #include <cassert> #include <unistd.h> #include <memory> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <ctype.h> #include <fcntl.h> #include "Log.hpp" class Sock {
private: const static int gbacklog = 20; public: Sock() {
} static int Socket() {
int listensock = socket(AF_INET, SOCK_STREAM, 0); if (listensock < 0) {
logMessage(FATAL, "create socket error, %d:%s", errno, strerror(errno)); exit(2); } logMessage(NORMAL, "create socket success, listensock: %d", listensock); return listensock; } static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0") {
struct sockaddr_in local; memset(&local, 0, sizeof local); local.sin_family = AF_INET; local.sin_port = htons(port); inet_pton(AF_INET, ip.c_str(), &local.sin_addr); if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0) {
logMessage(FATAL, "bind error, %d:%s", errno, strerror(errno)); exit(3); } } static void Listen(int sock) {
if (listen(sock, gbacklog) < 0) {
logMessage(FATAL, "listen error, %d:%s", errno, strerror(errno)); exit(4); } logMessage(NORMAL, "init server success"); } // 一般经验 // const std::string &: 输入型参数 // std::string *: 输出型参数 // std::string &: 输入输出型参数 // 输出accept的错误码,用于判断accept的状态 static int Accept(int listensock, std::string *ip, uint16_t *port, int* accept_errno) {
struct sockaddr_in src; socklen_t len = sizeof(src); *accept_errno = 0; int servicesock = accept(listensock, (struct sockaddr *)&src, &len); if (servicesock < 0) {
logMessage(ERROR, "accept error, %d:%s", errno, strerror(errno)); *accept_errno = errno; return -1; } if(port) *port = ntohs(src.sin_port); if(ip) *ip = inet_ntoa(src.sin_addr); return servicesock; } static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port) {
struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(server_port); server.sin_addr.s_addr = inet_addr(server_ip.c_str()); if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true; else return false; } static bool SetNonBlock(int sock) // 设置sock为非阻塞 {
int fl = fcntl(sock, F_GETFL); if(fl < 0) return false; fcntl(sock, F_SETFL, fl | O_NONBLOCK); return true; } ~Sock() {
} };
Log.hpp
同select;
Epoll.hpp
- 封装epoll的三个接口;
#pragma once #include <iostream> #include <sys/epoll.h> class Epoll {
const static int gnum = 128; const static int gtimeout = 5000; public: Epoll(int timeout = gtimeout) : _timeout(timeout) {
} void CreateEpoll() {
_epfd = epoll_create(gnum); if(_epfd < 0) {
exit(5); } } bool AddSockToEpoll(int sock, uint32_t events) {
struct epoll_event ev; ev.events = events; ev.data.fd = sock; int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev); return n == 0; } int WaitEpoll(struct epoll_event revs[], int num) {
return epoll_wait(_epfd, revs, num, _timeout); } ~Epoll() {
} private: int _epfd; int _timeout; };
Protocol.hpp
- 报文的序列化和反序列化;
#pragma once #include <iostream> #include <cstring> #include <string> #include <vector> // 1. 报文和报文之间,我们采用特殊字符来进行解决粘报问题 // 2. 获取一个一个独立完整的报文,序列和反序列化 -- 自定义 // 100+19X100+19X100+19 // 支持解决粘报问题,处理独立报文 #define SEP "X" #define SEP_LEN strlen(SEP) // 自己手写序列反序列化 #define SPACE " " #define SPACE_LEN strlen(SPACE) // 我们要把传入进来的缓冲区进行切分 // 1. buffer被切走的,也同时要从buffer中移除 // 2. 可能会存在多个报文,多个报文依次放入out // buffer: 输入输出型参数 // out: 输出型参数 void SpliteMessage(std::string &buffer, std::vector<std::string> *out) {
// 100+ // 100+19X1 // 100+19X100+19 while (true) {
auto pos = buffer.find(SEP); if (std::string::npos == pos) break; std::string message = buffer.substr(0, pos); buffer.erase(0, pos + SEP_LEN); out->push_back(message); // std::cout << "debug: " << message << " : " << buffer << std::endl; // sleep(1); } } // TODO // std::string Encode(std::string &s) {
return s + SEP; } class Request {
public: std::string Serialize() {
std::string str; str = std::to_string(x_); str += SPACE; str += op_; // TODO str += SPACE; str += std::to_string(y_); return str; } bool Deserialized(const std::string &str) // 1 + 1 {
std::size_t left = str.find(SPACE); if (left == std::string::npos) return false; std::size_t right = str.rfind(SPACE); if (right == std::string::npos) return false; x_ = atoi(str.substr(0, left).c_str()); y_ = atoi(str.substr(right + SPACE_LEN).c_str()); if (left + SPACE_LEN > str.size()) return false; else op_ = str[left + SPACE_LEN]; return true; } public: Request() {
} Request(int x, int y, char op) : x_(x), y_(y), op_(op) {
} ~Request() {
} public: int x_; // 是什么? int y_; // 是什么? char op_; // '+' '-' '*' '/' '%' }; class Response {
public: // "code_ result_" std::string Serialize() {
std::string s; s = std::to_string(code_); s += SPACE; s += std::to_string(result_); return s; } // "111 100" bool Deserialized(const std::string &s) {
std::size_t pos = s.find(SPACE); if (pos == std::string::npos) return false; code_ = atoi(s.substr(0, pos).c_str()); result_ = atoi(s.substr(pos + SPACE_LEN).c_str()); return true; } public: Response() {
} Response(int result, int code) : result_(result), code_(code) {
} ~Response() {
} public: // 约定! // result_? code_? code_ 0? 1?2?3? int result_; // 计算结果 int code_; // 计算结果的状态码 };
TcpServer.hpp
- 为了保证未来正确的读取,每一个sock都要有属于自己的缓冲区;
- 设置一个Connection类,将每个sock及其读写回调函数和缓冲区都封装起来,作为一个连接对象,TcpServer中会维护大量的Connection连接;
- bind:绑定函数参数,返回的是一个函数对象,placeholders::_1是一个占位符,使用时需传递一个参数;
由于类内成员函数的第一个参数默认为this指针,因此类内成员函数在作为回调函数时,需要先将第一个参数绑定为this指针; - 将所有Connection管理起来,通过设置一个哈希表将sock和Connection建立起映射,通过sock能够查找到Connection,进而能够调用相应方法;
- 通过回调方法的不同,来区分listensock和普通sock;
- 由于是ET模式,因此每次读取的时候,都需要将已经就绪的数据全部读取,sock是非阻塞模式,在底层没有数据的时候,就会报错结束读取,而不会阻塞;
- recv每次接收到的数据都存到连接自己的接收缓冲区;
- 将TCP服务器与上层业务解耦,在服务器类中有一个上层业务处理的回调函数接口_cb,在每次调用业务分派的时候指定数据处理方法,当每次读取到完整报文后,就会调用函数进行处理;
- 在Recver接口中,数据保存到连接的接收缓冲区后,会进行请求分割,将字符串分割成一个个的完整请求,如果有不完整的请求,就先保留在缓冲区中,等待剩下的完整数据到来,完整的请求保存在vector中,在调用上层业务逻辑进行处理;
- 第一次发送之前,epoll服务器并没有关心此sock的发送时间,因此在数据准备好,发送之前,需要在业务层触发该sock的写事件;
- Sender发送的时候也是一样的逻辑,不能保证全部发送完成,但是可以保证,如果没有出错,一定是要么发完,要么发送条件不满足,下次发送;
#pragma once #pragma once #include <iostream> #include <functional> #include <string> #include <vector> #include <cerrno> #include <cassert> #include <unordered_map> #include "Sock.hpp" #include "Log.hpp" #include "Epoll.hpp" #include "Protocol.hpp" class TcpServer; class Connection; using func_t = std::function<void(Connection *)>; using callback_t = std::function<void(Connection *, std::string &)>; // 我们为了能够正常工作,常规的sock必须是要有自己独立的接收缓冲区&&发送缓冲区 class Connection {
public: Connection(int sock = -1) : _sock(sock), _tsvr(nullptr) {
} void SetCallBack(func_t recv_cb, func_t send_cb, func_t except_cb) {
_recv_cb = recv_cb; _send_cb = send_cb; _except_cb = except_cb; } ~Connection() {
} public: // 负责进行IO的文件描述符 int _sock; // 三个回调方法,表征的就是对sock进行特定读写对应的方法 func_t _recv_cb; func_t _send_cb; func_t _except_cb; // 接收缓冲区和发送缓冲区 std::string _inbuffer; std::string _outbuffer; // 设置对TcpServer的回指指针 TcpServer *_tsvr; }; // TcpServer中会维护大量的Connection连接 class TcpServer {
const static int gport = 8080; const static int gnum = 128; public: TcpServer(int port = gport) : _port(port), _revs_num(gnum) {
// 1.创建listensock _listensock = Sock::Socket(); Sock::Bind(_listensock, _port); Sock::Listen(_listensock); // 2.创建多路转接对象 _epoll.CreateEpoll(); // 3. 添加listensock到epoll服务器中 // 设置listensock的接收回调函数 // bind:绑定函数参数,返回的是一个函数对象,placeholders::_1是一个占位符,使用时需传递一个参数 // 由于Accepter是类内成员,因此第一个参数是隐藏的this指针 AddConnection(_listensock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); // 4.构建一个获取就绪事件的缓冲区 _revs = new struct epoll_event[_revs_num]; } // 将sock添加到epoll服务器中 void AddConnection(int sock, func_t recv_cb, func_t send_cb, func_t except_cb) {
Sock::SetNonBlock(sock); // 除了listensock,未来服务器中会存在大量的socket,每一个sock都必须被封装为一个Connection // 当服务器中存在大量Connection的时候,TcpServer就需要将所有的Connection进行管理:先描述,再组织 // 1.构建Connection对象,封装sock Connection *conn = new Connection(sock); // 设置回调函数 conn->SetCallBack(recv_cb, send_cb, except_cb); conn->_tsvr = this; // 2.添加sock到epoll中 _epoll.AddSockToEpoll(sock, EPOLLIN | EPOLLET); // epoll默认为LT模式,需设置为ET模式 // 3.还要将对应的Connection*对象指针添加到Connections的映射表中 _connections.insert(std::make_pair(sock, conn)); } void Accepter(Connection *conn) // 所有的连接都是封装到Connection对象中的,因此所有回调函数的参数都是Connection* {
// logMessage(DEBUG, "Accepter been called"); // 一定是listensock就绪了,此次读取是不会阻塞的 while (true) // 需要一次读完所有的数据,底层不一定只有一个连接就绪 {
std::string clientip; uint16_t clientport; int accept_errno = 0; // 获取accept的错误码 // sock一定是常规的IO cok int sock = Sock::Accept(conn->_sock, &clientip, &clientport, &accept_errno); if (sock < 0) {
if (accept_errno == EAGAIN || accept_errno == EWOULDBLOCK) // 底层没链接了 {
break; } else if (accept_errno == EINTR) // accept被中断了 {
continue; } else {
// accept失败 logMessage(WARNING, "accept error, %d : %s", accept_errno, strerror(accept_errno)); break; } } // 将sock托管给TcpServer if (sock >= 0) {
AddConnection(sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1), std::bind(&TcpServer::Sender, this, std::placeholders::_1), std::bind(&TcpServer::Excepter, this, std::placeholders::_1)); logMessage(DEBUG, "accept client %s:%d success, add to epoll&&TcpServer success, sock: %d", clientip.c_str(), clientport, sock); } } } // 使能读写 void EnableReadWrite(Connection *conn, bool readable, bool writeable) {
uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0)); bool res = _epoll.CtlEpoll(conn->_sock, events); assert(res); } void Recver(Connection *conn) {
const int num = 1024; bool err = false; while (true) {
char buffer[num]; ssize_t n = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0); if (n < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) break; // 正常的 else if (errno == EINTR) continue; // 被中断 else {
logMessage(ERROR, "recv error, %d : %s", errno, strerror(errno)); conn->_except_cb(conn); err = true; // 出错 break; } } else if (n == 0) // 对端关闭 {
logMessage(DEBUG, "client[%d] quit, server close [%d]", conn->_sock, conn->_sock); conn->_except_cb(conn); err = true; break; } else {
// 读取成功 buffer[n] = 0; conn->_inbuffer += buffer; // 每次接收到的数据都存到自己的接收缓冲区 } } logMessage(DEBUG, "conn->_inbuffer[sock: %d]: %s", conn->_sock, conn->_inbuffer.c_str()); if (!err) // 没有出错 {
std::vector<std::string> messages; SpliteMessage(conn->_inbuffer, &messages); // 分割成一个个的完整请求,如果有不完整求情,就留在缓冲区等待下次读取 // 能保证走到这里,就是一个完整的报文 for (auto &msg : messages) {
_cb(conn, msg); } } } // 最开始的时候,我们的连接中的写事件是没有被触发的,此时epoll服务器不关心此链接的写事件 // 因此需要在业务逻辑中触发写事件 void Sender(Connection *conn) {
while (true) {
ssize_t n = send(conn->_sock, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0); if (n > 0) {
conn->_outbuffer.erase(0, n); if (conn->_outbuffer.empty()) break; } else {
if (errno == EAGAIN || errno == EWOULDBLOCK) break; else if (errno == EINTR) continue; else {
logMessage(ERROR, "send error, %d : %s", errno, strerror(errno)); conn->_except_cb(conn); break; } } } // 不能保证全部发送完成,但是可以保证,如果没有出错,一定是要么发完,要么发送条件不满足,下次发送 if (conn->_outbuffer.empty()) {
EnableReadWrite(conn, true, false); // 发完了,就关闭epoll对此sock的写关心 } else {
EnableReadWrite(conn, true, true); // 没发完,就继续发 } } void Excepter(Connection *conn) {
if (!IsConnectionExists(conn->_sock)) return; // 1. 从epoll中移除 bool res = _epoll.DelFromEpoll(conn->_sock); assert(res); // 要判断 // 2. 从我们的unorder_map中移除 _connections.erase(conn->_sock); // 3. close(sock); close(conn->_sock); // 4. delete conn; delete conn; logMessage(DEBUG, "Excepter 回收完毕,所有的异常情况"); } void LoopOnce() {
int n = _epoll.WaitEpoll(_revs, _revs_num); // 获取已就绪事件 for (int i = 0; i < n; i++) {
int sock = _revs[i].data.fd; uint32_t revents = _revs[i].events; // 将所有的异常,全部交给read或者write来统一处理! if (revents & EPOLLERR) revents |= (EPOLLIN | EPOLLOUT); if (revents & EPOLLHUP) revents |= (EPOLLIN | EPOLLOUT); if (revents & EPOLLIN) // 接收事件 {
// 如果该连接存在且连接的接收回调方法存在,就是接收事件成功触发 if (IsConnectionExists(sock) && _connections[sock]->_recv_cb != nullptr) {
// listensock和普通sock在构建Connection对象时传入的回调函数不同,以此来区分 _connections[sock]->_recv_cb(_connections[sock]); } } if (revents & EPOLLOUT) {
if (IsConnectionExists(sock) && _connections[sock]->_send_cb != nullptr) {
_connections[sock]->_send_cb(_connections[sock]); } } } } // 根据就绪的事件,进行特定事件的派发 void Dispather(callback_t cb) {
_cb = cb; while (true) {
LoopOnce(); } } bool IsConnectionExists(int sock) {
auto iter = _connections.find(sock); if (iter == _connections.end()) return false; else return true; } ~TcpServer() {
if (_listensock >= 0) close(_listensock); if (_revs) delete[] _revs; } private: int _listensock; int _port; Epoll _epoll; // sock : connection 产生映射 std::unordered_map<int, Connection *> _connections; // 管理connection struct epoll_event *_revs; // 保存就绪事件的数组 int _revs_num; // 上层业务处理 // 将TCP服务与上层服务解耦 callback_t _cb; };
main.cc
- 网络计算器的业务逻辑
#include "TcpServer.hpp" #include<memory> static Response calculator(const Request &req) {
Response resp(0, 0); switch (req.op_) {
case '+': resp.result_ = req.x_ + req.y_; break; case '-': resp.result_ = req.x_ - req.y_; break; case '*': resp.result_ = req.x_ * req.y_; break; case '/': if (0 == req.y_) resp.code_ = 1; else resp.result_ = req.x_ / req.y_; break; case '%': if (0 == req.y_) resp.code_ = 2; else resp.result_ = req.x_ % req.y_; break; default: resp.code_ = 3; break; } return resp; } void NetCal(Connection* conn, std::string& request) {
logMessage(DEBUG, "NetCal been called, get request: %s", request.c_str()); // 1.反序列化 Request req; if(!req.Deserialized(request)) return; // 2.业务处理 Response resp = calculator(req); // 3.序列化,构建应答 std::string sendstr = resp.Serialize(); // 4.交给服务器conn conn->_outbuffer += sendstr; // 5.让底层的TcpServer开始发送 // a.需要有完整的发送逻辑 // b.我们触发发送的动作,一旦我们开启EPOLLOUT,epoll会自动立马触发一次发送事件就绪, // 如果后续保持发送的开启,epoll会一直发送 conn->_tsvr->EnableReadWrite(conn, true, true); } int main() {
std::unique_ptr<TcpServer> svr(new TcpServer()); svr->Dispather(NetCal); return 0; }
运行结果:
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/103536.html