glusterfs-客户端源码分析

Catalogue
  1. 1. 一、client端架构流程图
    1. 1.1. 1.1 volume 配置语言
      1. 1.1.1. 1.1.2 关键字: volume&end-volume&subvolumes
      2. 1.1.2. 1.1.3 关键字: type&option
    2. 1.2. 1.2 client端功能设计
      1. 1.2.1. 1.2.1 volume配置获取
      2. 1.2.2. 1.2.2 volume配置解析
  2. 2. 二、xlator
    1. 2.1. 2.1 xlator实现
      1. 2.1.1. 2.1.1 xlator兼容性
    2. 2.2. 2.2 核心xlator
      1. 2.2.1. 2.2.1 io-threads: 线程池
      2. 2.2.2. 2.2.2 read-ahead: 预读
      3. 2.2.3. 2.2.3 write-behind: 合并
      4. 2.2.4. 2.2.4 distribute: 分布式hash
      5. 2.2.5. 2.2.5 replicate: 副本
      6. 2.2.6. 2.2.6 client: 最后一个模块
  3. 3. 三、一次write的生命周期
    1. 3.1. 3.1 gfapi.so: sdk入口
      1. 3.1.1. 3.1.1 将write操作通过xlator树传递下去
      2. 3.1.2. 3.1.2 阻塞等待整颗树处理完
    2. 3.2. 3.2 meta_autoload.so
    3. 3.3. 3.3 io-stats.so
      1. 3.3.1. 3.4 io-threads.so 线程池
    4. 3.4. 3.5 open-behind.so
    5. 3.5. 3.6 quick-read.so 快速读
    6. 3.6. 3.7 dht_writev: 核心-分布式hash
    7. 3.7. 3.8 replicate.so: 副本机制
      1. 3.7.1. 3.8.1 开启事务,提交rpc数据到client.so
    8. 3.8. 3.9 client.so: rpc client
      1. 3.8.1. 3.9.1 提交rpc数据到glusterfsd
  4. 4. 四、概述

glusterfs也属于典型的cs结构,有client端,server端,通过rpc调用通信

主要的进程有:

  • glusterd: 管理进程,管理所有的配置、版本信息
  • glusterfsd: server进程,处理所有的文件处理请求并进行落盘
  • glusterfs|api: client端,将所有文件的相关操作代理到server端去处理,核心是根据hash算法计算出需要和哪个server端进行通信

一、client端架构流程图

image

sdk(client)层就可以看出,所有的功能都进行了分类,并拆分为大量的so动态库实现,最后通过配置将so动态库串联起来组成了sdk的逻辑

上面所述的配置:可以用通过json、xml、yaml等等来实现,只要能表达关系即可,但在glsuter中通过yacc,lex(./libglusterfs/src/graph.l |graph.y)等词法工具自己实现了一个类似xml的配置语法:volume

来看看上图中sdk对应的volume配置吧,这个配置是通过glusterd获取的,一般glusterd也是保存在本地配置中的(/var/lib/glusterd/vols//trusted*.vol)

1.1 volume 配置语言

下面就是一个完整的client端volume配置文件,该文件对应的是一个3副本的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
volume vol1-dr-client-0
type protocol/client
option remote-subvolume /replica/brick0
option remote-host 192.168.1.34
//省略了很多 option参数
end-volume
volume vol1-dr-client-1
type protocol/client
option remote-subvolume /replica/brick1
option remote-host 192.168.1.34
//省略很多option参数
end-volume
volume vol1-dr-client-2
type protocol/client
option remote-subvolume /replica/brick2
option remote-host 192.168.1.34
//省略很多option参数
end-volume
volume vol1-dr-replicate-0
type cluster/replicate
//省略很多option参数
subvolumes vol1-dr-client-0 vol1-dr-client-1 vol1-dr-client-2
end-volume
volume vol1-dr-dht
type cluster/distribute
subvolumes vol1-dr-replicate-0
end-volume
volume vol1-dr-write-behind
type performance/write-behind
subvolumes vol1-dr-dht
end-volume
volume vol1-dr-read-ahead
type performance/read-ahead
subvolumes vol1-dr-write-behind
end-volume
volume vol1-dr-quick-read
type performance/quick-read
subvolumes vol1-dr-io-cache
end-volume
volume vol1-dr-open-behind
type performance/open-behind
subvolumes vol1-dr-quick-read
end-volume
volume vol1-io-threads
type performance/io-threads
subvolumes vol0-open-behind
end-volume
volume vol1-dr
type debug/io-stats
subvolumes vol1-io-threads
end-volume

1.1.2 关键字: volume&end-volume&subvolumes

整个配置语法非常简单,就是以volume开头以end-volume结尾的一段被作为一个组件,代码里对应每一个xlator

多个组件通过subvolumes连接起来

比如这段

1
2
3
4
5
6
7
8
volume vol1-io-threads
type performance/io-threads
subvolumes vol0-open-behind
end-volume
volume vol1-dr
type debug/io-stats
subvolumes vol1-io-threads
end-volume

说明 io-threads是io-stats的子节点,整个配置内容会自动被词法解析器(yacc)自动串联为一颗树: 因为subvolumes 可以对应多个子节点

但实际情况是串联起来后前面是一个链表,只有最后的叶子结点(protocol/client,cluster/replicate)会存在多个
image

1.1.3 关键字: type&option

type关键字表明当前是一个什么组件(动态库),比如

1
type performance/io-threads

代表这个组件是一个 performances/io-threads.so动态库,源代码是在./xlators/performance/io-threads/src

不一定这个路径一定是按照源代码目录拆分的,但总体相差不大

option关键字代表的是参数,因为每个volume对应的是每个单独的动态库组件,他们全部都有统一的init函数,都需要初始化,所以option用于定义默认的初始化参数

例如:

1
2
3
4
5
6
volume vol1-dr-client-0
type protocol/client
option remote-subvolume /replica/brick0
option remote-host 192.168.1.34
//省略了很多 option参数
end-volume

代表protocol/client.so动态库需要和server端连接的host为192.168.1.34,需要连接的brick是/replica/brick0

当client.so初始化的时候会经过如下流程

  1. 向glusterd(默认24007端口)查询(/replica/brick0)的配置(服务端口信息)
  2. 拿到brick的端口后,开始建立tcp连接
  3. 后续的读写都通过brick(glusterfsd)进行rpc通信

1.2 client端功能设计

整个client的功能完全依赖上面的volume配置,如上图的流程图可得客户端的io流程如下

1
2
3
4
io-stats.so -> io-thread.so -> open-behind.so -> quick-read.so -> read-ahead.so 
|
V
{ client.so client.so client.so} <- distribute.so <- write-behind.so <-

整个io流程还是比较长的,但非常灵活和清晰,你可以很方便的自定义插件插入到整个流程中,或者去除某些节点,只需要更改上面的volume配置即可

整体设计和nginx的模块化思想差不多,nginx的所有模块都保存在一个数组中,所以io顺序是和数组的索引顺序有关,而gluster的所有模块是根据配置串联为一个树,就这里不同

1.2.1 volume配置获取

glusterd服务核心是作为配置管理的服务,当client需要挂载brick时需要通过glusterd获取volume配置,

1
2
3
4
5
6
7
8
9
10
11
12
13
//glusterfs/api/src/glfs.c
int glfs_volumes_init (struct glfs *fs)
{
//省略
if (cmd_args->volfile_server) {
//去glusterd获取卷信息
ret = glfs_mgmt_init (fs);
goto out;
}
//已经知道卷信息了,直接打开
fp = get_volfp (fs);
//..省略
}

  1. gfapi.so: sdk初始化的时候,你可以指定volume配置,也可以走默认的通过glusterd获取卷信息
  2. 手动提供volume配置就是上面将的那个demo,接下来看看通过glusterd获取的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//glusterfs/api/src/glfs-mgmt.c  初始化

int glfs_mgmt_init (struct glfs *fs)
{
//...省略参数初始化

//创建rpc客户端,基本上所有的cli rpc客户端都是一样的逻辑,
//1. socket.so -> connect
//2. request glusterd 查询 volume 信息
rpc = rpc_clnt_new (options, THIS, THIS->name, 8);
if (!rpc) {
ret = -1;
gf_msg (THIS->name, GF_LOG_WARNING, 0,
API_MSG_CREATE_RPC_CLIENT_FAILED,
"failed to create rpc clnt");
goto out;
}
//注册回调函数,然后,socket.so::event_dispatch_handler会一层一层的传送到这里
ret = rpc_clnt_register_notify (rpc, mgmt_rpc_notify, THIS);
if (ret) {
gf_msg (THIS->name, GF_LOG_WARNING, 0,
API_MSG_REG_NOTIFY_FUNC_FAILED,
"failed to register notify function");
goto out;
}
//...省略一些默认逻辑

//开始connect tcp glsterd
ret = rpc_clnt_start (rpc);
out:
return ret;
}

和glusterd的tcp连接成功后,会在单独的event线程里通过epoll_wait拿到事件后回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static int
mgmt_rpc_notify (struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
void *data)
{
//省略无关核心内容

switch (event) {
case RPC_CLNT_DISCONNECT: //断开连接
//省略。。

case RPC_CLNT_CONNECT: //连接成功了
rpc_clnt_set_connected (&((struct rpc_clnt*)ctx->mgmt)->conn);
//向glusterd 发起rpc请求去 查 volfile 信息,在以前的老版本都是自己本地配volfile文件的
//现在的版本都是统一向glusterd去拿
ret = glfs_volfile_fetch (fs);
if (ret && (ctx->active == NULL)) {
break;
default:
break;
}
out:
return 0;
}

通过glfs_volfile_fetch向glusterd发送rpc查询volume配置

1.2.2 volume配置解析

sdk(client)的功能取决于volume的配置组合,接下来看看如何初始化的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int
glfs_process_volfp (struct glfs *fs, FILE *fp)
{
glusterfs_graph_t *graph = NULL;
int ret = -1;
xlator_t *trav = NULL;
glusterfs_ctx_t *ctx = NULL;

ctx = fs->ctx;
graph = glusterfs_graph_construct (fp); //通过yacc,lex 词法工具自动解析内容生成一颗树
if (!graph) {
gf_msg ("glfs", GF_LOG_ERROR, errno,
API_MSG_GRAPH_CONSTRUCT_FAILED,
"failed to construct the graph");
goto out;
}

for (trav = graph->first; trav; trav = trav->next) {
if (strcmp (trav->type, "mount/api") == 0) {
gf_msg ("glfs", GF_LOG_ERROR, EINVAL,
API_MSG_API_XLATOR_ERROR,
"api master xlator cannot be specified "
"in volume file");
goto out;
}
}

ret = glusterfs_graph_prepare (graph, ctx, fs->volname);
if (ret) {
glusterfs_graph_destroy (graph);
goto out;
}

ret = glusterfs_graph_activate (graph, ctx);

if (ret) {
glusterfs_graph_destroy (graph);
goto out;
}

gf_log_dump_graph (fp, graph);

ret = 0;
out:
if (fp)
fclose (fp);

if (!ctx->active) {
ret = -1;
}

return ret;
}

总的来说是通过glusterfs_graph_constructyacc生成的词法解析函数,直接帮你生成了一棵树graph
在这个过程中,就会为每个xaltor去寻找对应的动态库并加载到内存中绑定对应的对方法和变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int xlator_dynload (xlator_t *xl)
{
//设置动态库所在的路径
//比如type protocol/client来说
//name = "/lib64/glusterfs/3.12.15/xlator" + "protocol/client.so"
//加载动态库
handle = dlopen (name, RTLD_NOW|RTLD_GLOBAL);
if (!handle) {
goto out;
}
xl->dlhandle = handle;
//获取fops这个符号的地址,可能是函数也可能是全局变量
if (!(xl->fops = dlsym (handle, "fops"))) {
goto out;
}
//获取cbks回调函数
if (!(xl->cbks = dlsym (handle, "cbks"))) {
goto out;
}

//class_methods是个变量,所以可以直接加载对应的成员实现
vtbl = dlsym(handle,"class_methods");
if (vtbl) {
xl->init = vtbl->init;
xl->fini = vtbl->fini;
xl->reconfigure = vtbl->reconfigure;
xl->notify = vtbl->notify;
}
else {
//忽略初始化init不走
}
//忽略非重要逻辑

//这里比较重要,填充默认函数,设置默认的fop回调函数,fop都是相关文件的操作 create,open,mkdir.touch ....等等
fill_defaults (xl);

return ret;
}

接下来就是挨个遍历整个树,然后初始化各个xlator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int glusterfs_graph_activate (glusterfs_graph_t *graph, glusterfs_ctx_t *ctx)
{
//忽略参数初始化

/* XXX: perform init () */
//调用每个xlator.init() 初始化
ret = glusterfs_graph_init (graph);
if (ret) {
return ret;
}

//忽略无关逻辑。。。

//通知父节点,也就是 mount/api.so 所有的xlator已经初始化完毕
ret = glusterfs_graph_parent_up (graph);
if (ret) {
return ret;
}

return 0;
}

挨个初始化所有的xaltor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int
glusterfs_graph_init (glusterfs_graph_t *graph)
{
xlator_t *trav = NULL;
int ret = -1;

trav = graph->first;
//对每个xlator进行初始化
while (trav) {
ret = xlator_init (trav);
if (ret) {
return ret;
}
trav = trav->next;
}

return 0;
}

最后就是依次调用每个xlator.init()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int
xlator_init (xlator_t *xl)
{
int32_t ret = -1;

GF_VALIDATE_OR_GOTO ("xlator", xl, out);

if (xl->mem_acct_init)
xl->mem_acct_init (xl);

xl->instance_name = NULL;
ret = __xlator_init (xl);
//忽略。。
}

二、xlator

gluster将大量的功能进行模块化,每个模块都是单独的动态库,并通过配置将相关模块关联起来提功完整的功能,和这种架构设计相同的还有熟知的nginx,nginx也是想相关功能都拆分为单独的模块,然后最后将相关模块存放到一个数组中,每个http请求过来都会一次顺序的变量整个数组,层层将处理传递个所有的模块

gluster也是这样,相比nginx,gluster的模块关联要智能一些,它自定义了一种配置语言volume,可以方便的将所有模块关联为一个树,整颗树的的每个节点都能决定改继续传递处理给子节点还是直接终止

虽说整个关联关系是一颗树,但不是一颗完全平衡的多叉树,更像是一个链表

1
2
3
4
5
                                         -> replicate -> client
|
io-stats -> io-thread -> ...-> distribute
|
-> replicate -> client

每个文件的读写流程依次通过各个xlator传递下去,最终到达client节点,通过rpc和glusterfsd(server)提交操作

2.1 xlator实现

接下来分析下xlator的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct _xlator {
/* Built during parsing */
char *name;
char *type;
char *instance_name; /* Used for multi NFSd */
xlator_t *next; //下一个节点
xlator_t *prev; //上一个节点
xlator_list_t *parents; //父节点
xlator_list_t *children; //孩子节点
dict_t *options; //配置

/* Set after doing dlopen() */
void *dlhandle; //动态库
struct xlator_fops *fops; //成员函数
struct xlator_cbks *cbks; //成员回调
struct xlator_dumpops *dumpops;
struct list_head volume_options; /* list of volume_option_t */

void (*fini) (xlator_t *this);
int32_t (*init) (xlator_t *this);
int32_t (*reconfigure) (xlator_t *this, dict_t *options);
int32_t (*mem_acct_init) (xlator_t *this);

void *private;
//忽略非xlator本身字段
};

核心的就是dlhandle,fops,cbks,private

  • dlhandle: 动态库的句柄
  • fops: 动态库里fops变量的地址
  • cbks: 动态库里cbks
  • private: 当前xlator的私有配置

总的来说看起来就像是一个对象类型语言中的class结构,将动态库里的函数绑定到xlator的函数指针成员中,动态绑定,在volume配置通过yacc词法解析器解析的时候就会加载对应的动态库并绑定对应的函数或者成员变量到当前xlator(this)成员中

后续调用当前xlator对应方法如:xlator->fops->write(...)实际调用的是对应动态库fops中的成员函数

2.1.1 xlator兼容性

来看看标准的一个动态库基本结构,以protocol/client.so为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//./xlators/protocol/client/src/client.c
int init (xlator_t *this)
{
}
struct xlator_cbks cbks = {
.forget = client_forget,
.release = client_release,
.releasedir = client_releasedir
};

struct xlator_fops fops = {
.stat = client_stat,
.readlink = client_readlink,
.mknod = client_mknod,
.mkdir = client_mkdir,
.unlink = client_unlink,
.rmdir = client_rmdir,
.symlink = client_symlink,
.rename = client_rename,
.link = client_link,
.truncate = client_truncate,
.open = client_open,
.readv = client_readv,
.writev = client_writev,
//...太多了省略一些

};

struct xlator_dumpops dumpops = {
.priv = client_priv_dump,
.inodectx = client_inodectx_dump,
};

每个xlator都基本要有这些函数和变量,最重要的就是fops变量,里面保存了大量的函数指针,指向了大量的文件接口,如果当前xlator不处理相关操作可以不实现对应的方法,在xlator初始化的时候会为他填充一个默认的方法,默认的方法啥也不做,就是将当前操作继续代理给下一个子类节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int32_t
default_mkdir (
call_frame_t *frame,
xlator_t *this,
loc_t * loc,
mode_t mode,
mode_t umask,
dict_t * xdata)
{
//啥也不干,继续传递给下一个节点
STACK_WIND_TAIL (frame,
FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir,
loc, mode, umask, xdata);
return 0;
}

2.2 核心xlator

接下来分析一下重要的xlator实现,这里只做一个简介,会有专门章节去细细分析每个xlator的具体实现

接下来就按着处理流程顺序把重要的xlator概述一下

2.2.1 io-threads: 线程池

每个glfs客户端都会默认初始化一个线程池,最小线程为1个,所有读写请求都会丢到线程队列里等待多线程消费

1
2
3
#define IOT_MIN_THREADS         1
#define IOT_DEFAULT_THREADS 16
#define IOT_MAX_THREADS 64

2.2.2 read-ahead: 预读

预先读取下一块顺序的内存,提高读取效率,将大量的、零散的读取操作集合成少量的、大一些的读操作,这样,减小了网络和磁盘的负载。page-size 描述了块的大小。page-count 描述了预读块的总数量

2.2.3 write-behind: 合并

就是一个aggregator器,会将大量小的写操作合并起来组合成一个更大的写操作后统一提交到glusterfsd 服务端进行落盘,优化并发写操作

2.2.4 distribute: 分布式hash

所有分布式的逻辑靠这个模块实现,也就是分布式的逻辑是靠客户端实现,当前模块会根据文件名进行hash然后进行匹配落到某一个子卷,后续的操作都在该子卷上操作

  1. 子卷是一个副本,那么会将后续的读写请求提交到这个副本模块(replicate)
  2. 子卷是一个brick,那么会将后续的读写请求提交到client模块

2.2.5 replicate: 副本

这里的逻辑也相对简单,会遍历子节点,子节点必定是protocol/client.so模块,依次将对应的操作提交到子节点去

2.2.6 client: 最后一个模块

最后的节点就是一个rpc客户端,和对应的brick(glusterd)保持长链接,将第一

三、一次write的生命周期

image

其实一次write调用主要分为两个大的操作

    1. lookup: 查找文件是否存在,并基于hash初始化对应文件所在的卷等一些环境初始化
    1. write: 实现文件write

两种操作都是基于xlator整颗树层层传递下去,这里只分析write的过程,两种过程的逻辑差不多

3.1 gfapi.so: sdk入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//glusterfs/api/src/glfs-fops.c
ssize_t pub_glfs_write (struct glfs_fd *glfd, const void *buf, size_t count, int flags)
{
printf("mount/api.so glfs_write: \n");
struct iovec iov = {0, };
ssize_t ret = 0;

iov.iov_base = (void *) buf;
iov.iov_len = count;

ret = pub_glfs_pwritev (glfd, &iov, 1, glfd->offset, flags);

return ret;
}

拿到xlator的root节点,也就是 meta_autoload.so,这个动态库啥也没干,就是个代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ssize_t
pub_glfs_pwritev (struct glfs_fd *glfd, const struct iovec *iovec, int iovcnt,
off_t offset, int flags)
{
//..忽略参数初始化
//meta_autoload xlator
subvol = glfs_active_subvol (glfd->fs);
if (!subvol) {
ret = -1;
errno = EIO;
goto out;
}
//这里比较重要,是一个分支,用于解析这个文件是否存在,并初始化一些环境
fd = glfs_resolve_fd (glfd->fs, subvol, glfd);
if (!fd) {
ret = -1;
errno = EBADFD;
goto out;
}
//准备数据
ret = glfs_buf_copy (subvol, iovec, iovcnt, &iobref, &iobuf, &iov);
if (ret)
goto out;
//开始准备通过xlator树传递操作
ret = syncop_writev (subvol, fd, &iov, 1, offset, iobref, flags, NULL,
NULL);
DECODE_SYNCOP_ERR (ret);
//省略一些收尾操作
}

3.1.1 将write操作通过xlator树传递下去

开始将请求通过xlator这颗树进行传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int
syncop_writev (xlator_t *subvol, fd_t *fd, const struct iovec *vector,
int32_t count, off_t offset, struct iobref *iobref,
uint32_t flags, dict_t *xdata_in, dict_t **xdata_out)
{
struct syncargs args = {0, };

//开始写入数据
//subvol root(meta-autoload) -> vol2 -> .... vol2client
//args参数
//syncop_writev_cbk 回调参数

//其实就是执行 subvol->fops->writev()函数
SYNCOP (subvol, (&args), syncop_writev_cbk, subvol->fops->writev,
fd, (struct iovec *) vector, count, offset, flags, iobref,
xdata_in);

//省略参数收尾
}

subvol是meta_autoload.so,接下来就要调用meta_autoload.fops.writev() 函数,也就是meta_writev函数

3.1.2 阻塞等待整颗树处理完

上文用了宏SYNCOP进行xlaotr函数调用,这个会阻塞等待子节点处理完毕在返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#define SYNCOP(subvol, stb, cbk, op, params ...) do {                   \
struct synctask *task = NULL; \
call_frame_t *frame = NULL; \
\
task = synctask_get (); \
stb->task = task; \
if (task) \
frame = task->opframe; \
else \
frame = syncop_create_frame (THIS); \
\
if (task) { \
frame->root->uid = task->uid; \
frame->root->gid = task->gid; \
} \
\
__yawn (stb); \
\
STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, \
op, params); \
\
__yield (stb); \
if (task) \
STACK_RESET (frame->root); \
else \
STACK_DESTROY (frame->root); \
} while (0)

因为客户端逻辑非synctask模式,只有server端菜会走synctask
协程模式,所以客户端需要阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#define __yield(args) do {						\
if (args->task) { \
synctask_yield (args->task); \
} else { \
pthread_mutex_lock (&args->mutex); \
{ \
while (!args->done){ \
pthread_cond_wait (&args->cond, \
&args->mutex); \
} \
} \
pthread_mutex_unlock (&args->mutex); \
pthread_mutex_destroy (&args->mutex); \
pthread_cond_destroy (&args->cond); \
} \
} while (0)

直接sleep(mutex+cond),等待异步处理完后唤醒

3.2 meta_autoload.so

meta_autoload.so是在volume配置解析的时候手动加上的,所以在配置里是看不到的,手动加入后是作为root节点

1
2
3
4
volume vol2-meta_autoload
type meta/autoload
subvolumes vol2-io-stats
end-volume

子节点是io-stats.so模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int
meta_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *iov,
int count, off_t offset, uint32_t flags, struct iobref *iobref,
dict_t *xdata)
{
//写入文件
printf("meta_autoload.so meta_writev\n");
//this 是一个xlator,也就是当前的 meta.so 动态库,作为所有xlator的top节点


struct xlator_fops *_fops = NULL;
//这里一般都是获取到默认的meta_fops
//meta_fops 默认的方法都是一个代理,默认代理到xlator的第一个child去执行
_fops = meta_fops_get (fd->inode, this);
//默认方法在 /libglusterfs/src/default.c 中实现
_fops->writev (frame, this, fd,iov,count,offset,flags,iobref,xdata);
return 0;
}

meta模块没有对逻辑做处理,实际是调用default的方法继续传递给下一个节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//glusterfs/libglusterfs/src/defaults.c
int32_t
default_writev (
call_frame_t *frame,
xlator_t *this,
fd_t * fd,
struct iovec * vector,
int32_t count,
off_t off,
uint32_t flags,
struct iobref * iobref,
dict_t * xdata)
{
//直接调用第一个child节点 xlator->chilid->fops->writev 方法
STACK_WIND_TAIL (frame,
FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev,
fd, vector, count, off, flags, iobref, xdata);
return 0;
}

FIRST_CHILD(this) = io-stats.so, 接下来就是调用io-stats->fops->writev(io_stats_writev)函数

3.3 io-stats.so

这个就是个debug模块,也是啥都没干,继续传递给下一个节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//glusterfs/xlators/debug/io-stats/src/io-stats.c
int
io_stats_writev (call_frame_t *frame, xlator_t *this,
fd_t *fd, struct iovec *vector,
int32_t count, off_t offset,
uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
printf("debug/io-stats.so: io_stats_writev\n");
int len = 0;
//找到fd
if (fd->inode)
frame->local = fd->inode;
//获取数据长度
len = iov_length (vector, count);

ios_bump_write (this, fd, len);
START_FOP_LATENCY (frame);

//开始调用下一个节点
STACK_WIND (frame, io_stats_writev_cbk,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->writev,
fd, vector, count, offset, flags, iobref, xdata);
return 0;

}

FIRST_CHILD(this) == xlators(io-threads.so) ,子节点就是io-threads

接下来就是调用io-threads.so::iot_writev 方法

3.4 io-threads.so 线程池

这边会将操作提交到worker线程异步去执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//glusterfs/xlators/performance/io-threads/src/io-threads.c

int
iot_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
struct iovec *vector, int32_t count, off_t offset,
uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
call_stub_t *__stub = NULL;
int __ret = -1;
//将stub投递到io-thread 线程池中去,真正执行的是 default_writev_resume
__stub = fop_writev_stub(frame, default_writev_resume, fd,vector,count,offset,flags,iobref,xdata);
if (!__stub) {
__ret = -ENOMEM;
goto out;
}

__ret = iot_schedule (frame, this, __stub);

out:
if (__ret < 0) {
default_writev_failure_cbk (frame, -__ret);
if (__stub != NULL) {
call_stub_destroy (__stub);
}
}

return 0;
}

worker接收到任务后开始执行default_writev_resume ,继续调用下一个子节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//glusterfs/libglusterfs/src/defaults.c

int32_t default_writev_resume (call_frame_t *frame, xlator_t *this, fd_t * fd,
struct iovec * vector,
int32_t count,
off_t off,
uint32_t flags,
struct iobref * iobref,
dict_t * xdata)
{
//独立线程开始调用 xlator的子节点
STACK_WIND (frame, default_writev_cbk,
FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev,
fd, vector, count, off, flags, iobref, xdata);
return 0;
}

io-thread节点的子节点依然只有一个open-behind.so, 子节点的方法FIRST_CHILD(this)->fops->writev == ob_writev

3.5 open-behind.so

对于写请求基本上等于啥也没干,就是个代理节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//glusterfs/xlators/performance/open-behind/src/open-behind.c

int ob_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *iov,
int count, off_t offset, uint32_t flags, struct iobref *iobref,
dict_t *xdata)
{
call_stub_t *stub = NULL;
//meta-autoload.so io-thread.so open-behind.so 啥事没干,代理了一下后直接调用default_writev_resume 继续传递给下一个child节点
stub = fop_writev_stub (frame, default_writev_resume, fd, iov, count,
offset, flags, iobref, xdata);
if (!stub)
goto err;
open_and_resume (this, fd, stub);

return 0;
err:
STACK_UNWIND_STRICT (writev, frame, -1, ENOMEM, 0, 0, 0);

return 0;
}

继续代理到子节点去处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//glusterfs/libglusterfs/src/defaults.c
int32_t
default_writev_resume (call_frame_t *frame, xlator_t *this, fd_t * fd,
struct iovec * vector,
int32_t count,
off_t off,
uint32_t flags,
struct iobref * iobref,
dict_t * xdata)
{
//独立线程池开始调用 xlator的子节点
STACK_WIND (frame, default_writev_cbk,
FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev,
fd, vector, count, off, flags, iobref, xdata);
return 0;
}

子节点是quick-read.so,FIRST_CHILD(this)->Fops->writev == qr_writev()

3.6 quick-read.so 快速读

当前组件相当于一个缓存层,为了加速读的效率,会带有缓存,那么写入的时候必然导致不一致,需要清除缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//glusterfs/xlators/performance/quick-read/src/quick-read.c
int
qr_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *iov,
int count, off_t offset, uint32_t flags, struct iobref *iobref,
dict_t *xdata)
{
printf("performance/quick-read.so: qr_writev\n");
qr_inode_prune (this, fd->inode);
//继续调用下面一层 performance/io-cache.so
STACK_WIND (frame, default_writev_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->writev,
fd, iov, count, offset, flags, iobref, xdata);
return 0;
}

  1. 清理缓存
  2. 将请求继续代理到下一个子节点
  3. 子节点是: readdir-ahead.so

因为readdir-ahead没有实现writev函数,所以用的默认的default函数,所以跳过这里的介绍,直接去下一个节点 distribute.so,对应的方法是dht_writev

3.7 dht_writev: 核心-分布式hash

在这里会决定当前文件写入到那个子卷,对应的哪些brick副本

为了专注写流程的生命周期,distribute.so的核心实现会另起文章分析,这里只需要知道,在调用dht_writev前,有一个lookup过程,就已经缓存好当前文件对应的是哪个子卷(xlator)

所以这里通过local->cached_subvol,直接就拿到了对应的子卷,不用在去临时计算hash分布找出对应的子卷

备注: 上面说的子卷就是replicate.so 或者不是副本的情况下就是 client.so

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
int
dht_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
struct iovec *vector, int count, off_t off, uint32_t flags,
struct iobref *iobref, dict_t *xdata)
{
xlator_t *subvol = NULL;
int op_errno = -1;
dht_local_t *local = NULL;

VALIDATE_OR_GOTO (frame, err);
VALIDATE_OR_GOTO (this, err);
VALIDATE_OR_GOTO (fd, err);
//这里比较奇葩
//this->child 就是protocol/client xlator
//但是没有和其他组件那样直接就去调用this->child->fops->writev() 而是有一个缓存
//从缓存中拿到 subvol 是 哪个protocol/client
local = dht_local_init (frame, NULL, fd, GF_FOP_WRITE);
if (!local) {

op_errno = ENOMEM;
goto err;
}
//缓存的子卷信息
subvol = local->cached_subvol;
if (!subvol) {
gf_msg_debug (this->name, 0,
"no cached subvolume for fd=%p", fd);
op_errno = EINVAL;
goto err;
}

if (xdata)
local->xattr_req = dict_ref (xdata);

local->rebalance.vector = iov_dup (vector, count);
local->rebalance.offset = off;
local->rebalance.count = count;
local->rebalance.flags = flags;
local->rebalance.iobref = iobref_ref (iobref);
local->call_cnt = 1;
//1. 当replica=3 brick = 6 ,说明有两个子卷
//2. 说明当前distribute组件的作用就是随机分布选取一个子卷存储
//3. 子卷里需要循环将文件写入到3个brick节点落盘

//开始调用子卷的writev操作
STACK_WIND_COOKIE (frame, dht_writev_cbk, subvol, subvol,
subvol->fops->writev, fd,
local->rebalance.vector,
local->rebalance.count,
local->rebalance.offset,
local->rebalance.flags,
local->rebalance.iobref, local->xattr_req);

return 0;

err:
op_errno = (op_errno == -1) ? errno : op_errno;
DHT_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);

return 0;
}

因为我们当前的测试是一个三副本,所以下一个节点是一个replicate.so子卷,对应的方法是afr_writev

3.8 replicate.so: 副本机制

因为我们当前的测试时基于三副本,所以当前的replicate的子节点有3个client.so,分别对应连接这3个glusterfsd(brick) 进程

当需要写入文件时,需要同时写入到3个client.so,最终落盘到三个brick

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//glusterfs/xlators/cluster/afr/src/afr-inode-write.c
int
afr_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
struct iovec *vector, int32_t count, off_t offset,
uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
//忽略参数的组装

local->append_write = _gf_true;

local->stable_write = !!((fd->flags|flags)&(O_SYNC|O_DSYNC));

afr_fix_open (fd, this);

afr_do_writev (frame, this);

return 0;
out:
AFR_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);

return 0;
}

3.8.1 开启事务,提交rpc数据到client.so

开始遍历三个client.so,将数据通过rpc,提交到三个glsuterfsd server去,最终落盘

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//glusterfs/xlators/cluster/afr/src/afr-inode-write.c
int
afr_do_writev (call_frame_t *frame, xlator_t *this)
{
//忽略参数初始化

local->op = GF_FOP_WRITE;

local->transaction.wind = afr_writev_wind;
local->transaction.fop = __afr_txn_write_fop;
local->transaction.done = __afr_txn_write_done;
local->transaction.unwind = afr_transaction_writev_unwind;

//忽略参数准备
//开始事务
ret = afr_transaction (transaction_frame, this, AFR_DATA_TRANSACTION);
//忽略错误处理,
}

int
__afr_txn_write_fop (call_frame_t *frame, xlator_t *this)
{
//忽略参数初始化

//开始遍历三个client.so,写入数据
local->call_count = call_count;
for (i = 0; i < priv->child_count; i++) {
if (local->transaction.pre_op[i] && !failed_subvols[i]) {
local->transaction.wind (frame, this, i);

if (!--call_count)
break;
}
}

return 0;
}

3.9 client.so: rpc client

找到对应的rpc方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int32_t
client_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
struct iovec *vector, int32_t count, off_t off,
uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
printf("protocol/client.so: client_writev\n");
int ret = -1;
clnt_conf_t *conf = NULL;
rpc_clnt_procedure_t *proc = NULL;
clnt_args_t args = {0,};
//私有配置
conf = this->private;
if (!conf || !conf->fops)
goto out;

args.fd = fd;
args.vector = vector;
args.count = count;
args.offset = off;
args.size = iov_length (vector, count);
args.flags = flags;
args.iobref = iobref;
args.xdata = xdata;

client_filter_o_direct (conf, &args.flags);
//拿到rpc wirte的 路由信息
proc = &conf->fops->proctable[GF_FOP_WRITE];
if (proc->fn)
ret = proc->fn (frame, this, &args);
out:
if (ret)
STACK_UNWIND_STRICT (writev, frame, -1, ENOTCONN, NULL, NULL, NULL);

return 0;
}

3.9.1 提交rpc数据到glusterfsd

核心rpc处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

int32_t
client3_3_writev (call_frame_t *frame, xlator_t *this, void *data)
{
clnt_args_t *args = NULL;
clnt_conf_t *conf = NULL;
gfs3_write_req req = {{0,},};
int op_errno = ESTALE;
int ret = 0;

if (!frame || !this || !data)
goto unwind;

args = data;
conf = this->private;
//预处理 ,准备写入的数据包
ret = client_pre_writev (this, &req, args->fd, args->size,
args->offset, args->flags, &args->xdata);

if (ret) {
op_errno = -ret;
goto unwind;
}

ret = client_fd_fop_prepare_local (frame, args->fd, req.fd);
if (ret) {
op_errno = -ret;
goto unwind;
}
//开始发送写入数据的请求 到远程 glusterd rpc server处理
ret = client_submit_vec_request (this, &req, frame, conf->fops,
GFS3_OP_WRITE, client3_3_writev_cbk,
args->vector, args->count,
args->iobref,
(xdrproc_t)xdr_gfs3_write_req);
if (ret) {
/*
* If the lower layers fail to submit a request, they'll also
* do the unwind for us (see rpc_clnt_submit), so don't unwind
* here in such cases.
*/
gf_msg (this->name, GF_LOG_WARNING, 0, PC_MSG_FOP_SEND_FAILED,
"failed to send the fop");
}
//发送到远程后 释放客户端内存
GF_FREE (req.xdata.xdata_val);

return 0;

unwind:
CLIENT_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);
GF_FREE (req.xdata.xdata_val);

return 0;
}

四、概述

到这里总共分析了有

  1. volume配置语言: 基于lex,bison简单的实现了一个类xml的配置
  2. volume配置中每个节点对应的xlator作用
  3. sdk整体逻辑需要依赖volume配置的依赖关系
  4. 最终通过一个write的io流程讲解整个client端的主功能

像nginx一样,如果你要自定义一个插件介入到api的生命周期中,你只需要自己实现一个xlator,然后加入到配置中即可

总结出核心的副本,hash,分布式整个逻辑都是放到client端的,server端(glusterfsd)基本上只负责将文件高效的落盘到linux文件系统对应路径上

当然当前文章只进行了整体的概括,细节需要单独的文章分析,比如ddistribute.so核心实现,event调度器实现,server端的协程调度器实现。。等等后续在单独写文章分析了