epoll 모델을 기반으로 한 nginx 이벤트 중심 프로세스에 대한 자세한 설명

藏色散人
풀어 주다: 2020-02-01 15:06:00
앞으로
5013명이 탐색했습니다.

이 글에서는 먼저 epoll 모델의 구현 원리를 설명한 다음 nginx가 epoll 모델을 기반으로 이벤트 중심 모델을 구현하는 방법을 소스 코드 수준에서 설명합니다.

epoll 모델을 기반으로 한 nginx 이벤트 중심 프로세스에 대한 자세한 설명

epoll은 이벤트 중심 모델이며, 이는 nginx가 클라이언트 요청을 효율적으로 처리할 수 있는 중요한 이유 중 하나입니다. 프로세스 관점에서 epoll 모델의 사용은 주로 epoll 핸들 생성, 수신 파일 설명자 추가 및 대기 이벤트 트리거의 세 단계로 나뉩니다. 이 기사에서는 nginx가 클라이언트 요청의 효율적인 처리를 구현하는 방법을 소개합니다. 이 세 단계를 기반으로 합니다.

관련 추천: "Nginx Tutorial"

1. epoll 모델 소개

nginx의 구현 원리를 소개하기 전에 먼저 epoll 모델의 기본 사용법을 소개해야 합니다. epoll을 사용하는 세 가지 주요 방법이 있습니다:

// 创建epoll句柄 int epoll_create(int size); // 往epoll句柄中添加需要进行监听的文件描述符 int epoll_ctl(int epfd,int op,int fd,struct epoll_event* event); // 等待需要监听的文件描述符上对应的事件的发生 int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);
로그인 후 복사

먼저 epoll_create() 메서드를 호출하여 epoll 인스턴스에 대한 핸들을 생성합니다. 여기서 핸들은 eventpoll 구조 인스턴스로 이해될 수 있으며 여기에 빨간색이 있습니다. 블랙 트리와 큐 구조 레드-블랙 트리는 주로 모니터링해야 할 파일 디스크립터를 저장하며, 아래 그림과 같이 큐는 모니터링되는 파일 디스크립터에서 지정된 이벤트가 발생할 때 이러한 이벤트를 큐에 추가합니다. eventpoll의 도식 다이어그램:

epoll 모델을 기반으로 한 nginx 이벤트 중심 프로세스에 대한 자세한 설명

일반적으로 프로그램의 전체 실행 주기 동안 하나의 epoll 핸들만 있습니다. 예를 들어 nginx의 각 작업자 프로세스는 하나의 epoll 핸들만 유지합니다. 핸들을 생성한 후 프로그램이 수신하는 각 포트는 본질적으로 파일 설명자입니다. 이 파일 설명자에서 Accept 이벤트가 발생할 수 있습니다. 즉, 클라이언트 요청이 수신됩니다.

따라서 처음에는 epoll_ctl() 메소드를 통해 모니터링해야 하는 포트에 해당하는 파일 디스크립터를 epoll 핸들에 추가하겠습니다. 추가가 성공한 후 각 수신 파일 설명자는 eventpoll의 레드-블랙 트리에 있는 노드에 해당합니다.

또한 epoll_ctl() 메서드를 호출하여 파일 설명자를 추가한 후 해당 장치(네트워크 카드)와 연결됩니다. 장치 드라이버에서 이벤트가 발생하면 현재 파일 설명자의 콜백 메서드 ep_poll_callback이 ()를 호출하여 이벤트를 생성하고 해당 이벤트를 eventpoll의 이벤트 큐에 추가합니다.

마지막으로 epoll_wait() 메서드를 호출하면 epoll 핸들에서 해당 이벤트를 가져옵니다. 기본적으로 eventpoll의 이벤트 큐가 비어 있는지 확인하고, 그렇지 않으면 기다립니다. 이벤트 발생을 위해.

또한 epoll을 사용하기 위해 여기서 얻은 이벤트는 일반적으로 Accept 이벤트입니다. 이 이벤트를 처리할 때 이 핸들은 본질적으로 파일 설명자입니다. epoll_wait() 메서드를 통해 해당 데이터의 읽기 및 쓰기 이벤트를 계속 대기하기 위해 epoll_ctl() 메서드를 통해 현재 epoll 핸들에 계속 추가됩니다.

여기에서 epoll을 사용하는 동안 두 가지 유형의 파일 설명자가 있음을 알 수 있습니다. 한 가지 유형은 우리가 모니터링하는 포트에 해당하는 파일 설명자입니다. 일반적으로 이러한 유형의 설명자에 대한 Accept 이벤트를 모니터링합니다. 클라이언트 연결을 기다립니다. 다른 유형은 각 클라이언트 연결에 해당하는 파일 설명자입니다. 여기서는 일반적으로 읽기 및 쓰기 이벤트를 수신하여 클라이언트에 데이터를 보내고 보냅니다.

2. nginx에서 epoll을 구현하는 방법

이전 기사에서 nginx가 이벤트 기반 프레임워크를 초기화하는 방법을 설명했습니다. 이벤트 프레임워크의 핵심 모듈 중 하나는 다음과 같습니다.

ngx_module_t ngx_event_core_module = { NGX_MODULE_V1, &ngx_event_core_module_ctx, /* module context */ ngx_event_core_commands, /* module directives */ NGX_EVENT_MODULE, /* module type */ NULL, /* init master */ // 该方法主要是在master进程启动的过程中调用的,用于初始化时间模块 ngx_event_module_init, /* init module */ // 该方法是在各个worker进程启动之后调用的 ngx_event_process_init, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING };
로그인 후 복사

여기에 필요합니다. 앞서 언급한 대로 ngx_event_process_init() 메서드는 각 작업자가 생성될 때 초기화되고 호출됩니다. 여기에는 두 가지 매우 중요한 호출이 포함됩니다. a. 지정된 각 포트에서 수신합니다. 구성 파일에서. 다음은 이 두 단계의 주요 코드입니다.

static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) { // 省略部分代码.... // 在nginx.conf配置文件的events{}配置块中需要使用use指令指定当前使用的事件模型, // 此时就会将所使用的事件模型的索引号存储在ecf->use中,下面的代码就是通过这种方式获取当前 // 所指定的事件模型所对应的模块的,然后调用该模块的actions.init()方法初始化该事件模型 for (m = 0; cycle->modules[m]; m++) { if (cycle->modules[m]->type != NGX_EVENT_MODULE) { continue; } // ecf->use存储了所选用的事件模型的模块序号,这里是找到该模块 if (cycle->modules[m]->ctx_index != ecf->use) { continue; } // module即为所选用的事件模型对应的模块 module = cycle->modules[m]->ctx; // 调用指定事件模型的初始化方法 if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) { exit(2); } break; } // 省略部分代码... ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { #if (NGX_HAVE_REUSEPORT) if (ls[i].reuseport && ls[i].worker != ngx_worker) { continue; } #endif // 这里是为当前所监听的每一个端口都绑定一个ngx_connection_t结构体 c = ngx_get_connection(ls[i].fd, cycle->log); if (c == NULL) { return NGX_ERROR; } rev = c->read; // SOCK_STREAM表示TCP,一般都是TCP,也就是说在接收到客户端的accept事件之后, // 就会调用ngx_event_accept()方法处理该事件 rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept : ngx_event_recvmsg; if ((ngx_event_flags & NGX_USE_EPOLL_EVENT) && ccf->worker_processes > 1) { if (ngx_add_event(rev, NGX_READ_EVENT, NGX_EXCLUSIVE_EVENT) == NGX_ERROR) { return NGX_ERROR; } continue; } } return NGX_OK; }
로그인 후 복사

여기 코드는 주로 다음 부분을 완성합니다.

먼저 사용된 이벤트 모델 모듈을 찾은 다음 해당 init() 메서드를 호출하여 모델을 초기화합니다. 여기서 수행되는 두 가지 주요 작업은 epoll_create() 메서드를 통해 epoll 핸들을 생성하는 것입니다. 이는 현재 작업자 프로세스를 실행하기 위한 기초입니다. 다른 하나는 전역 변수 ngx_event_actions에 값을 할당하는 것입니다. 이 할당 호출은 매우 중요합니다. 할당 후 nginx에서 정의한 여러 메소드 매크로는 모두 사용된 epoll 모듈에 지정된 메소드입니다.

// 这里将epoll相关的事件操作方法赋值给ngx_event_actions, // 也就是说后续有相关的事件发生则都会使用epoll相关的方法 ngx_event_actions = ngx_epoll_module_ctx.actions;
로그인 후 복사

그리고 여기에 ngx_epoll_module_ctx.actions 구조가 있습니다. :

#define ngx_process_events ngx_event_actions.process_events #define ngx_done_events ngx_event_actions.done #define ngx_add_event ngx_event_actions.add #define ngx_del_event ngx_event_actions.del #define ngx_add_conn ngx_event_actions.add_conn #define ngx_del_conn ngx_event_actions.del_conn
로그인 후 복사

이에서 우리가 선택한 이벤트 모델을 통해 ngx_add_event()와 같은 매크로에 대해 구현된 하위 모듈을 동적으로 지정할 수 있다는 것을 알 수 있습니다.

上面的方法完成的第二个主要的工作就是遍历所有监听的端口,获取其描述符,然后通过ngx_add_event()方法将其添加到epoll句柄中以监听其客户端连接事件。从这里就可以感觉到比较巧妙了,因为上面一步中正好对epoll模块进行了初始化,并且设置了ngx_add_event()宏的实现方法,而这里就使用到了这里设置的方法,该方法本质上就是通过epoll_ctl()方法将当前监听的socket描述符添加到epoll句柄中;

最后就是上面的方法在遍历所有监听的端口的时候,为每个连接的accept事件添加的回调方法是ngx_event_accept(),通过前面我们对epoll模型的使用方式的介绍,我们大概可以理解,这里的ngx_event_accept()方法的主要作用是将当前accept到的客户端连接的句柄通过epoll_ctl()方法添加到当前epoll句柄中,以继续监听其读写事件;

这里我们首先看一下上面第一点中介绍的module->actions.init(cycle, ngx_timer_resolution)方法调用时是如何初始化epoll模块的。由于是epoll模块,这里的init()方法指向的就是ngx_epoll_init()方法,如下是该方法的源码:

static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ngx_epoll_conf_t *epcf; // 获取解析得到的ngx_epoll_conf_t结构体 epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module); if (ep == -1) { // 创建eventpoll结构体,将创建得到的文件描述符返回 ep = epoll_create(cycle->connection_n / 2); // ep==-1表示创建失败 if (ep == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "epoll_create() failed"); return NGX_ERROR; } } // 如果nevents小于epcf->events,说明event_list数组的长度不够,因而需要重新申请内存空间 if (nevents < epcf->events) { if (event_list) { ngx_free(event_list); } // 为event_list重新申请内存空间 event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events, cycle->log); if (event_list == NULL) { return NGX_ERROR; } } // 将nevents更新为配置文件中指定的大小 nevents = epcf->events; ngx_io = ngx_os_io; // 这里将epoll相关的事件操作方法赋值给ngx_event_actions,也就是说后续有相关的事件发生则 // 都会使用epoll相关的方法 ngx_event_actions = ngx_epoll_module_ctx.actions; // 这里NGX_USE_CLEAR_EVENT指的是使用ET模式来使用epoll,默认使用ET模式, // 而NGX_USE_LEVEL_EVENT表示使用LE模式来使用epoll #if (NGX_HAVE_CLEAR_EVENT) ngx_event_flags = NGX_USE_CLEAR_EVENT #else ngx_event_flags = NGX_USE_LEVEL_EVENT #endif // NGX_USE_GREEDY_EVENT表示每次拉取事件是都尝试拉取最多的事件 | NGX_USE_GREEDY_EVENT | NGX_USE_EPOLL_EVENT; return NGX_OK; }
로그인 후 복사

可以看到,这里的ngx_epoll_init()方法主要的作用有两个:a. 通过epoll_create()方法创建一个epoll句柄;b. 设置ngx_event_actions属性所指向的方法的实现,从而确定ngx_add_event()等宏的实现方法。下面我们来看一下ngx_add_event()是如何将需要监听的文件描述符添加到epoll句柄中的:

static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) { int op; uint32_t events, prev; ngx_event_t *e; ngx_connection_t *c; struct epoll_event ee; // ev->data在使用的过程中存储的是当前对应的ngx_connection_t,如果是free_connection, // 则存储的是下一个节点的指针 c = ev->data; // 事件类型 events = (uint32_t) event; // 如果是读事件 if (event == NGX_READ_EVENT) { e = c->write; prev = EPOLLOUT; #if (NGX_READ_EVENT != EPOLLIN | EPOLLRDHUP) events = EPOLLIN | EPOLLRDHUP; // 设置读事件类型 #endif } else { e = c->read; prev = EPOLLIN | EPOLLRDHUP; #if (NGX_WRITE_EVENT != EPOLLOUT) events = EPOLLOUT; // 设置写事件类型 #endif } // 根据active标志位确定是否为活跃事件,以决定到底是修改还是添加事件 if (e->active) { op = EPOLL_CTL_MOD; // 类型为修改事件 events |= prev; } else { op = EPOLL_CTL_ADD; // 类型为添加事件 } #if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP) if (flags & NGX_EXCLUSIVE_EVENT) { events &= ~EPOLLRDHUP; } #endif // 将flags参数指定的事件添加到监听列表中 ee.events = events | (uint32_t) flags; // 这里是将connection指针的最后一位赋值为ev->instance,然后将其赋值给事件的ptr属性,通过这种方式检测事件是否过期 ee.data.ptr = (void *) ((uintptr_t) c | ev->instance); ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0, "epoll add event: fd:%d op:%d ev:%08XD", c->fd, op, ee.events); // 将事件添加到epoll句柄中 if (epoll_ctl(ep, op, c->fd, &ee) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "epoll_ctl(%d, %d) failed", op, c->fd); return NGX_ERROR; } // 将事件标记为活跃状态 ev->active = 1; #if 0 ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0; #endif return NGX_OK; }
로그인 후 복사

这里的ngx_add_event()方法本质上是比较简单的,就是将当前的ngx_event_t转换为一个epoll_event结构体,并且会设置该结构体中需要监听的事件类型,然后通过epoll_ctl()方法将当前epoll_event添加到epoll句柄中。

在前面的ngx_event_process_init()方法中,nginx通过ngx_add_event()方法将各个监听的端口的描述符添加到epoll句柄中之后,就会开始监听这些描述符上的accept连接事件,如果有客户端连接请求,此时就会回调ngx_event_accept()方法处理该请求,我们来看一下该方法是如何处理客户端建立连接的请求的:

/** * 当客户端有accept事件到达时,将调用此方法处理该事件 */ void ngx_event_accept(ngx_event_t *ev) { socklen_t socklen; ngx_err_t err; ngx_log_t *log; ngx_uint_t level; ngx_socket_t s; ngx_event_t *rev, *wev; ngx_sockaddr_t sa; ngx_listening_t *ls; ngx_connection_t *c, *lc; ngx_event_conf_t *ecf; #if (NGX_HAVE_ACCEPT4) static ngx_uint_t use_accept4 = 1; #endif if (ev->timedout) { // 如果当前事件超时了,则继续将其添加到epoll句柄中以监听accept事件 if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) { return; } ev->timedout = 0; } // 获取解析event核心配置结构体 ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module); if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) { ev->available = ecf->multi_accept; } lc = ev->data; ls = lc->listening; ev->ready = 0; do { socklen = sizeof(ngx_sockaddr_t); #if (NGX_HAVE_ACCEPT4) if (use_accept4) { s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK); } else { s = accept(lc->fd, &sa.sockaddr, &socklen); } #else // 这里lc->fd指向的是监听的文件句柄,调用accept()获取客户端的连接,并且将其存储到sa.sockaddr中 s = accept(lc->fd, &sa.sockaddr, &socklen); #endif // 检查当前进程获取的连接个数是否超过了最大可用连接数的7/8,是则不再继续接收连接 ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; // 获取新的连接 c = ngx_get_connection(s, ev->log); // 获取连接失败则直接返回 if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed"); } return; } // 标记当前为TCP连接 c->type = SOCK_STREAM; // 为当前连接创建连接池 c->pool = ngx_create_pool(ls->pool_size, ev->log); if (c->pool == NULL) { ngx_close_accepted_connection(c); return; } // 更新socklen的长度 if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) { socklen = sizeof(ngx_sockaddr_t); } // 为sockaddr申请内存空间,并且将客户端连接地址复制到c->sockaddr中 c->sockaddr = ngx_palloc(c->pool, socklen); if (c->sockaddr == NULL) { ngx_close_accepted_connection(c); return; } ngx_memcpy(c->sockaddr, &sa, socklen); // 申请ngx_log_t结构体的内存空间 log = ngx_palloc(c->pool, sizeof(ngx_log_t)); if (log == NULL) { ngx_close_accepted_connection(c); return; } /* set a blocking mode for iocp and non-blocking mode for others */ if (ngx_inherited_nonblocking) { if (ngx_event_flags & NGX_USE_IOCP_EVENT) { // 将连接设置为阻塞模式 if (ngx_blocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_blocking_n " failed"); ngx_close_accepted_connection(c); return; } } } else { if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) { // 将连接设置为非阻塞模式 if (ngx_nonblocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed"); ngx_close_accepted_connection(c); return; } } } *log = ls->log; // 设置连接的基本属性 c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen; #if (NGX_HAVE_UNIX_DOMAIN) if (c->sockaddr->sa_family == AF_UNIX) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; #if (NGX_SOLARIS) /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */ c->sendfile = 0; #endif } #endif rev = c->read; wev = c->write; wev->ready = 1; if (ngx_event_flags & NGX_USE_IOCP_EVENT) { rev->ready = 1; } if (ev->deferred_accept) { rev->ready = 1; #if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP) rev->available = 1; #endif } rev->log = log; wev->log = log; // 更新连接使用次数 c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); // 将网络地址更新为字符串形式的地址 if (ls->addr_ntop) { c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len); if (c->addr_text.data == NULL) { ngx_close_accepted_connection(c); return; } c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen, c->addr_text.data, ls->addr_text_max_len, 0); if (c->addr_text.len == 0) { ngx_close_accepted_connection(c); return; } } #if (NGX_DEBUG) { ngx_str_t addr; u_char text[NGX_SOCKADDR_STRLEN]; ngx_debug_accepted_connection(ecf, c); if (log->log_level & NGX_LOG_DEBUG_EVENT) { addr.data = text; addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text, NGX_SOCKADDR_STRLEN, 1); ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0, "*%uA accept: %V fd:%d", c->number, &addr, s); } } #endif // 将当前连接添加到epoll句柄中进行监控 if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { if (ngx_add_conn(c) == NGX_ERROR) { ngx_close_accepted_connection(c); return; } } log->data = NULL; log->handler = NULL; // 建立新连接之后的回调方法 ls->handler(c); if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } } while (ev->available); }
로그인 후 복사

这里客户端连接的建立过程主要可以分为如下几个步骤:

首先调用accept()方法获取当前客户端建立的连接,并且将其地址信息保存到结构体sa中;

接着通过调用ngx_get_connection()方法获取一个ngx_connection_t结构体以对应当前获取到的客户端连接,并且会初始化该结构体的各个属性;

调用ngx_add_conn()方法将当前方法添加到epoll句柄中,这里的添加过程本质上就是通过epoll_ctl()方法将当前客户端的连接的文件描述符添加到epoll句柄中,以监听其读写事件;

如此我们就讲解了从epoll句柄的创建,到指定的端口的监听,接着处理客户端连接,并且将客户端连接对应的文件描述符继续添加到epoll句柄中以监听读写事件的流程。下面我们继续来看一下nginx是如何等待所监听的这些句柄上的事件的发生的,也即整个事件框架的驱动程序。worker进程对于事件的处理,主要在ngx_process_events_and_timers()方法中,如下是该方法的源码:

void ngx_process_events_and_timers(ngx_cycle_t *cycle) { // 尝试获取共享锁 if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } // 这里开始处理事件,对于kqueue模型,其指向的是ngx_kqueue_process_events()方法, // 而对于epoll模型,其指向的是ngx_epoll_process_events()方法 // 这个方法的主要作用是,在对应的事件模型中获取事件列表,然后将事件添加到ngx_posted_accept_events // 队列或者ngx_posted_events队列中 (void) ngx_process_events(cycle, timer, flags); // 这里开始处理accept事件,将其交由ngx_event_accept.c的ngx_event_accept()方法处理; ngx_event_process_posted(cycle, &ngx_posted_accept_events); // 开始释放锁 if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } // 如果不需要在事件队列中进行处理,则直接处理该事件 // 对于事件的处理,如果是accept事件,则将其交由ngx_event_accept.c的ngx_event_accept()方法处理; // 如果是读事件,则将其交由ngx_http_request.c的ngx_http_wait_request_handler()方法处理; // 对于处理完成的事件,最后会交由ngx_http_request.c的ngx_http_keepalive_handler()方法处理。 // 这里开始处理除accept事件外的其他事件 ngx_event_process_posted(cycle, &ngx_posted_events); }
로그인 후 복사

这里的ngx_process_events_and_timers()方法我们省略了大部分代码,只留下了主要的流程。简而言之,其主要实现了如下几个步骤的工作:

获取共享锁,以得到获取客户端连接的权限;

调用ngx_process_events()方法监听epoll句柄中各个文件描述符的事件,并且处理这些事件。在前面我们讲到,nginx在调用epoll模块的init()方法时,初始化了ngx_event_actions属性的值,将其指向了epoll模块所实现的方法,这里就包括ngx_process_events方法宏所对应的方法,也即ngx_epoll_process_events()方法,因而这里其实就可以理解,ngx_epoll_process_events()方法本质上就是调用epoll_wait()方法等待epoll句柄上所监听的事件的发生;

处理ngx_posted_accept_events队列中的事件,这些事件其实就是前面讲到的客户端建立连接的事件,在ngx_epoll_process_events()方法中获取到事件之后,会判断其是accept事件还是读写事件,如果是accept事件,就会将其添加到ngx_posted_accept_events队列中,如果是读写事件,就会将其添加到ngx_posted_events队列中;

释放共享锁,以让其他的worker进程可以获取锁,从而接收客户端连接;

处理ngx_posted_events队列中的事件,也即客户端连接的读写事件。从这里就可以看出nginx高性能的一个原因,其将accept事件和读写事件放到了两个不同的队列中,accept事件是必须在锁内部处理的,而读写事件则可以异步于accept事件,这提高了nginx处理客户端请求的能力。

下面我们来看一下ngx_epoll_process_events()方法是如何处理epoll句柄中的事件的:

static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int events; uint32_t revents; ngx_int_t instance, i; ngx_uint_t level; ngx_err_t err; ngx_event_t *rev, *wev; ngx_queue_t *queue; ngx_connection_t *c; /* NGX_TIMER_INFINITE == INFTIM */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer); // 通过epoll_wait()方法进行事件的获取,获取到的事件将存放在event_list中,并且会将获取的事件个数返回 events = epoll_wait(ep, event_list, (int) nevents, timer); err = (events == -1) ? ngx_errno : 0; // 这里的ngx_event_timer_alarm是通过一个定时器任务来触发的,在定时器中会将其置为1, // 从而实现定期更新nginx缓存的时间的目的 if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { ngx_time_update(); } if (err) { if (err == NGX_EINTR) { if (ngx_event_timer_alarm) { ngx_event_timer_alarm = 0; return NGX_OK; } level = NGX_LOG_INFO; } else { level = NGX_LOG_ALERT; } ngx_log_error(level, cycle->log, err, "epoll_wait() failed"); return NGX_ERROR; } // 获取的事件个数为0 if (events == 0) { // 如果当前时间类型不为NGX_TIMER_INFINITE,说明获取事件超时了,则直接返回 if (timer != NGX_TIMER_INFINITE) { return NGX_OK; } // 这里说明时间类型为NGX_TIMER_INFINITE,但是却返回了0个事件,说明epoll_wait()调用出现了问题 ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "epoll_wait() returned no events without timeout"); return NGX_ERROR; } // 遍历各个事件 for (i = 0; i < events; i++) { // 每个事件的data.ptr中存储了当前事件对应的connection对象 c = event_list[i].data.ptr; // 获取事件中存储的instance的值 instance = (uintptr_t) c & 1; // 获取connection指针地址值 c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); // 获取读事件结构体 rev = c->read; // 如果当前连接的文件描述符为-1,获取其instance不等于当前事件的instance, // 说明该连接已经过期了,则不对该事件进行处理 if (c->fd == -1 || rev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } // 获取当前事件监听的类型 revents = event_list[i].events; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d ev:%04XD d:%p", c->fd, revents, event_list[i].data.ptr); // 如果事件发生错误,则打印相应的日志 if (revents & (EPOLLERR | EPOLLHUP)) { ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD", c->fd, revents); /* * if the error events were returned, add EPOLLIN and EPOLLOUT * to handle the events at least in one active handler */ revents |= EPOLLIN | EPOLLOUT; } #if 0 if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "strange epoll_wait() events fd:%d ev:%04XD", c->fd, revents); } #endif // 如果当前是读事件,并且事件是活跃的 if ((revents & EPOLLIN) && rev->active) { #if (NGX_HAVE_EPOLLRDHUP) if (revents & EPOLLRDHUP) { rev->pending_eof = 1; } rev->available = 1; #endif // 将事件标记为就绪状态 rev->ready = 1; // 默认是开启了NGX_POST_EVENTS开关的 if (flags & NGX_POST_EVENTS) { // 如果当前是accept事件,则将其添加到ngx_posted_accept_events队列中, // 如果是读写事件,则将其添加到ngx_posted_events队列中 queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events; ngx_post_event(rev, queue); } else { // 如果不需要分离accept和读写事件,则直接处理该事件 rev->handler(rev); } } // 获取写事件结构体 wev = c->write; if ((revents & EPOLLOUT) && wev->active) { // 如果当前连接的文件描述符为-1,获取其instance不等于当前事件的instance, // 说明该连接已经过期了,则不对该事件进行处理 if (c->fd == -1 || wev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } // 将当前事件标记为就绪状态 wev->ready = 1; #if (NGX_THREADS) wev->complete = 1; #endif // 由于是写事件,并且需要标记为了NGX_POST_EVENTS状态, // 因而将其直接添加到ngx_posted_events队列中,否则直接处理该事件 if (flags & NGX_POST_EVENTS) { ngx_post_event(wev, &ngx_posted_events); } else { wev->handler(wev); } } } return NGX_OK; }
로그인 후 복사

这里ngx_epoll_process_events()方法首先就是调用epoll_wait()方法获取所监听的句柄的事件,然后遍历获取的事件,根据事件的类型,如果是accept事件,则添加到ngx_posted_accept_events队列中,如果是读写事件,则添加到ngx_posted_events队列中,而队列中事件的处理,则在上面介绍的ngx_process_events_and_timers()方法中进行。

위 내용은 epoll 모델을 기반으로 한 nginx 이벤트 중심 프로세스에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:oschina.net
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿
회사 소개 부인 성명 Sitemap
PHP 중국어 웹사이트:공공복지 온라인 PHP 교육,PHP 학습자의 빠른 성장을 도와주세요!