编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

从内核接收数据到EPOLL原理(epoll 内核)

wxchong 2024-07-29 08:11:20 开源技术 34 ℃ 0 评论


一、内核接收数据流程

内核接收数据.png

1.网卡发现 MAC 地址符合,就将包收进来;发现 IP 地址符合,根据 IP 图中协议项,知道上一层是 TCP 协议;

2.DMA把TCP数据包copy到内核缓冲区;

3.触发CPU中断,中断程序摘除TCP头通过socket五要素(源IP/PORT、目的IP/PORT、协议)找到对应的socket文件,并把原始二进制数据报copy到socket接收缓冲区;

4.中断程序唤醒被阻塞的内核线程;

5.内核线程切换到用户线程把数据从socket接口缓冲区copy到应用内存;

二、中断处理流程

中断处理.png

I/O发出的信号的异常代码,拿到异常代码之后,CPU就会触发异常处理的流程。计算机在内存里会保存中断向量表,用来存放不同的异常代码对应的异常处理程序所在的地址。

CPU在拿到了异常码之后,会先把当前的程序执行的现场,保存到程序栈里面,然后根据异常码查询,找到对应的异常处理程序,最后把后续指令执行的指挥权,交给这个异常处理程序。

异常处理程序结束之后返回到原来指令执行的位置继续执行;

三、阻塞不占用 cpu

网卡何时接收到数据是依赖发送方和传输路径的,这个延迟通常都很高,是毫秒(ms)级别的。而应用程序处理数据是纳秒(ns)级别的。也就是说整个过程中,内核态等待数据,处理协议栈是个相对很慢的过程。这么长的时间里,用户态的进程是无事可做的,因此用到了“阻塞(挂起)”。

阻塞是进程调度的关键一环,指的是进程在等待某事件发生之前的等待状态。请看下表,在 Linux 中,进程状态大致有 7 中(在 include/linux/sched.h 中有更多状态):

从说明中可以发现,“可运行状态”会占用 CPU 资源,另外创建和销毁进程也需要占用 CPU 资源(内核)。重点是,当进程被"阻塞/挂起"时,是不会占用 CPU 资源的。

为了支持多任务,Linux 实现了进程调度的功能(CPU 时间片的调度)。而这个时间片的切换,只会在“可运行状态”的进程间进行。因此“阻塞/挂起”的进程是不占用 CPU 资源的。

四、工作队列和等待队列

工作队列和等待队列.png

工作队列:为了方便时间片的调度,所有“可运行状态”状态的进程组成的队列;

fd文件列表:内核打开的文件句柄,Linux一切皆文件,用户线程执行创建Socket时内核就会创建一个由文件系统管理的sock对象;

sock:socket内核中的数据结构,主要包含发送缓冲区、接收缓冲区、等待队列;

structsock{__u32 daddr;/* 外部IP地址 */__u32 rcv_saddr;/* 绑定的本地IP地址 */__u16 dport;/* 目标端口 */__u16 sport;/* 源端口 */unsignedshortfamily;/* 地址簇 */intrcvbuf;/* 接受缓冲区长度(单位:字节) */structsk_buff_headreceive_queue;/* 接收包队列 */intsndbuf;/* 发送缓冲区长度(单位:字节) */structsk_buff_headwrite_queue;/* 包发送队列 */wait_queue_head_t*sleep;/* 等待队列,通常指向socket的wait域 */......}

等待队列:等待当前socket的线程;

工作队列中线程执行到阻塞操作等待socket时,会从工作队列中移除,移动到该socket的等待队列中;当socket接收到数据后,操作系统将该socket等待队列上的进程重新放回到工作队列,该进程变成运行状态,继续执行代码。

五、BIO

publicstaticvoidmain(String[]args)throws IOException{ServerSocketserverSocket=newServerSocket(9000);while(true){// 没有连接-阻塞Socketsocket=serverSocket.accept();byte[]bytes=newbyte[1024];InputStreaminputStream=socket.getInputStream();while(true){// 没有数据-阻塞intread=inputStream.read(bytes);if(read!=-1){System.out.println(newString(bytes,0,read));}else{break;}}socket.close();}}

BIO模式存在两个阻塞点,一个是accept阻塞等待客户端连接,一个是阻塞等待socket请求数据;简单跟以下源码就会发现

new ServerSocket(9000)最终通过

int newfd = socket0(stream, false /*v6 Only*/);调用Linux int socket(int domain, int type, int protocol);创建服务端socket;

bind0(nativefd, address, port, exclusiveBind);调用Linux int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);绑定端口;

listen0(nativefd, backlog);调用Linux int listen(int sockfd, int backlog);设置监听;

接下来serverSocket.accept()最终通过

newfd = accept0(nativefd, isaa);调用Linux int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);阻塞等待客户端连接,会创建一个socket用于跟该客户端进行通信,返回socket文件描述符fd;

最后inputStream.read(bytes);调用Linux ssize_t recv(int sockfd, void *buf, size_t len, int flags);阻塞从socket读缓冲区读取客户端请求数据;

可以通过 man 命令查看Linux 系统调用方法具体描述;

通过传统BIO的操作方式可以看出一个请求必须要创建一个内核线程进行处理,recv只能监视单个socket,当并发比较高时就会消耗大量系统资源,也就是所谓的C10K问题;那么如何解决这个问题呢?后面出现的多路复用 select / poll / epoll思路都是使用一个线程来处理若干个连接(监视若干个socket),类似餐厅服务员的角色。

六、select

select 方案是用一个 fd_set 结构体来告诉内核同时监控多个socket,当其中有socket的状态发生变化或超时,则调用返回。之后应用使用 FD_ISSET 来逐个查看是哪个socket的状态发生了变化。

6.1 select原理

以下面select伪代码为例:

ints=socket(AF_INET,SOCK_STREAM,0);bind(s,...)listen(s,...)//接受客户端连接intc=accept(s,...)intreadSet[]=存放需要监听的socket文件描述符while(1){intn=select(...,readSet,...)for(inti=0;i<readSet.count;i++){if(FD_ISSET(readSet[i],...)){//fds[i]的数据处理}}}

select系统调用函数# maxfd:表示的是待测试的描述符基数,它的值是待测试的最大描述符加 1# readset:读描述符集合# writeset:写描述符集合# exceptset:错误描述符集合# timeout:超时时间intselect(intmaxfd,fd_set*readset,fd_set*writeset,fd_set*exceptset,conststructtimeval*timeout);

上面示例代码中,先准备一个数组 readSet 存放着所有需要监视读事件的socket。然后调用select,如果 readSet 中的所有socket都没有数据,select会阻塞,直到有一个socket接收到数据,select返回,唤醒线程。用户可以遍历 readSet,通过FD_ISSET (对这个数组进行检测,判断出对应socket的元素 readSet[fd]是 0 还是 1)判断具体哪个 socket 收到数据,然后做出处理。

6.2 select流程

select-1.png

线程 A 调用 select readSet 数组元素为sock1、sock2、sock3 ,此时3个socket 都没有数据可读,就把线程A 从工作队列中移除,并分别添加到3个sock 的等待队列中;

select-2.png

当3个sock中任意一个有数据可读时,中断程序都会把线程A 从所有sock等待队列中移除并重新加入工作队列,等待cpu时间片继续执行(即从select中返回)。但是此时线程A 并不知道哪个socket有数据,于是还要遍历readSet使用 FD_ISSET 来逐个查看是哪个socket的有数据可读。

6.3 select的缺点

1、每次调用select都需要将线程加入到所有监视socket的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个fd_set列表传递给内核,有一定的开销。正是因为遍历操作开销大,出于效率的考量,才会规定select的最大监视数量,默认只能监视1024个socket。

2、由于只有一个字段记录关注和发生事件,每次调用之前要重新初始化 fd_set 结构体。

3、线程被唤醒后,程序并不知道哪些socket收到数据,还需要遍历一次。

七、poll

总结一下 select 的缺点就是句柄上限+重复初始化+逐个排查所有文件句柄状态效率不高。poll 主要解决 select 的前两个问题:通过改变跟内核交互的数据结构突破了文件描述符的限制,同时使用不同字段分别标注关注事件和发生事件,来避免重复初始化。

intpoll(structpollfd*fds,unsignedlongnfds,inttimeout);# fd:描述符 fd# events:描述符上待检测的事件类型events,注意这里的 events 可以表示多个不同的事件,# 具体的实现可以通过使用二进制掩码位操作来完成,例如,POLLIN 和 POLLOUT 可以表示读和写事件# revents:structpollfd{intfd;/* file descriptor */shortevents;/* events to look for */shortrevents;/* events returned */};

和 select 非常不同的地方在于:

poll 每次检测之后的结果不会修改原来的传入值,而是将结果保留在 revents 字段中,这样就不需要每次检测完都得重置待检测的描述字和感兴趣的事件。

在 select 里面,文件描述符的个数已经随着 fd_set 的实现而固定,没有办法对此进行配置;而在 poll 函数里,可以控制 pollfd 结构的数组大小,这意味着可以突破原来 select 函数最大描述符的限制,在这种情况下,应用程序调用者需要分配 pollfd 数组并通知 poll 函数该数组的大小。

八、epoll

epoll 使用 eventpoll 作为中间层,线程不用加入在被监视的每一个 socket 阻塞队列中,也不用再遍历 socket 列表查看哪些有事件发生。

epoll提供了三个函数:

# 创建了一个 epoll 实例 epolleventintepoll_create(intsize);# 往这个 epoll 实例增加或删除监控的事件# epfd:epoll_create 创建的 epoll 实例句柄# op:增加还是删除一个监控事件# fd:注册的事件的文件描述符# event:注册的事件类型,并且可以在这个结构体里设置用户需要的数据intepoll_ctl(intepfd,intop,intfd,structepoll_event*event);# 类似之前的 poll 和 select 函数,调用者进程被挂起,在等待内核 I/O 事件的分发# epfd:实例描述字,也就是 epoll 句柄# events:用户空间需要处理的 I/O 事件,这是一个数组,数组的大小由 epoll_wait 的返回值决定,这个数组的每个元素都是一个需要待处理的 I/O 事件,其中 events 表示具体的事件类型,事件类型取值和 epoll_ctl 可设置的值一样,这个 epoll_event 结构体里的 data 值就是在 epoll_ctl 那里设置的 data,也就是用户空间和内核空间调用时需要的数据# maxevents:大于 0 的整数,表示 epoll_wait 可以返回的最大事件值# timeout:阻塞调用的超时值,如果这个值设置为 -1,表示不超时;如果设置为 0 则立即返回,即使没有任何 I/O 事件发生intepoll_wait(intepfd,structepoll_event*events,intmaxevents,inttimeout);

8.1 代码示例

publicclassServerConnect{privatestaticfinalintBUF_SIZE=1024;privatestaticfinalintPORT=8080;privatestaticfinalintTIMEOUT=3000;publicstaticvoidmain(String[]args){selector();}publicstaticvoidselector(){Selectorselector=null;ServerSocketChannelssc=null;try{// 打开一个Channelssc=ServerSocketChannel.open();// 将Channel绑定端口ssc.socket().bind(newInetSocketAddress(PORT));// 设置Channel为非阻塞,如果设置为阻塞,其实和BIO差不多了。ssc.configureBlocking(false);// 打开一个Slectoreselector=Selector.open();// 向selector中注册Channel和感兴趣的事件ssc.register(selector,SelectionKey.OP_ACCEPT);while(true){// selector监听事件,select会被阻塞,直到selector监听的channel中有事件发生或者超时,会返回一个事件数量//TIMEOUT就是超时时间,selector初始化的时候会添加一个用于主动唤醒的pipe,待会源码分析会说if(selector.select(TIMEOUT)==0){System.out.println("==");continue;}/**

* SelectionKey的组成是selector和Channel

* 有事件发生的channel会被包装成selectionKey添加到selector的publicSelectedKeys属性中

* publicSelectedKeys是SelectionKey的Set集合

*下面这一部分遍历,就是遍历有事件的channel

*/Iterator<SelectionKey>iter=selector.selectedKeys().iterator();while(iter.hasNext()){SelectionKeykey=iter.next();if(key.isAcceptable()){handleAccept(key);}if(key.isReadable()){handleRead(key);}if(key.isWritable()&&key.isValid()){handleWrite(key);}if(key.isConnectable()){System.out.println("isConnectable = true");}//每次使用完,必须将该SelectionKey移除,否则会一直存储在publicSelectedKeys中//下一次遍历又会重复处理iter.remove();}}}catch(IOExceptione){e.printStackTrace();}finally{try{if(selector!=null){selector.close();}if(ssc!=null){ssc.close();}}catch(IOExceptione){e.printStackTrace();}}}publicstaticvoidhandleAccept(SelectionKeykey)throws IOException{ServerSocketChannelssChannel=(ServerSocketChannel)key.channel();SocketChannelsc=ssChannel.accept();sc.configureBlocking(false);sc.register(key.selector(),SelectionKey.OP_READ,ByteBuffer.allocateDirect(BUF_SIZE));}publicstaticvoidhandleRead(SelectionKeykey)throws IOException{SocketChannelsc=(SocketChannel)key.channel();ByteBufferbuf=(ByteBuffer)key.attachment();longbytesRead=sc.read(buf);while(bytesRead>0){buf.flip();while(buf.hasRemaining()){System.out.print((char)buf.get());}System.out.println();buf.clear();bytesRead=sc.read(buf);}if(bytesRead==-1){sc.close();}}publicstaticvoidhandleWrite(SelectionKeykey)throws IOException{ByteBufferbuf=(ByteBuffer)key.attachment();buf.flip();SocketChannelsc=(SocketChannel)key.channel();while(buf.hasRemaining()){sc.write(buf);}buf.compact();}}

调用到内核的流程如下:

1、ssc = ServerSocketChannel.open() -> EPollSelectorProvider -> int socket(int domain, int type, int protocol);

2、ssc.socket().bind(newInetSocketAddress(PORT)); -> int bind(int sockfd, const struct sockaddr addr, socklen_t addrlen);

3、getImpl().listen(backlog); -> int listen(int sockfd, int backlog);

4、ssc.configureBlocking(false); -> int fcntl(int fd, int cmd, ... /

arg */ );

5、selector=Selector.open(); -> EPollSelectorImpl epollCreate() -> int epoll_create(int size);

6、ssc.register(selector,SelectionKey.OP_ACCEPT); -> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

while

7、selector.select(TIMEOUT) -> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

8.2 epoll流程

epoll.png

如图,eventpoll 主要包含3个结构

监视队列:epoll_ctl添加或删除所要监听的socket。例如:图中通过epoll_ctl添加sock1、sock2的监视,内核会将eventpoll添加到这socket的等待队列中(图中虚线)。为了方便的添加和移除,还要便于搜索,以避免重复添加,使用红黑树结构进行存储(搜索、插入和删除时间复杂度都是O(log(N)),效率较好。

等待队列:eventpoll也是一个文件句柄,因此也有等待队列,记录阻塞的线程,例如:图中线程A 执行了 epoll_wait 操作,被从工作队列中移除,然后被eventpoll 的等待队列引用;

就绪队列:当某个socket有事件发生时,中断处理程序就会把该socket加入到就绪队列,同时唤醒eventpoll 阻塞队列中的线程,此时线程只需要遍历就绪队列就可以知道哪个socket有事件发生,例如:图中sock2有数据到来时,中断处理程序先把sock2 放入就绪队列中,然后唤醒等待队列中的线程A,这时线程A 被重新加入工作队列中,等到CPU时间片轮询到线程A时,遍历就绪队列中的socket进行处理。

九、总结

select,poll,epoll都是IO多路复用机制,即可以同时监视多个文件描述符,一旦某个描述符就绪(读或写就绪),能够通知程序进行相应读写操作

select,poll需要多次遍历文件描述符集合 fd_set,把阻塞线程放到每一个socket中,当某个描述符就绪时再把这些线程从socket阻塞队列中移除;而epoll 通过eventpoll作为中介,只需要把eventpoll放到socket 的阻塞队列中,当有描述符就绪时只需要遍历就绪队列,而不需要遍历所有fd;

select,poll每次调用都要把整个fd_set集合从用户态往内核态拷贝一次,而epoll只要一次拷贝,这也能节省不少的开销。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表