ngx_连接池与事件封装(一)

Catalogue
  1. 1. 连接池概述
    1. 1.1. 长连接连接池
    2. 1.2. 短连接池
    3. 1.3. 连接池注意事项
  2. 2. 连接池初始化
  3. 3. 连接池相关接口
    1. 3.1. @ngx_get_connection 获取空闲连接
    2. 3.2. @ngx_drain_connections 释放空闲连接
    3. 3.3. @ngx_reuseable_connection 添加长连接队列
    4. 3.4. @free_connection 归还到空闲连接池链表上
    5. 3.5. @close_connection 关闭连接并清理回收
  4. 4. 连接池在框架初始化中的体现
    1. 4.1. @ngx_create_listening 保存监听端口
    2. 4.2. @ngx_open_listening_sockets 创建所有socket
    3. 4.3. @ngx_event_process_init epoll监听所有socket

连接池概述

nginx的连接池是属于短链接池的,因为主要业务场景为代理服务器,生命周期是有限的,下面来看看两种类型的连接池的区别

长连接连接池

我们说连接池一般指的是:http连接池,tcp连接池,udp连接池等常用的网络连接复用。例如tcp连接池,当你有两个服务并且存在相互调用进行数据传输,那么必然存在连接建立(三次握手),连接关闭(四次挥手)等交互

问题是如果每次发送数据都要建立连接和关闭,那对于系统消耗还是很大的,每次创建连接不但要进行系统资源的消耗,而且用户层面也要申请内存来存放相关结构体

那么连接池的作用这时候就显得格外重要了,想象一下每次建立连接后不关闭呢,就让两个服务保持长连接通讯可以不呢?答案当然是yes

将所有连接一次性初始化,或者动态添加到连接池中,每次准备向对方发送数据时,直接去连接池里拿存活连接,没有就走新建连接流程

短连接池

nginx的连接池其实是属于短连接池的,连接池的主要作用当然也是为了节省内存,提高tcp交互速度

但是短连接池还是有一些不一样的,因为在短链接的场景下网络连接是无法复用的,唯一能够复用的就是承载该网络连接的那个结构体,对于nginx来说,内存当然是能省就省

当然连接池里的连接也可以被某个持久长连接占用,而且是长期占用,对于代理的场景,那可能是双倍占用

连接池注意事项

nginx的连接池在初始化启动期间就完全根据nginx.conf 中connections数量分配对应的内存,所以内存是一次性占有的,新连接进来也就是有一个fd数据变更

注意:连接池为空则丢弃该请求,不在处理新请求,例如你的worker_connections=5 那么当6条长连接请求过来时,最后一条肯定是不会被处理,一直处于等待期间

连接池初始化

连接池初始化时属于event事件模块的任务,在event_core_module核心模块启动的时候(模块的启动由 cyle_init 中遍历所有的模块统一启动初始化)进行的初始化

static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle)为实际初始化连接池的函数

1
2
3
4
cycle->connections = ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
if (cycle->connections == NULL) {
return NGX_ERROR;
}

cycle->connection_n该参数在cycle_init主函数中 解析nginx.conf配置文件中worker_connection参数进行初始化,可以看出,该参数直接导致nginx在启动时会占用申请多少的内存,并且该内存会持续到nginx生命周期结束后释放

1
2
3
4
5
6
7
8
9
10
cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
cycle->log);
if (cycle->read_events == NULL) {
return NGX_ERROR;
}
rev = cycle->read_events;
for (i = 0; i < cycle->connection_n; i++) {
rev[i].closed = 1;
rev[i].instance = 1;
}

连接池和读写事件是密不可分的,这里先申请对应数量的读事件并初始化为closed,在下文会和连接池进行一一绑定

1
2
3
4
5
6
7
8
9
10
cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
cycle->log);
if (cycle->write_events == NULL) {
return NGX_ERROR;
}

wev = cycle->write_events;
for (i = 0; i < cycle->connection_n; i++) {
wev[i].closed = 1;
}

申请写事件内存,和上面读事件对应,都是用于向epoll添加事件时的关联参数

1
2
3
4
5
6
7
8
9
10
11
12
13
i = cycle->connection_n;
next = NULL;

do {
i--;

c[i].data = next;
c[i].read = &cycle->read_events[i];
c[i].write = &cycle->write_events[i];
c[i].fd = (ngx_socket_t) -1;

next = &c[i];
} while (i);

将所有的连接 和对应的读写事件一一绑定,在epoll_wait 监听事件的时候就可以通过connection_t *c = ev->data获取对应连接对象

连接池相关接口

  • ngx_get_connection
  • ngx_free_connection
  • ngx_close_connection
  • ngx_reusable_connection
  • ngx_drain_connections

@ngx_get_connection 获取空闲连接

用的最多的接口,当accept被动打开新连接的时候,需要从连接池中获取空闲的connection_t结构体封装tcp连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
c = ngx_cycle->free_connections;

if (c == NULL) {
ngx_drain_connections((ngx_cycle_t *) ngx_cycle);
c = ngx_cycle->free_connections;
}

if (c == NULL) {
ngx_log_error(NGX_LOG_ALERT, log, 0,
"%ui worker_connections are not enough",
ngx_cycle->connection_n);

return NULL;
}

直接通过cycle->free_connections 全局链表上拿表头的那个连接,如果不为空,说明空闲,直接走面初始化流程即可

如果连接为空,则需要调用ngx_drain_connections去释放空闲连接(去挨个执行所有的连接事件,尽可能的释放出一下空闲连接出来)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ngx_cycle->free_connections = c->data;
ngx_cycle->free_connection_n--;

if (ngx_cycle->files && ngx_cycle->files[s] == NULL) {
ngx_cycle->files[s] = c;
}

rev = c->read;
wev = c->write;

ngx_memzero(c, sizeof(ngx_connection_t));

c->read = rev;
c->write = wev;
c->fd = s;
c->log = log;

这里就是获取连接的主要操作了,连接池的所有连接都是在一个链表上通过c->data串起来的,所以,这里只是将空闲指针移动到下一位即可,回收的时候也只需要插入表头指针前面即可,存取的复杂度都是O(1)

@ngx_drain_connections 释放空闲连接

在上面从连接池中获取连接的时候,会发现如果没有空余的连接则会调用如下的方法看看是否能强制空出一些连接来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void
ngx_drain_connections(void)
{
ngx_int_t i;
ngx_queue_t *q;
ngx_connection_t *c;

for (i = 0; i < 32; i++) {
if (ngx_queue_empty(&ngx_cycle->reusable_connections_queue)) {
break;
}

q = ngx_queue_last(&ngx_cycle->reusable_connections_queue);
c = ngx_queue_data(q, ngx_connection_t, queue);

ngx_log_debug0(NGX_LOG_DEBUG_CORE, c->log, 0,
"reusing connection");

c->close = 1;
c->read->handler(c->read);
}
}

其实主要就是看看queue长连接链表上选取32个出来,对他们全部执行c->close = 1,read->handler(c->read),由于close属性会导致http_close_connection回收该链接

但是强制回收链接前会对他进行一个读取事件的操作recv(fd),如果返回0 则说明对端已关闭,也需要在handler中释放该链接到free_connections上。

@ngx_reuseable_connection 添加长连接队列

该方法当客户端设置keep-alive长连接属性时,nginx会将它丢到c->queue队列上,遇到上面连接池不够时,会释放掉长连接队列上的 不活跃链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void
ngx_reusable_connection(ngx_connection_t *c, ngx_uint_t reusable)
{
// 一旦一个keepalive的连接正常处理了,就将其从reusable队列中移除
if (c->reusable) {
ngx_queue_remove(&c->queue);
}

// 在ngx_http_set_keepalive中会将reusable置为1,reusable为1的直接效果
// 就是将该连接插到reusable_connections_queue中
c->reusable = reusable;

// 当reusable为0时,意味着该keepalive被正常的处理掉了,不应该被再次添加
// 到reusable队列中了。
if (reusable) {
/* need cast as ngx_cycle is volatile */
// 这里使用头插法,较新的连接靠近头部,时间越久未被处理的连接越靠尾
ngx_queue_insert_head(
(ngx_queue_t *) &ngx_cycle->reusable_connections_queue, &c->queue);
}

@free_connection 归还到空闲连接池链表上

@close_connection 关闭连接并清理回收

会进行一系列清除工作

  • 移除epoll监听的读写事件
  • 删除任务队列post_event队列
  • 标记读写事件关闭
  • 从长连接链表里面移除该链接归还到 空闲链表上
  • 调用上面free_connection归还链接到空闲链表上
  • close(fd) 关闭tcp对端

连接池在框架初始化中的体现

在nginx启动期间,读取nginx.conf中的配置,并调用个个模块的方法来处理它

例如server{ listen 8080; }这种配置,会全部被http核心模块解析

  1. 每发现一个listen配置,就会调用http模块create_conf,init_conf等来保存该配置
  2. 并且将每一个端口 初始化为listening_t结构体保存到全局cycle->listening数组中
  3. cycle_init主函数中,调用每个模块的时候会触发http->init_process初始化http核心模块,然后将所有的端口都进行sock_create(),bind(),listen等操作创建socket
  4. 最后从连接池中为每个socket 分配一个connection_t连接包装其他
  5. 服务端socket 和 普通tcp的连接有什么不同呢?其实主要是在conneciton->read读事件为accept,非常灵活的将各种类型的连接都统一为相同的接口

@ngx_create_listening 保存监听端口

每从nginx.conf中读取一个listen配置,都需要创建一个listenint_t结构体保存到全局链表上

1
2
3
4
5
6
7
8
9
10
11
12
//core/ngx_connection.c:20
ls = ngx_array_push(&cf->cycle->listening);
if (ls == NULL) {
return NULL;
}

ngx_memzero(ls, sizeof(ngx_listening_t));

sa = ngx_palloc(cf->pool, socklen);
if (sa == NULL) {
return NULL;
}

@ngx_open_listening_sockets 创建所有socket

1
2
3
4
5
6
7
8
9
10
//core/ngx_connection.c:269
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
s = ngx_socket(ls[i].sockaddr->sa_family, ls[i].type, 0)

if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const void *) &reuseaddr, sizeof(int))
if (bind(s, ls[i].sockaddr, ls[i].socklen) == -1)
if (listen(s, ls[i].backlog) == -1) {
}

总之就是初始化所有配置文件中定义的socket

@ngx_event_process_init epoll监听所有socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
event/event.c
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {

c = ngx_get_connection(ls[i].fd, cycle->log);

if (c == NULL) {
return NGX_ERROR;
}

c->log = &ls[i].log;

c->listening = &ls[i];
ls[i].connection = c;

rev = c->read;
rev->handler = ngx_event_accept;
if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {
if (ngx_add_conn(c) == NGX_ERROR) {
return NGX_ERROR;
}

} else {
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
}
}
  1. 从连接池中获取空闲连接包装socket
  2. 将event事件设置为ngx_event_accept 当事件触发了,那一定可能是新是连接的到来需要accept
  3. 将socket读写事件加入epoll监听