이 글에서는 먼저 epoll 모델의 구현 원리를 설명한 다음 nginx가 epoll 모델을 기반으로 이벤트 중심 모델을 구현하는 방법을 소스 코드 수준에서 설명합니다.
epoll은 이벤트 중심 모델이며, 이는 nginx가 클라이언트 요청을 효율적으로 처리할 수 있는 중요한 이유 중 하나입니다. 프로세스 관점에서 epoll 모델의 사용은 주로 epoll 핸들 생성, 수신 파일 설명자 추가 및 대기 이벤트 트리거의 세 단계로 나뉩니다. 이 기사에서는 nginx가 클라이언트 요청의 효율적인 처리를 구현하는 방법을 소개합니다. 이 세 단계를 기반으로 합니다.
관련 추천: "Nginx Tutorial"
1. epoll 모델 소개
nginx의 구현 원리를 소개하기 전에 먼저 epoll 모델의 기본 사용법을 소개해야 합니다. epoll을 사용하는 세 가지 주요 방법이 있습니다:
// 创建epoll句柄 int epoll_create(int size); // 往epoll句柄中添加需要进行监听的文件描述符 int epoll_ctl(int epfd,int op,int fd,struct epoll_event* event); // 等待需要监听的文件描述符上对应的事件的发生 int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);
먼저 epoll_create() 메서드를 호출하여 epoll 인스턴스에 대한 핸들을 생성합니다. 여기서 핸들은 eventpoll 구조 인스턴스로 이해될 수 있으며 여기에 빨간색이 있습니다. 블랙 트리와 큐 구조 레드-블랙 트리는 주로 모니터링해야 할 파일 디스크립터를 저장하며, 아래 그림과 같이 큐는 모니터링되는 파일 디스크립터에서 지정된 이벤트가 발생할 때 이러한 이벤트를 큐에 추가합니다. eventpoll의 도식 다이어그램:
일반적으로 프로그램의 전체 실행 주기 동안 하나의 epoll 핸들만 있습니다. 예를 들어 nginx의 각 작업자 프로세스는 하나의 epoll 핸들만 유지합니다. 핸들을 생성한 후 프로그램이 수신하는 각 포트는 본질적으로 파일 설명자입니다. 이 파일 설명자에서 Accept 이벤트가 발생할 수 있습니다. 즉, 클라이언트 요청이 수신됩니다.
따라서 처음에는 epoll_ctl() 메소드를 통해 모니터링해야 하는 포트에 해당하는 파일 디스크립터를 epoll 핸들에 추가하겠습니다. 추가가 성공한 후 각 수신 파일 설명자는 eventpoll의 레드-블랙 트리에 있는 노드에 해당합니다.
또한 epoll_ctl() 메서드를 호출하여 파일 설명자를 추가한 후 해당 장치(네트워크 카드)와 연결됩니다. 장치 드라이버에서 이벤트가 발생하면 현재 파일 설명자의 콜백 메서드 ep_poll_callback이 ()를 호출하여 이벤트를 생성하고 해당 이벤트를 eventpoll의 이벤트 큐에 추가합니다.
마지막으로 epoll_wait() 메서드를 호출하면 epoll 핸들에서 해당 이벤트를 가져옵니다. 기본적으로 eventpoll의 이벤트 큐가 비어 있는지 확인하고, 그렇지 않으면 기다립니다. 이벤트 발생을 위해.
또한 epoll을 사용하기 위해 여기서 얻은 이벤트는 일반적으로 Accept 이벤트입니다. 이 이벤트를 처리할 때 이 핸들은 본질적으로 파일 설명자입니다. epoll_wait() 메서드를 통해 해당 데이터의 읽기 및 쓰기 이벤트를 계속 대기하기 위해 epoll_ctl() 메서드를 통해 현재 epoll 핸들에 계속 추가됩니다.
여기에서 epoll을 사용하는 동안 두 가지 유형의 파일 설명자가 있음을 알 수 있습니다. 한 가지 유형은 우리가 모니터링하는 포트에 해당하는 파일 설명자입니다. 일반적으로 이러한 유형의 설명자에 대한 Accept 이벤트를 모니터링합니다. 클라이언트 연결을 기다립니다. 다른 유형은 각 클라이언트 연결에 해당하는 파일 설명자입니다. 여기서는 일반적으로 읽기 및 쓰기 이벤트를 수신하여 클라이언트에 데이터를 보내고 보냅니다.
2. nginx에서 epoll을 구현하는 방법
이전 기사에서 nginx가 이벤트 기반 프레임워크를 초기화하는 방법을 설명했습니다. 이벤트 프레임워크의 핵심 모듈 중 하나는 다음과 같습니다.
ngx_module_t ngx_event_core_module = { NGX_MODULE_V1, &ngx_event_core_module_ctx, /* module context */ ngx_event_core_commands, /* module directives */ NGX_EVENT_MODULE, /* module type */ NULL, /* init master */ // 该方法主要是在master进程启动的过程中调用的,用于初始化时间模块 ngx_event_module_init, /* init module */ // 该方法是在各个worker进程启动之后调用的 ngx_event_process_init, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING };
여기에 필요합니다. 앞서 언급한 대로 ngx_event_process_init() 메서드는 각 작업자가 생성될 때 초기화되고 호출됩니다. 여기에는 두 가지 매우 중요한 호출이 포함됩니다. a. 지정된 각 포트에서 수신합니다. 구성 파일에서. 다음은 이 두 단계의 주요 코드입니다.
static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) { // 省略部分代码.... // 在nginx.conf配置文件的events{}配置块中需要使用use指令指定当前使用的事件模型, // 此时就会将所使用的事件模型的索引号存储在ecf->use中,下面的代码就是通过这种方式获取当前 // 所指定的事件模型所对应的模块的,然后调用该模块的actions.init()方法初始化该事件模型 for (m = 0; cycle->modules[m]; m++) { if (cycle->modules[m]->type != NGX_EVENT_MODULE) { continue; } // ecf->use存储了所选用的事件模型的模块序号,这里是找到该模块 if (cycle->modules[m]->ctx_index != ecf->use) { continue; } // module即为所选用的事件模型对应的模块 module = cycle->modules[m]->ctx; // 调用指定事件模型的初始化方法 if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) { exit(2); } break; } // 省略部分代码... ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { #if (NGX_HAVE_REUSEPORT) if (ls[i].reuseport && ls[i].worker != ngx_worker) { continue; } #endif // 这里是为当前所监听的每一个端口都绑定一个ngx_connection_t结构体 c = ngx_get_connection(ls[i].fd, cycle->log); if (c == NULL) { return NGX_ERROR; } rev = c->read; // SOCK_STREAM表示TCP,一般都是TCP,也就是说在接收到客户端的accept事件之后, // 就会调用ngx_event_accept()方法处理该事件 rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept : ngx_event_recvmsg; if ((ngx_event_flags & NGX_USE_EPOLL_EVENT) && ccf->worker_processes > 1) { if (ngx_add_event(rev, NGX_READ_EVENT, NGX_EXCLUSIVE_EVENT) == NGX_ERROR) { return NGX_ERROR; } continue; } } return NGX_OK; }
여기 코드는 주로 다음 부분을 완성합니다.
먼저 사용된 이벤트 모델 모듈을 찾은 다음 해당 init() 메서드를 호출하여 모델을 초기화합니다. 여기서 수행되는 두 가지 주요 작업은 epoll_create() 메서드를 통해 epoll 핸들을 생성하는 것입니다. 이는 현재 작업자 프로세스를 실행하기 위한 기초입니다. 다른 하나는 전역 변수 ngx_event_actions에 값을 할당하는 것입니다. 이 할당 호출은 매우 중요합니다. 할당 후 nginx에서 정의한 여러 메소드 매크로는 모두 사용된 epoll 모듈에 지정된 메소드입니다.
// 这里将epoll相关的事件操作方法赋值给ngx_event_actions, // 也就是说后续有相关的事件发生则都会使用epoll相关的方法 ngx_event_actions = ngx_epoll_module_ctx.actions;
그리고 여기에 ngx_epoll_module_ctx.actions 구조가 있습니다. :
#define ngx_process_events ngx_event_actions.process_events #define ngx_done_events ngx_event_actions.done #define ngx_add_event ngx_event_actions.add #define ngx_del_event ngx_event_actions.del #define ngx_add_conn ngx_event_actions.add_conn #define ngx_del_conn ngx_event_actions.del_conn
이에서 우리가 선택한 이벤트 모델을 통해 ngx_add_event()와 같은 매크로에 대해 구현된 하위 모듈을 동적으로 지정할 수 있다는 것을 알 수 있습니다.
上面的方法完成的第二个主要的工作就是遍历所有监听的端口,获取其描述符,然后通过ngx_add_event()方法将其添加到epoll句柄中以监听其客户端连接事件。从这里就可以感觉到比较巧妙了,因为上面一步中正好对epoll模块进行了初始化,并且设置了ngx_add_event()宏的实现方法,而这里就使用到了这里设置的方法,该方法本质上就是通过epoll_ctl()方法将当前监听的socket描述符添加到epoll句柄中;
最后就是上面的方法在遍历所有监听的端口的时候,为每个连接的accept事件添加的回调方法是ngx_event_accept(),通过前面我们对epoll模型的使用方式的介绍,我们大概可以理解,这里的ngx_event_accept()方法的主要作用是将当前accept到的客户端连接的句柄通过epoll_ctl()方法添加到当前epoll句柄中,以继续监听其读写事件;
这里我们首先看一下上面第一点中介绍的module->actions.init(cycle, ngx_timer_resolution)方法调用时是如何初始化epoll模块的。由于是epoll模块,这里的init()方法指向的就是ngx_epoll_init()方法,如下是该方法的源码:
static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ngx_epoll_conf_t *epcf; // 获取解析得到的ngx_epoll_conf_t结构体 epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module); if (ep == -1) { // 创建eventpoll结构体,将创建得到的文件描述符返回 ep = epoll_create(cycle->connection_n / 2); // ep==-1表示创建失败 if (ep == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "epoll_create() failed"); return NGX_ERROR; } } // 如果nevents小于epcf->events,说明event_list数组的长度不够,因而需要重新申请内存空间 if (nevents < epcf->events) { if (event_list) { ngx_free(event_list); } // 为event_list重新申请内存空间 event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events, cycle->log); if (event_list == NULL) { return NGX_ERROR; } } // 将nevents更新为配置文件中指定的大小 nevents = epcf->events; ngx_io = ngx_os_io; // 这里将epoll相关的事件操作方法赋值给ngx_event_actions,也就是说后续有相关的事件发生则 // 都会使用epoll相关的方法 ngx_event_actions = ngx_epoll_module_ctx.actions; // 这里NGX_USE_CLEAR_EVENT指的是使用ET模式来使用epoll,默认使用ET模式, // 而NGX_USE_LEVEL_EVENT表示使用LE模式来使用epoll #if (NGX_HAVE_CLEAR_EVENT) ngx_event_flags = NGX_USE_CLEAR_EVENT #else ngx_event_flags = NGX_USE_LEVEL_EVENT #endif // NGX_USE_GREEDY_EVENT表示每次拉取事件是都尝试拉取最多的事件 | NGX_USE_GREEDY_EVENT | NGX_USE_EPOLL_EVENT; return NGX_OK; }
可以看到,这里的ngx_epoll_init()方法主要的作用有两个:a. 通过epoll_create()方法创建一个epoll句柄;b. 设置ngx_event_actions属性所指向的方法的实现,从而确定ngx_add_event()等宏的实现方法。下面我们来看一下ngx_add_event()是如何将需要监听的文件描述符添加到epoll句柄中的:
static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) { int op; uint32_t events, prev; ngx_event_t *e; ngx_connection_t *c; struct epoll_event ee; // ev->data在使用的过程中存储的是当前对应的ngx_connection_t,如果是free_connection, // 则存储的是下一个节点的指针 c = ev->data; // 事件类型 events = (uint32_t) event; // 如果是读事件 if (event == NGX_READ_EVENT) { e = c->write; prev = EPOLLOUT; #if (NGX_READ_EVENT != EPOLLIN | EPOLLRDHUP) events = EPOLLIN | EPOLLRDHUP; // 设置读事件类型 #endif } else { e = c->read; prev = EPOLLIN | EPOLLRDHUP; #if (NGX_WRITE_EVENT != EPOLLOUT) events = EPOLLOUT; // 设置写事件类型 #endif } // 根据active标志位确定是否为活跃事件,以决定到底是修改还是添加事件 if (e->active) { op = EPOLL_CTL_MOD; // 类型为修改事件 events |= prev; } else { op = EPOLL_CTL_ADD; // 类型为添加事件 } #if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP) if (flags & NGX_EXCLUSIVE_EVENT) { events &= ~EPOLLRDHUP; } #endif // 将flags参数指定的事件添加到监听列表中 ee.events = events | (uint32_t) flags; // 这里是将connection指针的最后一位赋值为ev->instance,然后将其赋值给事件的ptr属性,通过这种方式检测事件是否过期 ee.data.ptr = (void *) ((uintptr_t) c | ev->instance); ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0, "epoll add event: fd:%d op:%d ev:%08XD", c->fd, op, ee.events); // 将事件添加到epoll句柄中 if (epoll_ctl(ep, op, c->fd, &ee) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "epoll_ctl(%d, %d) failed", op, c->fd); return NGX_ERROR; } // 将事件标记为活跃状态 ev->active = 1; #if 0 ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0; #endif return NGX_OK; }
这里的ngx_add_event()方法本质上是比较简单的,就是将当前的ngx_event_t转换为一个epoll_event结构体,并且会设置该结构体中需要监听的事件类型,然后通过epoll_ctl()方法将当前epoll_event添加到epoll句柄中。
在前面的ngx_event_process_init()方法中,nginx通过ngx_add_event()方法将各个监听的端口的描述符添加到epoll句柄中之后,就会开始监听这些描述符上的accept连接事件,如果有客户端连接请求,此时就会回调ngx_event_accept()方法处理该请求,我们来看一下该方法是如何处理客户端建立连接的请求的:
/** * 当客户端有accept事件到达时,将调用此方法处理该事件 */ void ngx_event_accept(ngx_event_t *ev) { socklen_t socklen; ngx_err_t err; ngx_log_t *log; ngx_uint_t level; ngx_socket_t s; ngx_event_t *rev, *wev; ngx_sockaddr_t sa; ngx_listening_t *ls; ngx_connection_t *c, *lc; ngx_event_conf_t *ecf; #if (NGX_HAVE_ACCEPT4) static ngx_uint_t use_accept4 = 1; #endif if (ev->timedout) { // 如果当前事件超时了,则继续将其添加到epoll句柄中以监听accept事件 if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) { return; } ev->timedout = 0; } // 获取解析event核心配置结构体 ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module); if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) { ev->available = ecf->multi_accept; } lc = ev->data; ls = lc->listening; ev->ready = 0; do { socklen = sizeof(ngx_sockaddr_t); #if (NGX_HAVE_ACCEPT4) if (use_accept4) { s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK); } else { s = accept(lc->fd, &sa.sockaddr, &socklen); } #else // 这里lc->fd指向的是监听的文件句柄,调用accept()获取客户端的连接,并且将其存储到sa.sockaddr中 s = accept(lc->fd, &sa.sockaddr, &socklen); #endif // 检查当前进程获取的连接个数是否超过了最大可用连接数的7/8,是则不再继续接收连接 ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; // 获取新的连接 c = ngx_get_connection(s, ev->log); // 获取连接失败则直接返回 if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed"); } return; } // 标记当前为TCP连接 c->type = SOCK_STREAM; // 为当前连接创建连接池 c->pool = ngx_create_pool(ls->pool_size, ev->log); if (c->pool == NULL) { ngx_close_accepted_connection(c); return; } // 更新socklen的长度 if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) { socklen = sizeof(ngx_sockaddr_t); } // 为sockaddr申请内存空间,并且将客户端连接地址复制到c->sockaddr中 c->sockaddr = ngx_palloc(c->pool, socklen); if (c->sockaddr == NULL) { ngx_close_accepted_connection(c); return; } ngx_memcpy(c->sockaddr, &sa, socklen); // 申请ngx_log_t结构体的内存空间 log = ngx_palloc(c->pool, sizeof(ngx_log_t)); if (log == NULL) { ngx_close_accepted_connection(c); return; } /* set a blocking mode for iocp and non-blocking mode for others */ if (ngx_inherited_nonblocking) { if (ngx_event_flags & NGX_USE_IOCP_EVENT) { // 将连接设置为阻塞模式 if (ngx_blocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_blocking_n " failed"); ngx_close_accepted_connection(c); return; } } } else { if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) { // 将连接设置为非阻塞模式 if (ngx_nonblocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed"); ngx_close_accepted_connection(c); return; } } } *log = ls->log; // 设置连接的基本属性 c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen; #if (NGX_HAVE_UNIX_DOMAIN) if (c->sockaddr->sa_family == AF_UNIX) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; #if (NGX_SOLARIS) /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */ c->sendfile = 0; #endif } #endif rev = c->read; wev = c->write; wev->ready = 1; if (ngx_event_flags & NGX_USE_IOCP_EVENT) { rev->ready = 1; } if (ev->deferred_accept) { rev->ready = 1; #if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP) rev->available = 1; #endif } rev->log = log; wev->log = log; // 更新连接使用次数 c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); // 将网络地址更新为字符串形式的地址 if (ls->addr_ntop) { c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len); if (c->addr_text.data == NULL) { ngx_close_accepted_connection(c); return; } c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen, c->addr_text.data, ls->addr_text_max_len, 0); if (c->addr_text.len == 0) { ngx_close_accepted_connection(c); return; } } #if (NGX_DEBUG) { ngx_str_t addr; u_char text[NGX_SOCKADDR_STRLEN]; ngx_debug_accepted_connection(ecf, c); if (log->log_level & NGX_LOG_DEBUG_EVENT) { addr.data = text; addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text, NGX_SOCKADDR_STRLEN, 1); ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0, "*%uA accept: %V fd:%d", c->number, &addr, s); } } #endif // 将当前连接添加到epoll句柄中进行监控 if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { if (ngx_add_conn(c) == NGX_ERROR) { ngx_close_accepted_connection(c); return; } } log->data = NULL; log->handler = NULL; // 建立新连接之后的回调方法 ls->handler(c); if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } } while (ev->available); }
这里客户端连接的建立过程主要可以分为如下几个步骤:
首先调用accept()方法获取当前客户端建立的连接,并且将其地址信息保存到结构体sa中;
接着通过调用ngx_get_connection()方法获取一个ngx_connection_t结构体以对应当前获取到的客户端连接,并且会初始化该结构体的各个属性;
调用ngx_add_conn()方法将当前方法添加到epoll句柄中,这里的添加过程本质上就是通过epoll_ctl()方法将当前客户端的连接的文件描述符添加到epoll句柄中,以监听其读写事件;
如此我们就讲解了从epoll句柄的创建,到指定的端口的监听,接着处理客户端连接,并且将客户端连接对应的文件描述符继续添加到epoll句柄中以监听读写事件的流程。下面我们继续来看一下nginx是如何等待所监听的这些句柄上的事件的发生的,也即整个事件框架的驱动程序。worker进程对于事件的处理,主要在ngx_process_events_and_timers()方法中,如下是该方法的源码:
void ngx_process_events_and_timers(ngx_cycle_t *cycle) { // 尝试获取共享锁 if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } // 这里开始处理事件,对于kqueue模型,其指向的是ngx_kqueue_process_events()方法, // 而对于epoll模型,其指向的是ngx_epoll_process_events()方法 // 这个方法的主要作用是,在对应的事件模型中获取事件列表,然后将事件添加到ngx_posted_accept_events // 队列或者ngx_posted_events队列中 (void) ngx_process_events(cycle, timer, flags); // 这里开始处理accept事件,将其交由ngx_event_accept.c的ngx_event_accept()方法处理; ngx_event_process_posted(cycle, &ngx_posted_accept_events); // 开始释放锁 if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } // 如果不需要在事件队列中进行处理,则直接处理该事件 // 对于事件的处理,如果是accept事件,则将其交由ngx_event_accept.c的ngx_event_accept()方法处理; // 如果是读事件,则将其交由ngx_http_request.c的ngx_http_wait_request_handler()方法处理; // 对于处理完成的事件,最后会交由ngx_http_request.c的ngx_http_keepalive_handler()方法处理。 // 这里开始处理除accept事件外的其他事件 ngx_event_process_posted(cycle, &ngx_posted_events); }
这里的ngx_process_events_and_timers()方法我们省略了大部分代码,只留下了主要的流程。简而言之,其主要实现了如下几个步骤的工作:
获取共享锁,以得到获取客户端连接的权限;
调用ngx_process_events()方法监听epoll句柄中各个文件描述符的事件,并且处理这些事件。在前面我们讲到,nginx在调用epoll模块的init()方法时,初始化了ngx_event_actions属性的值,将其指向了epoll模块所实现的方法,这里就包括ngx_process_events方法宏所对应的方法,也即ngx_epoll_process_events()方法,因而这里其实就可以理解,ngx_epoll_process_events()方法本质上就是调用epoll_wait()方法等待epoll句柄上所监听的事件的发生;
处理ngx_posted_accept_events队列中的事件,这些事件其实就是前面讲到的客户端建立连接的事件,在ngx_epoll_process_events()方法中获取到事件之后,会判断其是accept事件还是读写事件,如果是accept事件,就会将其添加到ngx_posted_accept_events队列中,如果是读写事件,就会将其添加到ngx_posted_events队列中;
释放共享锁,以让其他的worker进程可以获取锁,从而接收客户端连接;
处理ngx_posted_events队列中的事件,也即客户端连接的读写事件。从这里就可以看出nginx高性能的一个原因,其将accept事件和读写事件放到了两个不同的队列中,accept事件是必须在锁内部处理的,而读写事件则可以异步于accept事件,这提高了nginx处理客户端请求的能力。
下面我们来看一下ngx_epoll_process_events()方法是如何处理epoll句柄中的事件的:
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int events; uint32_t revents; ngx_int_t instance, i; ngx_uint_t level; ngx_err_t err; ngx_event_t *rev, *wev; ngx_queue_t *queue; ngx_connection_t *c; /* NGX_TIMER_INFINITE == INFTIM */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer); // 通过epoll_wait()方法进行事件的获取,获取到的事件将存放在event_list中,并且会将获取的事件个数返回 events = epoll_wait(ep, event_list, (int) nevents, timer); err = (events == -1) ? ngx_errno : 0; // 这里的ngx_event_timer_alarm是通过一个定时器任务来触发的,在定时器中会将其置为1, // 从而实现定期更新nginx缓存的时间的目的 if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { ngx_time_update(); } if (err) { if (err == NGX_EINTR) { if (ngx_event_timer_alarm) { ngx_event_timer_alarm = 0; return NGX_OK; } level = NGX_LOG_INFO; } else { level = NGX_LOG_ALERT; } ngx_log_error(level, cycle->log, err, "epoll_wait() failed"); return NGX_ERROR; } // 获取的事件个数为0 if (events == 0) { // 如果当前时间类型不为NGX_TIMER_INFINITE,说明获取事件超时了,则直接返回 if (timer != NGX_TIMER_INFINITE) { return NGX_OK; } // 这里说明时间类型为NGX_TIMER_INFINITE,但是却返回了0个事件,说明epoll_wait()调用出现了问题 ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "epoll_wait() returned no events without timeout"); return NGX_ERROR; } // 遍历各个事件 for (i = 0; i < events; i++) { // 每个事件的data.ptr中存储了当前事件对应的connection对象 c = event_list[i].data.ptr; // 获取事件中存储的instance的值 instance = (uintptr_t) c & 1; // 获取connection指针地址值 c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); // 获取读事件结构体 rev = c->read; // 如果当前连接的文件描述符为-1,获取其instance不等于当前事件的instance, // 说明该连接已经过期了,则不对该事件进行处理 if (c->fd == -1 || rev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } // 获取当前事件监听的类型 revents = event_list[i].events; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d ev:%04XD d:%p", c->fd, revents, event_list[i].data.ptr); // 如果事件发生错误,则打印相应的日志 if (revents & (EPOLLERR | EPOLLHUP)) { ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD", c->fd, revents); /* * if the error events were returned, add EPOLLIN and EPOLLOUT * to handle the events at least in one active handler */ revents |= EPOLLIN | EPOLLOUT; } #if 0 if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "strange epoll_wait() events fd:%d ev:%04XD", c->fd, revents); } #endif // 如果当前是读事件,并且事件是活跃的 if ((revents & EPOLLIN) && rev->active) { #if (NGX_HAVE_EPOLLRDHUP) if (revents & EPOLLRDHUP) { rev->pending_eof = 1; } rev->available = 1; #endif // 将事件标记为就绪状态 rev->ready = 1; // 默认是开启了NGX_POST_EVENTS开关的 if (flags & NGX_POST_EVENTS) { // 如果当前是accept事件,则将其添加到ngx_posted_accept_events队列中, // 如果是读写事件,则将其添加到ngx_posted_events队列中 queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events; ngx_post_event(rev, queue); } else { // 如果不需要分离accept和读写事件,则直接处理该事件 rev->handler(rev); } } // 获取写事件结构体 wev = c->write; if ((revents & EPOLLOUT) && wev->active) { // 如果当前连接的文件描述符为-1,获取其instance不等于当前事件的instance, // 说明该连接已经过期了,则不对该事件进行处理 if (c->fd == -1 || wev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } // 将当前事件标记为就绪状态 wev->ready = 1; #if (NGX_THREADS) wev->complete = 1; #endif // 由于是写事件,并且需要标记为了NGX_POST_EVENTS状态, // 因而将其直接添加到ngx_posted_events队列中,否则直接处理该事件 if (flags & NGX_POST_EVENTS) { ngx_post_event(wev, &ngx_posted_events); } else { wev->handler(wev); } } } return NGX_OK; }
这里ngx_epoll_process_events()方法首先就是调用epoll_wait()方法获取所监听的句柄的事件,然后遍历获取的事件,根据事件的类型,如果是accept事件,则添加到ngx_posted_accept_events队列中,如果是读写事件,则添加到ngx_posted_events队列中,而队列中事件的处理,则在上面介绍的ngx_process_events_and_timers()方法中进行。
위 내용은 epoll 모델을 기반으로 한 nginx 이벤트 중심 프로세스에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!