jason--liu/Blog

Linux epoll原理及内核实现

Opened this issue · 0 comments

epoll技术简介

epoll是一种典型的I/O多路复用技术,epoll技术的最大特点是支持高并发。传统多路复用技术select,poll当并发量达到一两千的时候性能就会下降,而epoll能支持上万的并发连接,并且依然能快速稳定的响应。当然,并发连接每增加一个,必定要消耗一定的内存去保存这个连接相关的数据。

epoll_create()函数

int epoll_create(int size)

功能:创建一个epoll对象,返回该对象的描述符(文件描述符),这个描述符就代表这个epoll对象,后续会用到。从内核代码可以看出来,这个size貌似只需要大于0即可。

SYSCALL_DEFINE1(epoll_create, int, size)
{
	if (size <= 0)
		return -EINVAL;

	return sys_epoll_create1(0);
}

再看看epoll_create1的实现

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    //省略部分代码
    struct eventpoll *ep = NULL;
    ...
    error = ep_alloc(&ep);
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
	if (fd < 0) {
		error = fd;
		goto out_free_ep;
	}
	file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
				 O_RDWR | (flags & O_CLOEXEC));
	if (IS_ERR(file)) {
		error = PTR_ERR(file);
		goto out_free_fd;
	}
	fd_install(fd, file);
	ep->file = file;
	return fd;
}

首先分配一个eventpoll结构并初始化,这个结构有两个非常重要的数据结构struct list_head rdlliststruct rb_root rbr
rdllist指向一个双向链表,用来指向已经准备好(有数据)的文件描述符。
rbr指向一棵红黑树的根,
像下面这个图这样,
image
接着,找到一个没有被使用的文件描述符fd,并设置file结构,然后再将fdfile绑定起来。
总结:epoll_create()函数主要是创建一个eventpoll的结构,并把fd返回给用户空间。

epoll_ctl()函数

int epoll_ctl(int efpd,int op,int sockid,struct epoll_event *event);

功能:把一个socket以及这个socket相关的事件添加到这个epoll对象描述符中去,目的就是通过这个epoll对象来监视这个socke上数据的来往情况,当有数据来往时,系统会通知我们。
op:动作,添加/删除/修改 ,对应数字是1,2,3, 对应宏是EPOLL_CTL_ADD, EPOLL_CTL_DEL ,EPOLL_CTL_MOD。后面我们会分析每个动作内核具体做了什么。
sockid:表示某个文件句柄,这个是红黑树里边的key。
event:事件信息,EPOLL_CTL_ADD和EPOLL_CTL_MOD都要用到这个event参数里边的事件信息;
下面来看看当用户调用'epoll_ctl'函数时,内核做了哪些事情。

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
   ...
   /* Get the "struct file *" for the eventpoll file */
    file = fget(epfd);
  //监听的目标句柄的file_operatons结构中必须实现了poll函数
    tfile = fget(fd);
    if (!tfile->f_op || !tfile->f_op->poll)
	goto error_tgt_fput;
   // 如果将epoll自己的句柄加入进来会直接返回EINVAL错误
   if (file == tfile || !is_file_epoll(file))
		goto error_tgt_fput;
   //  下面会判断是不是会进入一个死循环,这里没看明白,略过...
   if (op == EPOLL_CTL_ADD) {
		if (is_file_epoll(tfile)) {
			error = -ELOOP;
			if (ep_loop_check(ep, tfile) != 0)
				goto error_tgt_fput;
		} else
			list_add(&tfile->f_tfile_llink, &tfile_check_list);
	}
   // 接下来根据不同的op来操作,下面分开讲解
   ....
   return error;
}

EPOLL_CTL_ADD、EPOLL_CTL_DEL、EPOLL_CTL_MOD实现

// 通过传入的fd在红黑树中找到epitem结构
// 这个结构主要指向已经准备好(有数据)的rbn红黑树和rdllink链表
epi = ep_find(ep, tfile, fd);

image
epitem结构设计的高明之处:既能够作为红黑树中的节点,又能够作为双向链表中的节点。
红黑树这种数据结构可以简单的理解为通过key来找value,且它的查找速度特别快,内核里面很多地方都用到了这种数据结构。下面看看如何在红黑树中查找epitem结构

static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd)
{
for (rbp = ep->rbr.rb_node; rbp; ) {
    epi = rb_entry(rbp, struct epitem, rbn);
	kcmp = ep_cmp_ffd(&ffd, &epi->ffd);
	    if (kcmp > 0)
		rbp = rbp->rb_right;
	    else if (kcmp < 0)
		rbp = rbp->rb_left;
	    else {
		epir = epi;
		break;
		}
	}
}
/* Compare RB tree keys */
static inline int ep_cmp_ffd(struct epoll_filefd *p1,
			     struct epoll_filefd *p2)
{
	return (p1->file > p2->file ? +1:
	        (p1->file < p2->file ? -1 : p1->fd - p2->fd));
}

下面看具体的增删改操作

case EPOLL_CTL_ADD:
    if (!epi) {
        epds.events |= POLLERR | POLLHUP;
        // 如果没有找到eip就将fd插入红黑树
	    error = ep_insert(ep, &epds, tfile, fd);
    else
        // 否则当前节点已经存在,返回错误
        error = -EEXIST;

ep_insert函数首先会判断epoll是否超过了系统监听的最多数目max_user_watches。
这个值可以通过cat /proc/sys/fs/epoll/max_user_watches获取,再我的系统上是1646305,当然这个值是可以改的,后面有机会再说。

int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
// 判断是否超过系统能监听的最大值
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
	return -ENOSPC;
接下来是初始化`epitem`结构/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
// 下面很重要,也很复制,还没怎么看明白,以后在分析。。。
/* Initialize the poll table using the queue callback */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    case EPOLL_CTL_DEL:
    // 如果是删除操作,直接从红黑树中删除节点
    error = ep_remove(ep, epi);
    case EPOLL_CTL_MOD:
	if (epi) {
	    epds.events |= POLLERR | POLLHUP;
            // 如果是MOD,就相关相关节点的event数据
	    error = ep_modify(ep, epi, &epds);

总结一下:
EPOLL_CTL_ADD:等价于往红黑树中增加节点
EPOLL_CTL_DEL:等价于从红黑树中删除节点
EPOLL_CTL_MOD:等价于修改已有的红黑树的节点

epoll_wait()函数

那么当事件发生时,我们如何拿到系统的通知呢?这就需要epoll_wait函数了。

int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout);

功能:阻塞一小段时间并等待事件发生,返回事件集合,也就是获取内核的事件通知。
epfd:是epoll_create()返回的epoll对象描述符;
参数events:是内存,也是数组,长度 是maxevents,表示此次epoll_wait调用可以收集到的maxevents个已经准备好的读写事件;

// 首先仍然是一些判断
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
		return -EINVAL;

/* Verify that the area passed by the user is writeable */
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
		error = -EFAULT;
		goto error_return;
	}
// 然后进入最关键的ep_poll
/* Time to fish for events ... */
	error = ep_poll(ep, events, maxevents, timeout);

而ep_poll最关键的是遍历链表

if (ep_events_available(ep) || timed_out)
..
static inline int ep_events_available(struct eventpoll *ep)
{
    // 查看链表中是否有可以读的事件,有就返回
    return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
}

epoll_wait主要是遍历这个双向链表,把这个双向链表里边的节点数据拷贝出去,拷贝完毕的就从双向链表里移除。

参考资料

https://www.cnblogs.com/sduzh/p/6714281.html
https://idndx.com/2014/09/01/the-implementation-of-epoll-1/
https://idndx.com/2014/09/02/the-implementation-of-epoll-2/