您的位置:新葡亰496net > 电脑系统 > 网络编程,从实现到应用

网络编程,从实现到应用

发布时间:2019-09-11 16:39编辑:电脑系统浏览(64)

    现在有这么一个场景:我是一个很忙的大老板,我有100个手机,手机来信息了,我的秘书就会告诉我“老板,你的手机来信息了。”我很生气,我的秘书就是这样子,每次手机来信息就只告诉我来信息了,老板赶紧去看。但是她从来不把话说清楚:到底是哪个手机来信息啊!我可有100个手机啊!于是,我只能一个一个手机去查看,来确定到底是哪几个手机来信息了。这就是IO复用中select模型的缺点!老板心想,要是秘书能把来信息的手机直接拿到我桌子上就好了,那么我的效率肯定大增(这就是epoll模型)。

    I/O多路复用是在多线程或多进程编程中常用技术。主要是通过select/epoll/poll三个函数支持的。在此主要对select和epoll函数详细介绍。

     

     

    那我们先来总结一下select模型的缺点:

    select函数

    • 该函数运行进程指示内核等待多个事件中的任何一个发生,并只有一个或多个事件发生或经历一段指定的时间后才唤醒它。
    • 调用select告知内核对哪些描述符(就读、写或异常条件)感兴趣以及等待多长时间。我们感兴趣的描述符不局限于套接字,任何描述符都可以使用select来测试。
    • 函数原型:

      #include<sys/select.h>
      #include<sys/time.h>
      
      int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set
                  *exceptset, const struct timeval *timeout);
                  返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1
      
      • 最后一个参数timeout,它告知内核等待所指定描述符中的任何一个就绪可花多长时间。该参数有三种可能:
        • 永远等待下去:仅在有一个描述符准备好I/O时才返回,将其设为空指针
        • 等待一段固定时间:在有一个描述符准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
        • 根本不等待:检查描述符后立即返回,这就是轮询。为此,该参数必须指向一个timeval结构,但是其中的值必须设置为0
      • 三个参数readset,writeset,exceptset指定我们要让内核测试读、写和异常条件的描述符。
      • 如何给这三个参数的每一个参数指定一个或多个描述符值是一个设计上的问题。select使用描述符集,通常是一个整数数组,其中每个整数中的每一位对应一个描述符。举例来说,假设使用32位整数,那么该数组的第一个元素对应于描述符0~31,第二个元素对应于描述符32~63,以此类推。所有这些实现细节都与应用程序无关,它们隐藏在名为fd_set的数据类型和以下四个宏中:

        void FD_ZERO(fd_set *fdset);    //clear all bits in fdset
        void FD_SET(int fd, fd_set *fdset);   //turn on the bit for fd in fdset
        void FD_CLR(int fd, fd_set *fdset);  //turn off the bit for fd in fdset
        int FD_ISSET(int fd, fd_set *fdset);  //is the bit for fd on in fdset?
        

        我们分配一个fd_set数据类型的描述符集,并用这些宏设置或测试该集合中的每一位,也可以用C语言中的赋值语句把它赋值成另外一个描述符集。
        注意:前面所讨论的每个描述符占用整数数组中的一位的方法仅仅是select函数的可能实现之一。

      • maxfdp1参数指定待测试的描述符个数,它的值是待测试的最大描述符加1。描述符0,1,2,...,直到maxfdp1 - 1均被测试。

      • select函数修改由指针readset,writeset和exceptset所指向的描述符集,因而这三个参数都是值-结果参数。该函数返回后,我们使用FD_ISSET宏来测试fd_set数据类型中的描述符。描述符集内任何与未就绪描述符所对应的位返回时均清成0.为此,每次重新调用select函数时,我们都得再次把所有描述符集内所关心的位均置为1

    多路复用的适用场合

    多路复用的适用场合

    1. 单个进程能够监视的文件描述符的数量存在最大限制,通常是1024,当然可以更改数量,但由于select采用轮询的方式扫描文件描述符,文件描述符数量越多,性能越差;(在linux内核头文件中,有这样的定义:#define __FD_SETSIZE 1024)
    2. 内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销;
      select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件;
    3. select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。

    select返回套接字的“就绪”条件

    • 满足下列四个条件之一的任何一个时,一个套接字准备好读:
      • 该套接字接收缓冲区中的数据字节数大于等于套接字接收缓冲区低水位标记的当前大小。对于这样的套接字执行读操作不会阻塞并将返回一个大于0的值(也就是返回准备好读入的数据)。我们使用SO_RECVLOWAT套接字选项设置套接字的低水位标记。对于TCP和UDP套接字而言,其默认值为1
      • 该连接的读半部关闭(也就是接收了FIN的TCP连接)。对这样的套接字的读操作将不阻塞并返回0(也就是返回EOF)
      • 该套接字时一个监听套接字且已完成的连接数不为0。
      • 其上有一个套接字错误待处理。对这样的套接字的读操作将不阻塞并返回-1(也就是返回一个错误),同时把errno设置为确切的错误条件。这些待处理错误也可以通过SO_ERROR套接字选项调用getsockopt获取并清除。
    • 下列四个条件的任何一个满足时,一个套接字准备好写:
      • 该套接字发送缓冲区中的可用字节数大于等于套接字发送缓冲区低水位标记的当前大小,并且或该套接字已连接,或者该套接字不需要连接(如UDP套接字)。这意味着如果我们把这样的套接字设置成非阻塞的,写操作将不阻塞并返回一个正值(如由传输层接收的字节数)。我们使用SO_SNDLOWAT套接字选项来设置该套接字的低水位标记。对于TCP和UDP而言,默认值为2048
      • 该连接的写半部关闭。对这样的套接字的写操作将产生SIGPIPE信号
      • 使用非阻塞式connect套接字已建立连接,或者connect已经已失败告终
      • 其上有一个套接字错误待处理。对这样的套接字的写操作将不阻塞并返回-1(也就是返回一个错误),同时把errno设置为确切的错误条件。这些待处理错误也可以通过SO_ERROR套接字选项调用getsockopt获取并清除。
    • 如果一个套接字存在带外数据或者仍处于带外标记,那么它有异常条件待处理。
    • 注意:当某个套接字上发生错误时,它将由select标记为既可读又可写
    • 接收低水位标记和发送低水位标记的目的在于:允许应用进程控制在select可读或可写条件之前有多少数据可读或有多大空间可用于写。
    • 任何UDP套接字只要其发送低水位标记小于等于发送缓冲区大小(默认应该总是这种关系)就总是可写的,这是因为UDP套接字不需要连接。

    •     当客户处理多个描述符时(例如同时处理交互式输入和网络套接口) 

    •     当客户处理多个描述符时(例如同时处理交互式输入和网络套接口) 

    设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?

    poll函数

    • 函数原型:

      #include<poll.h>
      
      int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);
      
          返回:若有就绪描述符则为数目,若超时则为0,若出错则为-1
      
    • 第一个参数是指向一个结构数组第一个元素的指针。每个数组元素都是一个pollfd结构,用于指定测试某个给定描述符fd的条件。

      struct pollfd{
          int fd;    //descriptor to check
          short event;  //events of interest on fd
          short revents;  //events that occurred on fd
      };
      

      要测试的条件由events成员指定,函数在相应的revents成员中返回该描述符的状态。(每个描述符都有两个变量,一个为调用值,另一个为返回结果,从而避免使用值-结果参数。)

    • poll事件

    图片 1

    •     如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口 

    •     如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口 

    粗略计算一下,一个进程最多有1024个文件描述符,那么我们需要开1000个进程来处理100万个客户连接。如果我们使用select模型,这1000个进程里某一段时间内只有数个客户连接需要数据的接收,那么我们就不得不轮询1024个文件描述符以确定究竟是哪个客户有数据可读,想想如果1000个进程都有类似的行为,那系统资源消耗可有多大啊!

    epoll函数

    • 网络编程,从实现到应用。epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll有很大的差异。
      • 首先,epoll使用一组函数来完成任务,而不是单个函数。
      • 其次,epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集。
      • 但epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表
    • epoll文件描述符使用如下方式创建:

      #include<sys/epoll.h>
      
      int epoll_create(int size);
      

      size参数完全不起作用,只是给内核一个提示,告诉它事件表需要多大。该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。

    • 下面的函数用来操作epoll的内核事件表:

      #include<sys/epoll.h>
      
      int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
      
          返回:若成功返回0,失败返回-1,并置errno
      

      fd参数是要操作的文件描述符,op参数则指定操作类型。操作类型有以下三类:

      • EPOLL_CTL_ADD, 往事件表中注册fd上的事件
      • EPOLL_CTL_MOD, 修改fd上的注册事件
      • EPOLL_CTL_DEL, 删除fd上的注册事件
    • event指定事件,它是epoll_event结构指针类型,epoll_event的定义如下:

      strcut epoll_event{
          __uint32_t events;    //epoll事件
          epoll_data_t data;    //用户数据
      };
      
      • 其中,events成员描述事件类型。epoll支持的事件类型同poll基本相同。表示epoll事件类型的宏在poll对应的宏前加上"E",比如epoll的数据可读事件是EPOLLIN。
      • epoll有两个额外的事件类型——EPOLLET和EPOLLONESHOT。它们对于epoll的高效运作非常关键。
      • data成员用于存储用户数据,是一个联合体:

        typedef union epoll_data{
            void *ptr;
            int fd;
            uint32_t u32;
            uint64_t u64;
        }epoll_data_t;
        

        其中4个成员用得最多的是fd,它指定事件所从属的目标文件描述符。

    • epoll系列系统调用的主要接口是epoll_wait函数,它在一段超时时间内等待一组文件描述符上的事件,其原型如下:

      #include<sys/epoll.h>
      
      int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
      
          返回:若成功返回就绪的文件描述符个数,失败时返回-1,并置errnoo
      
    • maxevents参数指定最多监听多少个事件,它必须大于0

    • event_wait函数如果检测到事件,就将所有就绪事件从内核事件表(由epfd参数指定)中复制到它的第二个参数events指向的数组中。这个数组只用于输出epoll_wait检测到的就绪事件,而不像select和poll的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件。这就极大地提高了应用程序索引就绪文件描述符的效率。

    • 下面代码给出 poll和epoll在使用上的差别:

      //如何索引poll返回的就绪文件描述符
      int ret = poll(fds, MAX_EVENT_NUMBER, -1);
      //必须遍历所有已注册文件描述符并找到其中的就绪者
      for(int i = 0; i < MAX_EVENT_NUMBER;   i){
          if(fds[i].revents & POLLIN)  //判断第 i 个文件描述符是否就绪
          {
              int sockfd = fds[i].fd;
              //处理sockfd
          }
      }
      
        //如何索引epoll返回的文件描述符
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        //仅遍历就绪的ret个文件描述符
        for(int i = 0; i < ret;   i){
            int sockfd = events[i].data.fd;
            //sockfd肯定就绪,直接处理
        }
    
    • LT和ET模式
      • LT(Level Trigger,电平触发)模式:是默认工作模式,在这种模式下的epoll相当于一个效率较高的poll。当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件。
      • ET(Edge Trigger,边沿触发)模式。对于ET工作模式下的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。
      • ET模式在很大程度上降低了同一个epoll事件被重复触发的次数。因此效率要比LT模式高。
      • 每个使用ET模式的文件描述符都应该是非阻塞的。如果文件描述符是阻塞的,那么读或写操作将会因为没有后续的时间而一直处于阻塞状态(饥渴状态)
    • EPOLLONESHOT事件
      • 即使使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中引起一个问题。比如一个线程(或进程)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是出现了两个线程同时操作一个socket的场面。这当然不是我们期望的。我们期望的是一个socket连接在任一时刻都只被一个线程处理。
      • 对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程时不可能有机会操作该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket.

    •     如果一个服务器即要处理TCP,又要处理UDP 

    •     如果一个服务器即要处理TCP,又要处理UDP 

    针对select模型的缺点,epoll模型被提出来了!

    •     如果一个服务器要处理多个服务或多个协议 

    •     如果一个服务器要处理多个服务或多个协议 

    epoll模型的优点

    • 支持一个进程打开大数目的socket描述符
    • IO效率不随FD数目增加而线性下降
    • 使用mmap加速内核与用户空间的消息传递

    select/poll/epoll差别

    1. poll返回的时候用户态需要轮询判断每个描述符的状态,即使只有一个描述符就绪,也要遍历整个集合。如果集合中活跃的描述符很少,遍历过程的开销就会变得很大,而如果集合中大部分的描述符都是活跃的,遍历过程的开销又可以忽略。epoll的实现中每次只遍历活跃的描述符,在活跃描述符较少的情况下就会很有优势,在代码的分析过程中可以看到epoll的实现过于复杂并且其实现过程为实现线程安全需要同步处理(锁),如果大部分描述符都是活跃的,遍历这点区别相对于加锁来说已经微不足道了,epoll的效率可能不如select或poll。
    2. 传参方式不同 
      • 支持的最大描述符不同,根本原因是内核管理每个文件句柄的数据结构不同,select能够处理的最大fd无法超出FDSETSIZE,因为调用select传入的参数fd_set是一个位数组,数组大小就是FDSETSIZE默认为1024,所以调用方式限制了并发量。Poll是利用一个数组传入的参数,没有最大限制。Epoll不需要每次都传入,因为会调用epoll_ctl添加。
      • 使用方式不同,select调用每次都由于内核会对数组进行在线修改,应用程序下次调用select前不得不重置这三个fdset,而poll比他聪明点,将句柄与事件绑定在一起通过一个struct pollfd实现,返回时是通过其revets实现,所以不需要重置该结构,直接传递就行,epoll不需要传递。
      • 支持的事件类型数不同:select应为没有将句柄与事件进行绑定,所以fd_set仅仅是个文件描述符集合,因此需要三个fd_set分别传入可读可写及异常事件,这使得他不能处理更多类型的事件,而poll采用的pollfd中event需要使用64个bit,epoll采用的 epoll_event则需要96个bit,支持更多的事件类型。
    3. poll每次需要从用户态将所有的句柄复制到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态,非常低效。使用epoll时你只需要调用epoll_ctl事先添加到对应红黑树,真正用epoll_wait时不用传递socket句柄给内核,节省了拷贝开销。
    4. 内核实现上:轮流调用所有fd对应的poll(把current挂到各个fd对应的设备等待队列上),等到有事件发生的时候会通知他,在调用结束后,又把进程从各个等待队列中删除。在 epoll_wait时,把current轮流的加入fd对应的设备等待队列,在设备等待队列醒来时调用一个回调函数(当然,这就需要“唤醒回调”机制),把产生事件的fd归入一个链表,然后返回这个链表上的fd。
    5. Select 不是线程安全的,epoll是线程安全的,内部提供了锁的保护,就算一个线程在epoll_wait的时候另一个线程epoll_ctl也没问题。
    6. 内核使用了slab机制,为epoll提供了快速的数据结构。
    7. Select和poll相当于epoll的LT模式,不支持ET模式,epoll支持更为该高效的ET模式  (ET和LT差别见下文)

     

    select/poll/epoll差别

    1. poll返回的时候用户态需要轮询判断每个描述符的状态,即使只有一个描述符就绪,也要遍历整个集合。如果集合中活跃的描述符很少,遍历过程的开销就会变得很大,而如果集合中大部分的描述符都是活跃的,遍历过程的开销又可以忽略。epoll的实现中每次只遍历活跃的描述符,在活跃描述符较少的情况下就会很有优势,在代码的分析过程中可以看到epoll的实现过于复杂并且其实现过程为实现线程安全需要同步处理(锁),如果大部分描述符都是活跃的,遍历这点区别相对于加锁来说已经微不足道了,epoll的效率可能不如select或poll。
    2. 传参方式不同 
      • 支持的最大描述符不同,根本原因是内核管理每个文件句柄的数据结构不同,select能够处理的最大fd无法超出FDSETSIZE,因为调用select传入的参数fd_set是一个位数组,数组大小就是FDSETSIZE默认为1024,所以调用方式限制了并发量。Poll是利用一个数组传入的参数,没有最大限制。Epoll不需要每次都传入,因为会调用epoll_ctl添加。
      • 使用方式不同,select调用每次都由于内核会对数组进行在线修改,应用程序下次调用select前不得不重置这三个fdset,而poll比他聪明点,将句柄与事件绑定在一起通过一个struct pollfd实现,返回时是通过其revets实现,所以不需要重置该结构,直接传递就行,epoll不需要传递。
      • 支持的事件类型数不同:select应为没有将句柄与事件进行绑定,所以fd_set仅仅是个文件描述符集合,因此需要三个fd_set分别传入可读可写及异常事件,这使得他不能处理更多类型的事件,而poll采用的pollfd中event需要使用64个bit,epoll采用的 epoll_event则需要96个bit,支持更多的事件类型。
    3. poll每次需要从用户态将所有的句柄复制到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态,非常低效。使用epoll时你只需要调用epoll_ctl事先添加到对应红黑树,真正用epoll_wait时不用传递socket句柄给内核,节省了拷贝开销。
    4. 内核实现上:轮流调用所有fd对应的poll(把current挂到各个fd对应的设备等待队列上),等到有事件发生的时候会通知他,在调用结束后,又把进程从各个等待队列中删除。在 epoll_wait时,把current轮流的加入fd对应的设备等待队列,在设备等待队列醒来时调用一个回调函数(当然,这就需要“唤醒回调”机制),把产生事件的fd归入一个链表,然后返回这个链表上的fd。
    5. Select 不是线程安全的,epoll是线程安全的,内部提供了锁的保护,就算一个线程在epoll_wait的时候另一个线程epoll_ctl也没问题。
    6. 内核使用了slab机制,为epoll提供了快速的数据结构。
    7. Select和poll相当于epoll的LT模式,不支持ET模式,epoll支持更为该高效的ET模式  (ET和LT差别见下文)

     

    epoll的两种工作模式

    • LT(level triggered,水平触发模式)是缺省的工作方式,并且同时支持 block 和 non-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。比如内核通知你其中一个fd可以读数据了,你赶紧去读。你还是懒懒散散,不去读这个数据,下一次循环的时候内核发现你还没读刚才的数据,就又通知你赶紧把刚才的数据读了。这种机制可以比较好的保证每个数据用户都处理掉了。

    • ET(edge-triggered,边缘触发模式)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,等到下次有新的数据进来的时候才会再次出发就绪事件。简而言之,就是内核通知过的事情不会再说第二遍,数据错过没读,你自己负责。这种机制确实速度提高了,但是风险相伴而行。

    epoll工作原理

    epoll_create

    操作系统在启动时会注册一个evnetpollfs的文件系统,对应的file operations只是实现了poll跟release操作,然后初始化一些数据结构,例如一个slab缓存,以便后面简化epitem和eppoll_entry对象的分配, 初始化递归检查队列等。

    创建一个eventpoll对象, 里边有用户信息,是不是root,最大监听fd数目,等待队列,就绪链表,红黑树的头结点等,并且创建一个fd 即epollfd,,

    而eventpoll对象保存在struct file结构的private指针中,为方便从fd得到eventpoll对象,并返回。

     

    epoll_ctl

    将epoll_event结构拷贝到内核空间中;

    并且判断加入的fd是否支持poll结构;

    并且从epfd->file->privatedata获取event_poll对象,根据op区分是添加,删除还是修改;

    首先在eventpoll结构中的红黑树查找是否已经存在了相对应的fd,没找到就支持插入操作,否则报重复的错误;

    相对应的修改,删除比较简单就不啰嗦了

    插入时会进行上锁。

     

    插入操作时,会创建一个与fd对应的epitem结构,并且初始化相关成员,比如保存监听的fd跟file结构之类的,

     

    最后调用加入的fd的file operation->poll函数(最后会调用poll_wait操作)用于来将当前进程注册到设备的等待队列:在其内传递poll_table变量调用poll_wait,poll_table会提供一个函数指针,事实上调用的就是这个函数指针指向的对象,该函数就是将当前进行挂在设备的等待队列中,并指定设备事件就绪时的回调函数callback,该callback的实现就是将该epitem放在rdlist链表中。

     

    最后将epitem结构添加到红黑树中

     

    epoll_wait

    计算睡眠时间(如果有),判断eventpoll对象的链表是否为空,不为空那就干活,不睡眠,并且初始化一个等待队列,把自己挂上去,设置自己的进程状态为可睡眠状态。判断是否有信号到来(有的话直接被中断醒来),如果啥事都没有那就调用schedule_timeout进行睡眠,如果超时或者被唤醒,首先从自己初始化的等待队列删除 ,然后开始拷贝资源给用户空间了。

    拷贝资源则是先把就绪事件链表转移到中间链表,然后挨个遍历拷贝到用户空间。

    并且挨个判断其是否为水平触发,是的话再次插入到就绪链表。

       

    具体实现由很多细节: 如果拷贝rdlist过程中又有事件就绪了怎么办,如果epollfd被另一个epoll监听会不会循环唤醒,lt什么时候会从rdlist中删除等,见下文 !

    epoll工作原理

    epoll_create

    操作系统在启动时会注册一个evnetpollfs的文件系统,对应的file operations只是实现了poll跟release操作,然后初始化一些数据结构,例如一个slab缓存,以便后面简化epitem和eppoll_entry对象的分配, 初始化递归检查队列等。

    创建一个eventpoll对象, 里边有用户信息,是不是root,最大监听fd数目,等待队列,就绪链表,红黑树的头结点等,并且创建一个fd 即epollfd,,

    而eventpoll对象保存在struct file结构的private指针中,为方便从fd得到eventpoll对象,并返回。

     

    epoll_ctl

    将epoll_event结构拷贝到内核空间中;

    并且判断加入的fd是否支持poll结构;

    并且从epfd->file->privatedata获取event_poll对象,根据op区分是添加,删除还是修改;

    首先在eventpoll结构中的红黑树查找是否已经存在了相对应的fd,没找到就支持插入操作,否则报重复的错误;

    相对应的修改,删除比较简单就不啰嗦了

    插入时会进行上锁。

     

    插入操作时,会创建一个与fd对应的epitem结构,并且初始化相关成员,比如保存监听的fd跟file结构之类的,

     

    最后调用加入的fd的file operation->poll函数(最后会调用poll_wait操作)用于来将当前进程注册到设备的等待队列:在其内传递poll_table变量调用poll_wait,poll_table会提供一个函数指针,事实上调用的就是这个函数指针指向的对象,该函数就是将当前进行挂在设备的等待队列中,并指定设备事件就绪时的回调函数callback,该callback的实现就是将该epitem放在rdlist链表中。

     

    最后将epitem结构添加到红黑树中

     

    epoll_wait

    计算睡眠时间(如果有),判断eventpoll对象的链表是否为空,不为空那就干活,不睡眠,并且初始化一个等待队列,把自己挂上去,设置自己的进程状态为可睡眠状态。判断是否有信号到来(有的话直接被中断醒来),如果啥事都没有那就调用schedule_timeout进行睡眠,如果超时或者被唤醒,首先从自己初始化的等待队列删除 ,然后开始拷贝资源给用户空间了。

    拷贝资源则是先把就绪事件链表转移到中间链表,然后挨个遍历拷贝到用户空间。

    并且挨个判断其是否为水平触发,是的话再次插入到就绪链表。

       

    具体实现由很多细节: 如果拷贝rdlist过程中又有事件就绪了怎么办,如果epollfd被另一个epoll监听会不会循环唤醒,lt什么时候会从rdlist中删除等,见下文 !

    epoll模型API

    #include <sys/epoll.h> 
    
    /* 创建一个epoll的句柄,size用来告诉内核需要监听的数目一共有多大。当创建好epoll句柄后,
    它就是会占用一个fd值,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。*/
    int epoll_create(int size);  
    
    /*epoll的事件注册函数*/
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); 
    
    /*等待事件的到来,如果检测到事件,就将所有就绪的事件从内核事件表中复制到它的第二个参数events指向的数组*/
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);  
    

    epoll的事件注册函数epoll_ctl,第一个参数是 epoll_create() 的返回值,第二个参数表示动作,使用如下三个宏来表示:

    POLL_CTL_ADD    //注册新的fd到epfd中;
    EPOLL_CTL_MOD    //修改已经注册的fd的监听事件;
    EPOLL_CTL_DEL    //从epfd中删除一个fd;
    

    struct epoll_event 结构如下:

    typedef union epoll_data
    {
        void        *ptr;
        int          fd;
        __uint32_t   u32;
        __uint64_t   u64;
    } epoll_data_t;
    
    struct epoll_event 
    {
        __uint32_t events; /* Epoll events */
        epoll_data_t data; /* User data variable */
    };
    

    epoll_event结构体中的events 可以是以下几个宏的集合:

    EPOLLIN     //表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    EPOLLOUT    //表示对应的文件描述符可以写;
    EPOLLPRI    //表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    EPOLLERR    //表示对应的文件描述符发生错误;
    EPOLLHUP    //表示对应的文件描述符被挂断;
    EPOLLET     //将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
    EPOLLONESHOT//只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
    

    EPOll的ET与LT

    内核实现:

    只是在从rdlist中返回的时候有区别,内核首先会将rdlist拷贝到一个临时链表txlist, 然后如果是LT事件并且事件就绪的话fd被重新放回了rdllist。那么下次epoll_wait当然会又把rdllist里的fd拿来拷给用户了。举个例子。假设一个socket,只是connect,还没有收发数据,那么它的poll事件掩码总是有POLLOUT的,每次调用epoll_wait总是返回POLLOUT事件,因为它的fd就总是被放回rdllist;假如此时有人往这个socket里写了一大堆数据,造成socket塞住,fd不会放回rdllist,epoll_wait将不会再返回用户POLLOUT事件。如果我们给这个socket加上EPOLLET,然后connect,没有收发数据,epoll_wait只会返回一次POLLOUT通知给用户(因为此fd不会再回到rdllist了),接下来的epoll_wait都不会有任何事件通知了。

     

    注意上面LT fd拷贝回rdlist并不是向用户处理完之后发生的,而是向用户拷贝完之后直接复制到rdlist中,那么如果用户消费这个事件使事件不就绪了怎么办,比如说本来是可读的,返回给用户,用户读到不可读为止,继续调用epoll_wait 返回rdlist,则发现不可读,事实上每次返回之前会以NULL继续调用poll,判断事件是否变化,平时调用poll会传递个poll_table变量,就进行添加到等待队列中,而此时不需要添加,只是判断一下状态,如果rdlist中状态变化了,就不会给用户返回了。

     

    触发方式:

    根据对两种加入rdlist途径的分析,可以得出ET模式下被唤醒(返回就绪)的条件为:

    对于读取操作:

    (1) 当buffer由不可读状态变为可读的时候,即由空变为不空的时候。

    (2) 当有新数据到达时,即buffer中的待读内容变多的时候。

    (3) 当buffer中有数据可读(即buffer不空)且用户对相应fd进行epoll_mod IN事件时

     

    对于写操作:

    (1) 当buffer由不可写变为可写的时候,即由满状态变为不满状态的时候。

    (2) 当有旧数据被发送走时,即buffer中待写的内容变少得时候。

    (3) 当buffer中有可写空间(即buffer不满)且用户对相应fd进行epoll_mod OUT事件时

     

    对于LT模式则简单多了,除了上述操作为读了一条事件就绪就一直通知。

     

    ET比LT高效的原因:

    经过上面的分析,可得到LT每次都需要处理rdlist,无疑向用户拷贝的数据变多,且epoll_wait循环也变多,性能自然下降了。

    另外一方面从用户角度考虑,使用ET模式,它可以便捷的处理EPOLLOUT事件,省去打开与关闭EPOLLOUT的epoll_ctl(EPOLL_CTL_MOD)调用。从而有可能让你的性能得到一定的提升。例如你需要写出1M的数据,写出到socket 256k时,返回了EAGAIN,ET模式下,当再次epoll返回EPOLLOUT事件时,继续写出待写出的数据,当没有数据需要写出时,不处理直接略过即可。而LT模式则需要先打开EPOLLOUT,当没有数据需要写出时,再关闭EPOLLOUT(否则会一直返回EPOLLOUT事件),而调用epoll_ctl是系统调用,要陷入内核并且需要操作加锁红黑树,总体来说,ET处理EPOLLOUT方便高效些,LT不容易遗漏事件、不易产生bug,如果server的响应通常较小,不会触发EPOLLOUT,那么适合使用LT,例如redis等,这种情况下甚至不需要关注EPOLLOUT,流量足够小的时候直接发送,如果发送不完在进行关注EPOLLOUT,发送完取消关注就行了,可以进行稍微的优化。而nginx作为高性能的通用服务器,网络流量可以跑满达到1G,这种情况下很容易触发EPOLLOUT,则使用ET。

    参见知乎

     

    实际应用:

    当epoll工作在ET模式下时,对于读操作,如果read一次没有读尽buffer中的数据,那么下次将得不到读就绪的通知,造成buffer中已有的数据无机会读出,除非有新的数据再次到达。对于写操作,主要是因为ET模式下fd通常为非阻塞造成的一个问题——如何保证将用户要求写的数据写完。

    要解决上述两个ET模式下的读写问题,我们必须实现:

    a. 对于读,只要buffer中还有数据就一直读;

    b. 对于写,只要buffer还有空间且用户请求写的数据还未写完,就一直写。

     

    使用这种方式一定要使每个连接的套接字工作于非阻塞模式,因为读写需要一直读或写直到出错(对于读,当读到的实际字节数小于请求字节数时就可以停止),而如果你的文件描述符如果不是非阻塞的,那这个一直读或一直写势必会在最后一次阻塞。这样就不能在阻塞在epoll_wait上了,造成其他文件描述符的任务饿死。

    所以也就常说“ET需要工作在非阻塞模式”,当然这并不能说明ET不能工作在阻塞模式,而是工作在阻塞模式可能在运行中会出现一些问题。

     

    ET模式下的accept

        考虑这种情况:多个连接同时到达,服务器的 TCP 就绪队列瞬间积累多个就绪

    连接,由于是边缘触发模式,epoll 只会通知一次,accept 只处理一个连接,导致 TCP 就绪队列中剩下的连接都得不到处理。

         解决办法是用 while 循环抱住 accept 调用,处理完 TCP 就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢? accept  返回 -1 并且 errno 设置为 EAGAIN 就表示所有连接都处理完。

    的正确使用方式为:

    while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, (size_t *)&addrlen)) > 0) {  

        handle_client(conn_sock);  

    }  

    if (conn_sock == -1) {  

         if (errno != EAGAIN && errno != ECONNABORTED   

                && errno != EPROTO && errno != EINTR)   

            perror("accept");  

    }

    扩展:服务端使用多路转接技术(select,poll,epoll等)时,accept应工作在非阻塞模式。

    原因:如果accept工作在阻塞模式,考虑这种情况: TCP 连接被客户端夭折,即在服务器调用 accept 之前(此时select等已经返回连接到达读就绪),客户端主动发送 RST 终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在 accept 调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept 调用上(实际应该阻塞在select上),就绪队列中的其他描述符都得不到处理。

        解决办法是把监听套接口设置为非阻塞, 当客户在服务器调用 accept 之前中止

    某个连接时,accept 调用可以立即返回 -1, 这时源自 Berkeley 的实现会在内核中处理该事件,并不会将该事件通知给 epoll,而其他实现把 errno 设置为 ECONNABORTED 或者 EPROTO 错误,我们应该忽略这两个错误。(具体可参看UNP v1 p363)

     

    EPOll的ET与LT

    内核实现:

    只是在从rdlist中返回的时候有区别,内核首先会将rdlist拷贝到一个临时链表txlist, 然后如果是LT事件并且事件就绪的话fd被重新放回了rdllist。那么下次epoll_wait当然会又把rdllist里的fd拿来拷给用户了。举个例子。假设一个socket,只是connect,还没有收发数据,那么它的poll事件掩码总是有POLLOUT的,每次调用epoll_wait总是返回POLLOUT事件,因为它的fd就总是被放回rdllist;假如此时有人往这个socket里写了一大堆数据,造成socket塞住,fd不会放回rdllist,epoll_wait将不会再返回用户POLLOUT事件。如果我们给这个socket加上EPOLLET,然后connect,没有收发数据,epoll_wait只会返回一次POLLOUT通知给用户(因为此fd不会再回到rdllist了),接下来的epoll_wait都不会有任何事件通知了。

     

    注意上面LT fd拷贝回rdlist并不是向用户处理完之后发生的,而是向用户拷贝完之后直接复制到rdlist中,那么如果用户消费这个事件使事件不就绪了怎么办,比如说本来是可读的,返回给用户,用户读到不可读为止,继续调用epoll_wait 返回rdlist,则发现不可读,事实上每次返回之前会以NULL继续调用poll,判断事件是否变化,平时调用poll会传递个poll_table变量,就进行添加到等待队列中,而此时不需要添加,只是判断一下状态,如果rdlist中状态变化了,就不会给用户返回了。

     

    触发方式:

    根据对两种加入rdlist途径的分析,可以得出ET模式下被唤醒(返回就绪)的条件为:

    对于读取操作:

    (1) 当buffer由不可读状态变为可读的时候,即由空变为不空的时候。

    (2) 当有新数据到达时,即buffer中的待读内容变多的时候。

    (3) 当buffer中有数据可读(即buffer不空)且用户对相应fd进行epoll_mod IN事件时

     

    对于写操作:

    (1) 当buffer由不可写变为可写的时候,即由满状态变为不满状态的时候。

    (2) 当有旧数据被发送走时,即buffer中待写的内容变少得时候。

    (3) 当buffer中有可写空间(即buffer不满)且用户对相应fd进行epoll_mod OUT事件时

     

    对于LT模式则简单多了,除了上述操作为读了一条事件就绪就一直通知。

     

    ET比LT高效的原因:

    经过上面的分析,可得到LT每次都需要处理rdlist,无疑向用户拷贝的数据变多,且epoll_wait循环也变多,性能自然下降了。

    另外一方面从用户角度考虑,使用ET模式,它可以便捷的处理EPOLLOUT事件,省去打开与关闭EPOLLOUT的epoll_ctl(EPOLL_CTL_MOD)调用。从而有可能让你的性能得到一定的提升。例如你需要写出1M的数据,写出到socket 256k时,返回了EAGAIN,ET模式下,当再次epoll返回EPOLLOUT事件时,继续写出待写出的数据,当没有数据需要写出时,不处理直接略过即可。而LT模式则需要先打开EPOLLOUT,当没有数据需要写出时,再关闭EPOLLOUT(否则会一直返回EPOLLOUT事件),而调用epoll_ctl是系统调用,要陷入内核并且需要操作加锁红黑树,总体来说,ET处理EPOLLOUT方便高效些,LT不容易遗漏事件、不易产生bug,如果server的响应通常较小,不会触发EPOLLOUT,那么适合使用LT,例如redis等,这种情况下甚至不需要关注EPOLLOUT,流量足够小的时候直接发送,如果发送不完在进行关注EPOLLOUT,发送完取消关注就行了,可以进行稍微的优化。而nginx作为高性能的通用服务器,网络流量可以跑满达到1G,这种情况下很容易触发EPOLLOUT,则使用ET。

    参见知乎

     

    实际应用:

    当epoll工作在ET模式下时,对于读操作,如果read一次没有读尽buffer中的数据,那么下次将得不到读就绪的通知,造成buffer中已有的数据无机会读出,除非有新的数据再次到达。对于写操作,主要是因为ET模式下fd通常为非阻塞造成的一个问题——如何保证将用户要求写的数据写完。

    要解决上述两个ET模式下的读写问题,我们必须实现:

    a. 对于读,只要buffer中还有数据就一直读;

    b. 对于写,只要buffer还有空间且用户请求写的数据还未写完,就一直写。

     

    使用这种方式一定要使每个连接的套接字工作于非阻塞模式,因为读写需要一直读或写直到出错(对于读,当读到的实际字节数小于请求字节数时就可以停止),而如果你的文件描述符如果不是非阻塞的,那这个一直读或一直写势必会在最后一次阻塞。这样就不能在阻塞在epoll_wait上了,造成其他文件描述符的任务饿死。

    所以也就常说“ET需要工作在非阻塞模式”,当然这并不能说明ET不能工作在阻塞模式,而是工作在阻塞模式可能在运行中会出现一些问题。

     

    ET模式下的accept

        考虑这种情况:多个连接同时到达,服务器的 TCP 就绪队列瞬间积累多个就绪

    连接,由于是边缘触发模式,epoll 只会通知一次,accept 只处理一个连接,导致 TCP 就绪队列中剩下的连接都得不到处理。

         解决办法是用 while 循环抱住 accept 调用,处理完 TCP 就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢? accept  返回 -1 并且 errno 设置为 EAGAIN 就表示所有连接都处理完。

    的正确使用方式为:

    while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, (size_t *)&addrlen)) > 0) {  

        handle_client(conn_sock);  

    }  

    if (conn_sock == -1) {  

         if (errno != EAGAIN && errno != ECONNABORTED   

                && errno != EPROTO && errno != EINTR)   

            perror("accept");  

    }

    扩展:服务端使用多路转接技术(select,poll,epoll等)时,accept应工作在非阻塞模式。

    原因:如果accept工作在阻塞模式,考虑这种情况: TCP 连接被客户端夭折,即在服务器调用 accept 之前(此时select等已经返回连接到达读就绪),客户端主动发送 RST 终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在 accept 调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept 调用上(实际应该阻塞在select上),就绪队列中的其他描述符都得不到处理。

        解决办法是把监听套接口设置为非阻塞, 当客户在服务器调用 accept 之前中止

    某个连接时,accept 调用可以立即返回 -1, 这时源自 Berkeley 的实现会在内核中处理该事件,并不会将该事件通知给 epoll,而其他实现把 errno 设置为 ECONNABORTED 或者 EPROTO 错误,我们应该忽略这两个错误。(具体可参看UNP v1 p363)

     

    epoll的一个简单使用范例

    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <string.h>
    
    
    
    #define MAXLINE 5
    #define OPEN_MAX 100
    #define LISTENQ 20
    #define SERV_PORT 5000
    #define INFTIM 1000
    
    void setnonblocking(int sock)
    {
        int opts;
        opts=fcntl(sock,F_GETFL);
        if(opts<0)
        {
            perror("fcntl(sock,GETFL)");
            exit(1);
        }
        opts = opts|O_NONBLOCK;
        if(fcntl(sock,F_SETFL,opts)<0)
        {
            perror("fcntl(sock,SETFL,opts)");
            exit(1);
        }
    }
    
    int main(int argc, char* argv[])
    {
        int i, maxi, listenfd, connfd, sockfd,epfd,nfds, portnumber;
        ssize_t n;
        char line[MAXLINE];
        socklen_t clilen;
    
    
        if ( 2 == argc )
        {
            if( (portnumber = atoi(argv[1])) < 0 )
            {
                fprintf(stderr,"Usage:%s portnumber/a/n",argv[0]);
                return 1;
            }
        }
        else
        {
            fprintf(stderr,"Usage:%s portnumber/a/n",argv[0]);
            return 1;
        }
    
    
    
        //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
    
        struct epoll_event ev,events[20];
        //生成用于处理accept的epoll专用的文件描述符
    
        epfd=epoll_create(256);
        struct sockaddr_in clientaddr;
        struct sockaddr_in serveraddr;
        listenfd = socket(AF_INET, SOCK_STREAM, 0);
        //把socket设置为非阻塞方式
    
        //setnonblocking(listenfd);
    
        //设置与要处理的事件相关的文件描述符
    
        ev.data.fd=listenfd;
        //设置要处理的事件类型
    
        ev.events=EPOLLIN|EPOLLET;
        //ev.events=EPOLLIN;
    
        //注册epoll事件
    
        epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
        bzero(&serveraddr, sizeof(serveraddr));
        serveraddr.sin_family = AF_INET;
        char *local_addr="127.0.0.1";
        inet_aton(local_addr,&(serveraddr.sin_addr));//htons(portnumber);
    
        serveraddr.sin_port=htons(portnumber);
        bind(listenfd,(struct sockaddr *)&serveraddr, sizeof(serveraddr));
        listen(listenfd, LISTENQ);
        maxi = 0;
        for ( ; ; ) {
            //等待epoll事件的发生
    
            nfds=epoll_wait(epfd,events,20,500);
            //处理所发生的所有事件
    
            for(i=0;i<nfds;  i)
            {
                if(events[i].data.fd==listenfd)//如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。
    
                {
                    connfd = accept(listenfd,(struct sockaddr *)&clientaddr, &clilen);
                    if(connfd<0){
                        perror("connfd<0");
                        exit(1);
                    }
                    //setnonblocking(connfd);
    
                    char *str = inet_ntoa(clientaddr.sin_addr);
                    printf("accapt a connection fromn ");
                    //设置用于读操作的文件描述符
    
                    ev.data.fd=connfd;
                    //设置用于注测的读操作事件
    
                    ev.events=EPOLLIN|EPOLLET;
                    //ev.events=EPOLLIN;
    
                    //注册ev
    
                    epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
                }
                else if(events[i].events&EPOLLIN)//如果是已经连接的用户,并且收到数据,那么进行读入。
    
                {
                    printf("EPOLLINn");
                    if ( (sockfd = events[i].data.fd) < 0)
                        continue;
                    if ( (n = read(sockfd, line, MAXLINE)) < 0) {
                        if (errno == ECONNRESET) {
                            close(sockfd);
                            events[i].data.fd = -1;
                        } else
                            printf("readline errorn");
                    } else if (n == 0) {
                        close(sockfd);
                        events[i].data.fd = -1;
                    }
                    if(n<MAXLINE-2)
                        line[n] = '';
    
                    //设置用于写操作的文件描述符
    
                    ev.data.fd=sockfd;
                    //设置用于注测的写操作事件
    
                    ev.events=EPOLLOUT|EPOLLET;
                    //修改sockfd上要处理的事件为EPOLLOUT
    
                    //epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
    
                }
                else if(events[i].events&EPOLLOUT) // 如果有数据发送
    
                {
                    sockfd = events[i].data.fd;
                    write(sockfd, line, n);
                    //设置用于读操作的文件描述符
    
                    ev.data.fd=sockfd;
                    //设置用于注测的读操作事件
    
                    ev.events=EPOLLIN|EPOLLET;
                    //修改sockfd上要处理的事件为EPOLIN
    
                    epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
                }
            }
        }
        return 0;
    }
    

    EPOLlONSHOT

           在一些监听事件和读取分开的场景中,比如说在主线程中监听,在子线程中接收数据并处理,这时候会出现两个线程同时操作一个socket的局面,比如说主线程监听到事件交由线程1处理,还未处理完又有事件到达,主线程交由线程2处理,这就导致数据不一致,一般情况下需要在该文件描述符上注册EPOLLONESHOT事件,操作系统最多触发其上注册的一个可读可写或异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该EPOLLONESHOT事件。反过来思考也一样,注册了该事件的线程处理完数据后必须重新注册,否则下次不会再次触发。参见《linux高性能服务器编程》9.3.4节

                  但是有一个缺陷,这样的话会每次都调用epoll_ctrl陷入内核,并且epoll为保证线程安全会使用了加锁红黑树,这样会严重影响性能,此时就需要换一种思路,在应用层维护一个原子整数或称为flag来记录当前句柄是否有线程在处理,每次有事件到来得时候会检查这个原子整数,如果在处理就不会分配线程处理,否则会分配线程,这样就避免了陷入内核,使用epoll_data来存储这个原子整数就行。

           对于使用EPOLLSHOT方式来防止数据不一致既可以使用ET也可以使用LT,因为他防止了再次触发,但是使用原子整数的方式只能使用ET模式,他不是防止再次触发,而是防止被多个线程处理,在有些情况下可能计算的速度跟不上io涌来的速度,就是无法及时接收缓冲区的内容,此时接收线程和主线程是分开的,如果使用LT的话主线程会一直触发事件,导致busy-loop。 而使用ET触发只有在事件到来得时候会触发,缓冲区有内容并不会触发,触发的次数就变少了,虽然主线程还是可能空转(fd有事件到来,但已被线程处理,此时不需要处理,继续epoll_wait就好),但这样空转比屡次调用epoll_ctl的概率小多了。

        上面的解决方式貌似完美,其实存在竞态的情况,如果线程1检查flag为false,没有线程处理这个socket,准备去接收处理的时候被调出CPU了,线程2获得cput后也同样发现flag为false, 就去接手socket来处理,此时如果线程1继续获得CPU,就会继续执行,接管socket,这样就会产生一个socket被两个线程处理的情况。

     

    EPOLlONSHOT

           在一些监听事件和读取分开的场景中,比如说在主线程中监听,在子线程中接收数据并处理,这时候会出现两个线程同时操作一个socket的局面,比如说主线程监听到事件交由线程1处理,还未处理完又有事件到达,主线程交由线程2处理,这就导致数据不一致,一般情况下需要在该文件描述符上注册EPOLLONESHOT事件,操作系统最多触发其上注册的一个可读可写或异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该EPOLLONESHOT事件。反过来思考也一样,注册了该事件的线程处理完数据后必须重新注册,否则下次不会再次触发。参见《linux高性能服务器编程》9.3.4节

                  但是有一个缺陷,这样的话会每次都调用epoll_ctrl陷入内核,并且epoll为保证线程安全会使用了加锁红黑树,这样会严重影响性能,此时就需要换一种思路,在应用层维护一个原子整数或称为flag来记录当前句柄是否有线程在处理,每次有事件到来得时候会检查这个原子整数,如果在处理就不会分配线程处理,否则会分配线程,这样就避免了陷入内核,使用epoll_data来存储这个原子整数就行。

           对于使用EPOLLSHOT方式来防止数据不一致既可以使用ET也可以使用LT,因为他防止了再次触发,但是使用原子整数的方式只能使用ET模式,他不是防止再次触发,而是防止被多个线程处理,在有些情况下可能计算的速度跟不上io涌来的速度,就是无法及时接收缓冲区的内容,此时接收线程和主线程是分开的,如果使用LT的话主线程会一直触发事件,导致busy-loop。 而使用ET触发只有在事件到来得时候会触发,缓冲区有内容并不会触发,触发的次数就变少了,虽然主线程还是可能空转(fd有事件到来,但已被线程处理,此时不需要处理,继续epoll_wait就好),但这样空转比屡次调用epoll_ctl的概率小多了。

        上面的解决方式貌似完美,其实存在竞态的情况,如果线程1检查flag为false,没有线程处理这个socket,准备去接收处理的时候被调出CPU了,线程2获得cput后也同样发现flag为false, 就去接手socket来处理,此时如果线程1继续获得CPU,就会继续执行,接管socket,这样就会产生一个socket被两个线程处理的情况。

     

    带ET和LT双模式的epoll服务器

    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <unistd.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    #include <errno.h>
    #include <stdbool.h>
    
    
    #define MAX_EVENT_NUMBER 1024  //event的最大数量
    #define BUFFER_SIZE 10      //缓冲区大小
    #define ENABLE_ET  1       //是否启用ET模式
    
    /* 将文件描述符设置为非拥塞的  */
    int SetNonblocking(int fd)
    {
        int old_option = fcntl(fd, F_GETFL);
        int new_option = old_option | O_NONBLOCK;
        fcntl(fd, F_SETFL, new_option);
        return old_option;
    }
    
    /* 将文件描述符fd上的EPOLLIN注册到epoll_fd指示的epoll内核事件表中,参数enable_et指定是否对fd启用et模式 */
    void AddFd(int epoll_fd, int fd, bool enable_et)
    {
        struct epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN; //注册该fd是可读的
        if(enable_et)
        {
            event.events |= EPOLLET;
        }
    
        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);  //向epoll内核事件表注册该fd
        SetNonblocking(fd);
    }
    
    /*  LT工作模式特点:稳健但效率低 */
    void lt_process(struct epoll_event* events, int number, int epoll_fd, int listen_fd)
    {
        char buf[BUFFER_SIZE];
        int i;
        for(i = 0; i < number; i  ) //number: 就绪的事件数目
        {
            int sockfd = events[i].data.fd;
            if(sockfd == listen_fd)  //如果是listen的文件描述符,表明有新的客户连接到来
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listen_fd, (struct sockaddr*)&client_address, &client_addrlength);
                AddFd(epoll_fd, connfd, false);  //将新的客户连接fd注册到epoll事件表,使用lt模式
            }
            else if(events[i].events & EPOLLIN) //有客户端数据可读
            {
                // 只要缓冲区的数据还没读完,这段代码就会被触发。这就是LT模式的特点:反复通知,直至处理完成
                printf("lt mode: event trigger once!n");
                memset(buf, 0, BUFFER_SIZE);
                int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
                if(ret <= 0)  //读完数据了,记得关闭fd
                {
                    close(sockfd);
                    continue;
                }
                printf("get %d bytes of content: %sn", ret, buf);
    
            }
            else
            {
                printf("something unexpected happened!n");
            }
        }
    }
    
    /* ET工作模式特点:高效但潜在危险 */
    void et_process(struct epoll_event* events, int number, int epoll_fd, int listen_fd)
    {
        char buf[BUFFER_SIZE];
        int i;
        for(i = 0; i < number; i  )
        {
            int sockfd = events[i].data.fd;
            if(sockfd == listen_fd)
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listen_fd, (struct sockaddr*)&client_address, &client_addrlength);
                AddFd(epoll_fd, connfd, true);  //使用et模式
            }
            else if(events[i].events & EPOLLIN)
            {
                /* 这段代码不会被重复触发,所以我么循环读取数据,以确保把socket读缓存的所有数据读出。这就是我们消除ET模式潜在危险的手段 */
    
                printf("et mode: event trigger once!n");
                while(1)
                {
                    memset(buf, 0, BUFFER_SIZE);
                    int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
                    if(ret < 0)
                    {
                        /* 对于非拥塞的IO,下面的条件成立表示数据已经全部读取完毕,此后epoll就能再次触发sockfd上的EPOLLIN事件,以驱动下一次读操作 */
    
                        if(errno == EAGAIN || errno == EWOULDBLOCK)
                        {
                            printf("read later!n");
                            break;
                        }
    
                        close(sockfd);
                        break;
                    }
                    else if(ret == 0)
                    {
                        close(sockfd);
                    }
                    else //没读完,继续循环读取
                    {
                        printf("get %d bytes of content: %sn", ret, buf);
                    }
                }
            }
            else
            {
                printf("something unexpected happened!n");
            }
        }
    }
    
    
    int main(int argc, char* argv[])
    {
        if(argc <= 2)
        {
            printf("usage:  ip_address   port_numbern");
            return -1;
        }
    
        const char* ip = argv[1];
        int port = atoi(argv[2]);
    
        int ret = -1;
        struct sockaddr_in address;
        bzero(&address, sizeof(address));
        address.sin_family = AF_INET;
        inet_pton(AF_INET, ip, &address.sin_addr);
        address.sin_port = htons(port);
    
        int listen_fd = socket(PF_INET, SOCK_STREAM, 0);
        if(listen_fd < 0)
        {
            printf("fail to create socket!n");
            return -1;
        }
    
        ret = bind(listen_fd, (struct sockaddr*)&address, sizeof(address));
        if(ret == -1)
        {
            printf("fail to bind socket!n");
            return -1;
        }
    
        ret = listen(listen_fd, 5);
        if(ret == -1)
        {
            printf("fail to listen socket!n");
            return -1;
        }
    
        struct epoll_event events[MAX_EVENT_NUMBER];
        int epoll_fd = epoll_create(5);  //事件表大小为5
        if(epoll_fd == -1)
        {
            printf("fail to create epoll!n");
            return -1;
        }
    
        AddFd(epoll_fd, listen_fd, true); //使用ET模式epoll,将listen文件描述符加入事件表
    
        while(1)
        {
            int ret = epoll_wait(epoll_fd, events, MAX_EVENT_NUMBER, -1);
            if(ret < 0)
            {
                printf("epoll failure!n");
                break;
            }
    
            if(ENABLE_ET)
            {
                et_process(events, ret, epoll_fd, listen_fd);
            }
            else
            {
                lt_process(events, ret, epoll_fd, listen_fd);  
            }
    
        }
    
        close(listen_fd);
        return 0;
    
    }
    

    然后再写一个简单的TCP客户端来测试一下:

    //客户端
    #include <sys/types.h> 
    #include <sys/socket.h> 
    #include <stdio.h> 
    #include <netinet/in.h> 
    #include <arpa/inet.h> 
    #include <unistd.h> 
    #include <stdlib.h>
    #include <sys/time.h>
    
    int main() 
    { 
        int client_sockfd; 
        int len; 
        struct sockaddr_in address;//服务器端网络地址结构体 
         int result; 
        char str1[] = "ABCDE"; 
        char str2[] = "ABCDEFGHIJK"; 
        client_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立客户端socket 
        address.sin_family = AF_INET; 
        address.sin_addr.s_addr = inet_addr("127.0.0.1");
        address.sin_port = htons(8888); 
        len = sizeof(address); 
        result = connect(client_sockfd, (struct sockaddr *)&address, len); 
        if(result == -1) 
        { 
             perror("oops: client2"); 
             exit(1); 
        } 
        //第一次读写
        write(client_sockfd, str1, sizeof(str1)); 
    
        sleep(5);
    
        //第二次读写
        write(client_sockfd, str2, sizeof(str2)); 
    
    
        close(client_sockfd); 
    
        return 0; 
    }
    

    TCP客户端的动作是这样的:第一次先发送字符串"ABCDE"过去服务器端,5秒后,再发字符串"ABCDEFGHIJK"过去服务端,我们观察一下ET模式的服务器和LT模式的服务器在读取数据的方式上到底有什么区别。

    ET模式

    图片 2

    ET模式现象分析:我们的服务器读缓冲区大小我们设置了10。第一次接受字符串时,我们的缓冲区有足够的空间接受它,所以打印出内容"ABCDE"并且打印出"read later"表示数据已经读完了。第二次接收字符串时,我们的缓冲区空间不足以接收所有的字符,所以分了两次接收。但是总触发次数仅为2次。

    LT模式

    图片 3

    LT模式现象分析:
    同理,第一次接受字符串有足够的空间接受,第二次接收字符串缓冲区空间不足,所以第二次接收时分了两次来接受。同时也注意到,只要你没有完全接收完上次的数据,内核就会继续通知你去接收数据!所以事件触发的次数是3次。

    epoll的误区

    epoll的误区

    EPOLLONESHOT事件

    即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次,这在并发程序中就会引发一些问题。比如一个县城在读取完某个socket上的数据后开始处理这些数据,而在数据的出来过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另一个县城被唤醒来读取这些新数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们所期望的,我们期望的是一个socket连接在任一时刻都只被一个线程处理。这一点可以使用EPOLLONESHOT事件实现。

    对于注册了EPOLLONSHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket。

    下面是一个使用了EPOLLONESHOT的epoll服务器

    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    #include <stdbool.h>
    
    #define MAX_EVENT_NUMBER 1024
    #define BUFFER_SIZE 10
    
    struct fds
    {
        int epollfd;
        int sockfd;
    };
    
    int SetNonblocking(int fd)
    {
        int old_option = fcntl(fd, F_GETFL);
        int new_option = old_option | O_NONBLOCK;
        fcntl(fd, F_SETFL, new_option);
        return old_option;
    }
    
    void AddFd(int epollfd, int fd, bool oneshot)
    {
        struct epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN | EPOLLET;
        if(oneshot)
        {
            event.events |= EPOLLONESHOT;
        }
    
        epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
        SetNonblocking(fd);
    }
    
    /*重置fd上的事件,这操作以后,尽管fd上的EPOLLONESHOT事件被注册,但是操作系统仍然会触发fd上的EPOLLIN事件,且只触发一次*/
    void reset_oneshot(int epollfd, int fd)
    {
        struct epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
        epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
    }
    
    /*工作线程*/
    void* worker(void* arg)
    {
        int sockfd = ((struct fds*)arg)->sockfd;
        int epollfd = ((struct fds*)arg)->epollfd;
        printf("start new thread to receive data on fd: %dn", sockfd);
        char buf[BUFFER_SIZE];
        memset(buf, 0, BUFFER_SIZE);
    
        while(1)
        {
            int ret = recv(sockfd, buf,BUFFER_SIZE-1, 0);
            if(ret == 0)
            {
                close(sockfd);
                printf("foreigner closed the connectionn");
                break;
            }
            else if(ret < 0)
            {
                if(errno = EAGAIN)
                {
                    reset_oneshot(epollfd, sockfd);
                    printf("read latern");
                    break;
                }
            }
            else
            {
                printf("get content: %sn", buf);
                //休眠5秒,模拟数据处理过程
                printf("worker working...n");
                sleep(5);
            }
        }
        printf("end thread receiving data on fd: %dn", sockfd);
    }
    
    int main(int argc, char* argv[])
    {
        if(argc <= 2)
        {
            printf("usage: ip_address   port_numbern");
            return -1;
        }
    
        const char* ip = argv[1];
        int port = atoi(argv[2]);
    
        int ret = -1;
        struct sockaddr_in address;
        bzero(&address, sizeof(address));
        address.sin_family = AF_INET;
        inet_pton(AF_INET, ip, &address.sin_addr);
        address.sin_port = htons(port);
    
        int listenfd = socket(PF_INET, SOCK_STREAM, 0);
        if(listenfd < 0)
        {
            printf("fail to create socket!n");
            return -1;
        }
    
        ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
        if(ret == -1)
        {
            printf("fail to bind socket!n");
            return -1;
        }
    
        ret = listen(listenfd, 5);
        if(ret == -1)
        {
            printf("fail to listen socketn");
            return -1;
        }
    
        struct epoll_event events[MAX_EVENT_NUMBER];
        int epollfd = epoll_create(5);
        if(epollfd == -1)
        {
            printf("fail to create epolln");
            return -1;
        }
    
        //注意,监听socket listenfd上是不能注册EPOLLONESHOT事件的,否则应用程序只能处理一个客户连接!因为后续的客户连接请求将不再触发listenfd的EPOLLIN事件
        AddFd(epollfd, listenfd, false);
    
    
        while(1)
        {
            int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);  //永久等待
            if(ret < 0)
            {
                printf("epoll failure!n");
                break;
            }
    
            int i;
            for(i = 0; i < ret; i  )
            {
                int sockfd = events[i].data.fd;
                if(sockfd == listenfd)
                {
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof(client_address);
                    int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                    //对每个非监听文件描述符都注册EPOLLONESHOT事件
                    AddFd(epollfd, connfd, true);
                }
                else if(events[i].events & EPOLLIN)
                {
                    pthread_t thread;
                    struct fds fds_for_new_worker;
                    fds_for_new_worker.epollfd = epollfd;
                    fds_for_new_worker.sockfd = events[i].data.fd;
                    /*新启动一个工作线程为sockfd服务*/
                    pthread_create(&thread, NULL, worker, &fds_for_new_worker);
    
                }
                else
                {
                    printf("something unexpected happened!n");
                }
            }
        }
    
        close(listenfd);
    
        return 0;
    }
    

    图片 4

    EPOLLONESHOT模式现象分析:我们继续使用上面的TCP客户端来测试,需要修改一下客户端的sleep时间改为3秒。工作流程就是:客户端第一次发送数据时服务器的接收缓冲区是有足够空间的,然后服务器的工作线程进入5秒的处理数据阶段;3秒后客户端继续发送新数据过来,但是工作线程还在处理数据,没办法立即接收新的数据。2秒后,客户端该线程数据处理完了,开始接收新的数据。可以观察到,我们客户端只使用了同一个线程去处理同一个客户端的请求,符合预期。

    1.  epoll ET模式只支持非阻塞句柄?

    其实也支持阻塞句柄,只不过根据应用的使用场景,一般只适合非阻塞使用,参见上文“EPOLL ET与LT的实际应用”

     2.  epoll的共享内存?

    epoll相对于select高效是因为从内核拷贝就绪文件描述符的时候用了共享内存? 这是不对的,实现的时候只是用了使用了copy_from_user跟__put_user进行内核跟用户虚拟空间数据交互,并没有共享内存的api。

    1.  epoll ET模式只支持非阻塞句柄?

    其实也支持阻塞句柄,只不过根据应用的使用场景,一般只适合非阻塞使用,参见上文“EPOLL ET与LT的实际应用”

     2.  epoll的共享内存?

    epoll相对于select高效是因为从内核拷贝就绪文件描述符的时候用了共享内存? 这是不对的,实现的时候只是用了使用了copy_from_user跟__put_user进行内核跟用户虚拟空间数据交互,并没有共享内存的api。

    问题集锦

    问题集锦

    epoll需要再次op->poll的原因

    因为等待队列中的有事件后会唤醒所有的进程,可能有的进程位于对头把事件消费后就直接删除了这个事件,后面的进程唤醒后可能再没有事件消费了,所以需要再次判断poll,如果事件还在则加入rdlist中。当然消费完事件后不一定会删除,等待队列中可以通过flag选项设置消费的方式。

     

    epoll每次都将txlist中的LT事件不等用户消费就直接返回给rdlist,那么在用户消费了该事件后,导致事件不就绪,再次调用epoll_wait,epoll_wait还会返回rdlist吗?

    不会再次返回,因为在返回就绪列表之前会还调用一次revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) 来判断事件,如果事件发生了变化,就不在返回。

     

    内核的等待队列:

    内核为了支持对设备的阻塞访问,就需要设计一个等待队列,等待队列中是一个个进程,当设备事件就绪后会唤醒等待队列中的进程来消费事件。但是在使用select监听非阻塞的句柄时候,这个队列不是用来实现非阻塞,而是实现状态的等待,即等待某个可读可写事件发生后通知监听的进程

     

    内核的 poll技术就是为了poll/select设计的?

    每个设备的驱动为了支持操作系统虚拟文件系统对其的使用需要提供一系列函数,比如说read,write等,其中poll就是其中一个函数,为了select,poll实现,用来查询设备是否可读或可写,或是否处于某种特殊状态。

     

    eventPoll的两个队列

    evnetpoll中有两个等待队列,

    wait_queue_head_t wq;

    wait_queue_head_t poll_wait;

    前者用于调用epoll_wait()时, 我们就是"睡"在了这个等待队列上...

    后者用于这个用于epollfd本事被poll的时候... 也就是说epollfd被其他epoll监视,调用其file->poll() 时。

    对于本epoll监视的句柄有消息的时候会向wq消息队列进行wakeup,同时对于poll_wait也会进行wakeup

     

    eventPollfs实现的file opetion

    只实现了poll和realse,由于epoll自身也是文件系统,其描述符也可以被poll/select/epoll监视,因此需要实现poll方法,具体就是ep_eventpoll_poll方法,他内部实现是将监听当前epollfd的线程插入到自己的poll_wait队列中,判断自己接听的句柄是否有事件发生,如果有的话需要将信息返回给监听epollfd的epoll_wait, 具体方法是然后扫描就绪的文件列表, 调用每个文件上的poll 检测是否真的就绪, 然后复制到用户空间,但是文件列表中有可能有epoll文件, 调用poll的时候有可能会产生递归,  所以用ep_call_nested 包装一下, 防止死循环和过深的调用。具体参见问题递归深度检测(ep_call_nested)

     

    epoll的线程安全问题

    当一个线程阻塞在epoll_wait()上的时候,其他线程向其中添加新的文件描述符是没问题的,如果这个文件描述符就绪的话,阻塞线程的epoll_wait()会被唤醒。但是如果正在监听的某文件描述符被其他线程关闭的话详表现是未定义的。在有些 UNIX系统下,select会解除阻塞返回,而文件描述符会被认为就绪,然而对这个文件描述符进行IO操作会失败(除非这个文件描述符又被分配了),在Linux下,另一个线程关闭文件描述符没有任何影响。但不管怎样,应当尽量壁面一个线程关闭另一个线程在监听的文件描述符。

     

    递归深度检测(ep_call_nested)

    epoll本身也是文件,也可以被poll/select/epoll监视,如果epoll之间互相监视就有可能导致死循环。epoll的实现中,所有可能产生递归调用的函数都由函数ep_call_nested进行包裹,递归调用过程中出现死循环或递归过深就会打破死循环和递归调用直接返回。该函数的实现依赖于一个外部的全局链表nested_call_node(不同的函数调用使用不同的节点),每次调用可能发生递归的函数(nproc)就向链表中添加一个包含当前函数调用上下文ctx(进程,CPU,或epoll文件)和处理的对象标识cookie的节点,通过检测是否有相同的节点就可以知道是否发生了死循环,检查链表中同一上下文包含的节点个数就可以知道递归的深度。参见参考2。

     

    为什么需要创建一个文件系统:

    一是可以在内核维护一些信息,这些信息在多次epoll_wait之间是保持的(保存的是eventpoll结构)第二点是epoll本身也可以被poll/epoll

     

    两个回调函数

    Epoll向等待队列有两个函数交互,分别是调用对应设备的poll函数,在poll函数中调用ep_ptable_queue_proc函数,将当前进程插入到等待队列,指定ep_poll_callback为唤醒时的回调函数。Ep_poll_callback实现将当前的句柄复制到rdlist并wakeup,eventpoll的wq等待队列。

    epoll需要再次op->poll的原因

    因为等待队列中的有事件后会唤醒所有的进程,可能有的进程位于对头把事件消费后就直接删除了这个事件,后面的进程唤醒后可能再没有事件消费了,所以需要再次判断poll,如果事件还在则加入rdlist中。当然消费完事件后不一定会删除,等待队列中可以通过flag选项设置消费的方式。

     

    epoll每次都将txlist中的LT事件不等用户消费就直接返回给rdlist,那么在用户消费了该事件后,导致事件不就绪,再次调用epoll_wait,epoll_wait还会返回rdlist吗?

    不会再次返回,因为在返回就绪列表之前会还调用一次revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) 来判断事件,如果事件发生了变化,就不在返回。

     

    内核的等待队列:

    内核为了支持对设备的阻塞访问,就需要设计一个等待队列,等待队列中是一个个进程,当设备事件就绪后会唤醒等待队列中的进程来消费事件。但是在使用select监听非阻塞的句柄时候,这个队列不是用来实现非阻塞,而是实现状态的等待,即等待某个可读可写事件发生后通知监听的进程

     

    内核的 poll技术就是为了poll/select设计的?

    每个设备的驱动为了支持操作系统虚拟文件系统对其的使用需要提供一系列函数,比如说read,write等,其中poll就是其中一个函数,为了select,poll实现,用来查询设备是否可读或可写,或是否处于某种特殊状态。

     

    eventPoll的两个队列

    evnetpoll中有两个等待队列,

    wait_queue_head_t wq;

    wait_queue_head_t poll_wait;

    前者用于调用epoll_wait()时, 我们就是"睡"在了这个等待队列上...

    后者用于这个用于epollfd本事被poll的时候... 也就是说epollfd被其他epoll监视,调用其file->poll() 时。

    对于本epoll监视的句柄有消息的时候会向wq消息队列进行wakeup,同时对于poll_wait也会进行wakeup

     

    eventPollfs实现的file opetion

    只实现了poll和realse,由于epoll自身也是文件系统,其描述符也可以被poll/select/epoll监视,因此需要实现poll方法,具体就是ep_eventpoll_poll方法,他内部实现是将监听当前epollfd的线程插入到自己的poll_wait队列中,判断自己接听的句柄是否有事件发生,如果有的话需要将信息返回给监听epollfd的epoll_wait, 具体方法是然后扫描就绪的文件列表, 调用每个文件上的poll 检测是否真的就绪, 然后复制到用户空间,但是文件列表中有可能有epoll文件, 调用poll的时候有可能会产生递归,  所以用ep_call_nested 包装一下, 防止死循环和过深的调用。具体参见问题递归深度检测(ep_call_nested)

     

    epoll的线程安全问题

    当一个线程阻塞在epoll_wait()上的时候,其他线程向其中添加新的文件描述符是没问题的,如果这个文件描述符就绪的话,阻塞线程的epoll_wait()会被唤醒。但是如果正在监听的某文件描述符被其他线程关闭的话详表现是未定义的。在有些 UNIX系统下,select会解除阻塞返回,而文件描述符会被认为就绪,然而对这个文件描述符进行IO操作会失败(除非这个文件描述符又被分配了),在Linux下,另一个线程关闭文件描述符没有任何影响。但不管怎样,应当尽量壁面一个线程关闭另一个线程在监听的文件描述符。

     

    递归深度检测(ep_call_nested)

    epoll本身也是文件,也可以被poll/select/epoll监视,如果epoll之间互相监视就有可能导致死循环。epoll的实现中,所有可能产生递归调用的函数都由函数ep_call_nested进行包裹,递归调用过程中出现死循环或递归过深就会打破死循环和递归调用直接返回。该函数的实现依赖于一个外部的全局链表nested_call_node(不同的函数调用使用不同的节点),每次调用可能发生递归的函数(nproc)就向链表中添加一个包含当前函数调用上下文ctx(进程,CPU,或epoll文件)和处理的对象标识cookie的节点,通过检测是否有相同的节点就可以知道是否发生了死循环,检查链表中同一上下文包含的节点个数就可以知道递归的深度。参见参考2。

     

    为什么需要创建一个文件系统:

    一是可以在内核维护一些信息,这些信息在多次epoll_wait之间是保持的(保存的是eventpoll结构)第二点是epoll本身也可以被poll/epoll

     

    两个回调函数

    Epoll向等待队列有两个函数交互,分别是调用对应设备的poll函数,在poll函数中调用ep_ptable_queue_proc函数,将当前进程插入到等待队列,指定ep_poll_callback为唤醒时的回调函数。Ep_poll_callback实现将当前的句柄复制到rdlist并wakeup,eventpoll的wq等待队列。

    参考文献:

     

      

    讲解了内核阻塞与非阻塞和poll机制, 并分析了select的实现方式

    讲解poll的实现,也为下篇博客做铺垫

    Epoll的实现,是我的入门博客

    很全面,很系统,讲解了poll机制和select/poll/epoll实现

    讲解poll机制,相关性不大,但是对内核的等待队列理解有帮助

    牛客网注释

    应用封神之系列!哈哈

    2018-6-3 append: 

    select / poll / epoll: practical difference for system architects :   

     

    参考文献:

     

    讲解了内核阻塞与非阻塞和poll机制, 并分析了select的实现方式

    讲解poll的实现,也为下篇博客做铺垫

    Epoll的实现,是我的入门博客

    很全面,很系统,讲解了poll机制和select/poll/epoll实现

    讲解poll机制,相关性不大,但是对内核的等待队列理解有帮助

    牛客网注释

    应用封神之系列!哈哈

     

    本文由新葡亰496net发布于电脑系统,转载请注明出处:网络编程,从实现到应用

    关键词:

上一篇:相识相知发行版,linux系统初识

下一篇:没有了