mini_ngx_实现四-event模块

Catalogue
  1. 1. 事件模型概述
    1. 1.1. 事件收集器
    2. 1.2. 事件分发器
  2. 2. ngx 事件初始化
    1. 2.1. 全局epoll接口
  3. 3. 事件相关api
    1. 3.1. @add_event 注册事件
    2. 3.2. @del_event 删除事件
    3. 3.3. @add_conn 注册连接事件
    4. 3.4. @del_conn 删除事件
  4. 4. @process_events 事件分发

事件模型概述

主要分为两个方面事件收集器、事件分发器

事件收集器

也就是向epoll添加、更新、删除等事件,让epoll事件去管理

  • EPOLL_CTL_MOD
  • EPOLL_CTL_ADD
  • EPOLL_CTL_DEL

    事件分发器

    实际是调用epoll_wait 收集内核通知的就绪事件,然后调用ev->handler执行用户自定义该事件的处理方法

ngx 事件初始化

事件的初始化实际就是对epoll进行初始化,如epoll_create()调用创建epollfd event_list事件列表内存申请

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
.....
for (m = 0; cycle->modules[m]; m++) {
if (cycle->modules[m]->type != EVENT_MODULE) {
continue;
}

if (cycle->modules[m]->ctx_index != ecf->use) {
continue;
}

module = cycle->modules[m]->ctx;

if (module->actions.init(cycle, timer_resolution) != OK) {
/* fatal */
exit(2);
}

break;
}
.....

这里是event核心模块被cycle_init()主函数进行init_process()初始化时调用的函数,在上一节有说明,该函数是对连接池 ,读事件 ,写事件等进行内存申请初始化,串联成链表

这里也是其中的工作之一,因为nginx高度可扩展,所以event实际的系统实现有很多种epoll,kqueue,pool,select..等,在./configure时会进行环境检查,将兼容平台的实现,如epoll_module.c加入到cycle->modules[m]中,所以上面的循环就是找出如epoll的实现并去调用该初始化方法,准备好接受事件

全局epoll接口

加入平台的event实现为epoll则会调用epoll_moudle.c 的初始化方法,将相关的接口添加到 event_actions全局变量上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//src/event/module/epoll_module.c:369
static event_module_t epoll_module_ctx = {
&epoll_name,
epoll_create_conf, /* create configuration */
epoll_init_conf, /* init configuration */

{
epoll_add_event, /* add an event */
epoll_del_event, /* delete an event */
epoll_add_event, /* enable an event */
epoll_del_event, /* disable an event */
epoll_add_connection, /* add an connection */
epoll_del_connection, /* delete an connection */

epoll_process_events, /* process the events */
epoll_init, /* init the events */
epoll_done, /* done the events */
}
};

event_actions = epoll_module_ctx.actions;

也就是将epoll的add,del,add_con,del_con,epoll_process_events等接口条件到全局actions上,提供外部访问,收集外部事件,并分发就绪事件

事件相关api

下面统一接口的实现假定为epoll实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typedef struct {
int_t (*add)(event_t *ev, int_t event, uint_t flags);
int_t (*del)(event_t *ev, int_t event, uint_t flags);

int_t (*enable)(event_t *ev, int_t event, uint_t flags);
int_t (*disable)(event_t *ev, int_t event, uint_t flags);

int_t (*add_conn)(connection_t *c);
int_t (*del_conn)(connection_t *c, uint_t flags);

int_t (*notify)(event_handler_pt handler);

int_t (*process_events)(cycle_t *cycle, msec_t timer,
uint_t flags);

int_t (*init)(cycle_t *cycle, msec_t timer);
void (*done)(cycle_t *cycle);
} event_actions_t;

event_actions_t event_actions;

在event.c 中会定义全局变量event_actions,改变量对应的平台实现的相关事件接口,如上文初始化时,如果平台支持epoll则将epoll事件的相关api添加到全局变量event_actions中,提供外部调用注册事件

1
2
3
4
5
6
7
8

#define process_events event_actions.process_events
#define done_events event_actions.done

#define add_event event_actions.add
#define del_event event_actions.del
#define add_conn event_actions.add_conn
#define del_conn event_actions.del_conn

并且默认提供了相关宏定义,直接通过宏定义更加方便些

@add_event 注册事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static int_t
epoll_add_event(event_t *ev, int_t event, uint_t flags)
{
int op;
uint32_t events, prev;
event_t *e;
connection_t *c;
struct epoll_event ee;

c = ev->data;

events = (uint32_t) event;

if (event == READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
events = EPOLLIN;
} else {
e = c->read;
prev = EPOLLIN;
events = EPOLLOUT;
}

if (e->active) {
op = EPOLL_CTL_MOD;
events |= prev;

} else {
op = EPOLL_CTL_ADD;
}

ee.events = events | (uint32_t) flags;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
log_info(c->log,"epoll add event %d ",c->fd);

if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
log_error(c->log,"epoll_ctl %d failed\n",c->fd);
return ERROR;
}

ev->active = 1;
return OK;
}

判断是新增还是修改,e->active如果为1,说明之前注册过该事件,需要走更改事件流程epoll_ctl_mod

例如http流程中假如需要等待客户端发送body才能进行下面的操作,那么就可以将该http的读事件通过这个接口注册到epoll

当客户端发送了数据,内核收到的数据后分发该就绪事件,将内核数据拷贝到用户态空间调用ev->handler()回调函数继续执行上一次中断的函数

@del_event 删除事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int_t
epoll_del_event(event_t *ev, int_t event, uint_t flags)
{
.......
if (e->active) {
op = EPOLL_CTL_MOD;
ee.events = prev | (uint32_t) flags;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

} else {
op = EPOLL_CTL_DEL;
ee.events = 0;
ee.data.ptr = NULL;
}

log_info(c->log,"epoll_ctl %d",c->fd);
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
log_error(c->log,"epoll_ctl :%d failed",c->fd);
return ERROR;
}

ev->active = 0;

return OK;
}

同样当http请求生命周期结束,也就是http 引用计数count真正为0的时候,会触发event_del,pool_destory,socket_close..等进行事件删除,内存池回收,tcp关闭等一系列回收机制

event事件删除后,epoll不在负责相关事件监控

@add_conn 注册连接事件

该方法更加方便,直接将connection_t连接注册到epoll中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int_t
epoll_add_connection(connection_t *c)
{
struct epoll_event ee;

ee.events = EPOLLIN|EPOLLOUT|EPOLLET;
ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance);

log_info(c->log,"epoll add connection fd:%d",c->fd);
if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
log_error(c->log,"epoll add connection fd:%d failed",c->fd);
return ERROR;
}

c->read->active = 1;
c->write->active = 1;

return OK;
}

直接调用epoll_ctl()将该事件添加到epoll_ctl中

@del_conn 删除事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int_t
epoll_del_connection(connection_t *c, uint_t flags)
{
int op;
struct epoll_event ee;

if (flags & CLOSE_EVENT) {
c->read->active = 0;
c->write->active = 0;
return OK;
}
op = EPOLL_CTL_DEL;
ee.events = 0;
ee.data.ptr = NULL;

if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
printf("del connection failed");
return ERROR;
}

c->read->active = 0;
c->write->active = 0;

return OK;
}

直接del移除该事件即可

@process_events 事件分发

nginx是所有的事件执行都来自事件循环监测事件并发事件执行,该函数在nginxwoker进程启动后作为while(1){}循环事件调用,

1
2
3
4
5
static int_t
epoll_process_events(cycle_t *cycle, msec_t timer, uint_t flags)
{
.......
}

事件分发主要分为如下重要部分

  1. 调用epoll_wait监测就绪事件,如tcp连接,数据读写,tcp关闭。。。等等就绪事件,events为就绪事件的总数

    1
    events = epoll_wait(ep, event_list, (int) nevents, timer);
  2. 分发所有就绪事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
        for (i = 0; i < events; i++) {
    c = event_list[i].data.ptr;

    instance = (uintptr_t) c & 1;
    c = (connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

    rev = c->read;
    //判断该连接是否已经失效,因为如果在执行之前的连接事件的时候将当前连接关闭了,单该连接又被新连接给复用了,这就需要instance来解决了,closed无法解决新连接将之前连接复用的例外
    if (c->fd == -1 || rev->instance != instance) {

    /*
    * the stale event from a file descriptor
    * that was just closed in this iteration
    */

    log_debug1(LOG_DEBUG_EVENT, cycle->log, 0,
    "epoll: stale event %p", c);
    continue;
    }
    取出事件
    revents = event_list[i].events;
    }
  3. 这里比较重要,nginx是基于事件来执行的,如果其中任何一个事件阻塞了,将会导致整个进程得不到处理任何任务,例如新连接accept可能需要优先执行,而普通收发数据可能需要放到延迟队列去执行

    1
    2
    3
    4
    5
    if (flags & POST_EVENTS) {
    post_event(wev, &posted_events);
    } else {
    wev->handler(wev);
    }