第3章


公共组件源码解析

在数据库组件中,一些组件是专用的,如词法解析只用于SQL引擎,而另外一些组件是公共的,用于整个数据库系统。openGauss的公共组件包括系统表、数据库初始化、多线程架构、线程池、内存管理、多维监控和模拟信号机制等,每个组件实现一个独立的功能。本章对公共组件的源代码实现进行介绍。


3.1系统表



系统表又称为数据字典或者元数据,存储管理数据库对象的定义信息,如表、索引、触发器等。用户可通过系统表查询用户定义的具体对象信息,如表的每个字段类型。因为openGauss支持一个实例管理多个数据库,所以系统表分为实例级别的系统表和数据库级别的系统表。实例级别的系统表在一个实例管理的多个数据库之间共享,整个实例只有一份,这些系统表为pg_authid、pg_auth_members、pg_database

、pg_db_role_setting、pg_tablespace、pg_shdepend、pg_shdescription、pg_shseclabel。数据库级别的系统表如pg_class、pg_depend、pg_index、pg_attribute等,每个数据库各有一份。
3.1.1系统表的定义
openGauss系统表定义全部在src/include/catalog目录下,每个头文件就是一个系统表的定义,如pg_database.h就是对pg_database

的定义。在pg_database.h中,主要包括pg_database的表OID(Object Identifier,对象标识符)、类型OID、结构体定义、字段个数和每个字段ID(Identifier,标识符)枚举值、数据库初始化默认值。下面是代码及其具体解释: 


/*pg_database本身也是一张表,DatabaseRelationId表示pg_database在系统表pg_class中的OID为1262(pg_class系统表中保存的是表的定义信息)*/ 

#define DatabaseRelationId    1262

/*pg_database本身也是一个结构类型,DatabaseRelation_Rowtype_Id表示pg_database在系统表pg_type中的OID为1248*/ 

#define DatabaseRelation_Rowtype_Id  1248

/*BKI_SHARED_RELATION表示pg_database是实例级别的系统表*/

CATALOG(pg_database,1262) BKI_SHARED_RELATION BKI_ROWTYPE_OID(1248) BKI_SCHEMA_MACRO

{

NameDatadatname;              /*数据库名称*/

Oid         datdba;               /*数据库拥有者*/

int4        encoding;             /*字符集编码*/

NameData    datcollate;           /*LC_COLLATE 设置值*/

NameData    datctype;             /*LC_CTYPE 设置值*/

bool        datistemplate;        /*是否允许作为模板数据库*/

bool        datallowconn;         /*是否允许连接*/

int4        datconnlimit;         /*最大连接数*/

Oid         datlastsysoid;        /*系统OID最大值*/

ShortTransactionId datfrozenxid;  /*冻结事务ID,所有小于这个值的事务ID已经冷冻。为了兼容原来的版本,使用32位事务ID*/

Oid         dattablespace;     /*数据库的默认表空间*/

NameData    datcompatibility;   /*数据库兼容模式,比如除0是报错还是当作正常处理*/

#ifdef CATALOG_VARLEN          /*下面字段是变长字段*/

aclitem        datacl[1];        /*访问权限*/

#endif

TransactionId datfrozenxid64;/*冷冻事务ID,64-bit(比特)事务ID*/

} FormData_pg_database;

openGauss数据库源码解析

第3章公共组件源码解析

CATALOG的宏定义代码为


#define CATALOG(name,oid)typedef struct CppConcat(FormData_,name)

因此,CATALOG(pg_database,1262)就是对结构体FormData_pg_database的定义。之所以采用CATALOG,是因为这个格式是和BKI(Backend Interface,后端接口)脚本约定的格式,BKI脚本根据这个格式生成数据库的建表脚本。
接下来是数据库对象字段总数和每个字段ID的值的定义代码,定义这些值的目的主要是代码访问数据库对象时清晰,方便维护,避免魔鬼数字(魔鬼数字指在代码中没有具体含义的数字、字符串。魔鬼数字会影响代码可读性,使读者无法理解看到的数字对应的含义,从而难以理解程序的意图)。


#define Natts_pg_database                14

#define Anum_pg_database_datname        1

#define Anum_pg_database_datdba            2

#define Anum_pg_database_encoding        3

#define Anum_pg_database_datcollate        4

#define Anum_pg_database_datctype        5

#define Anum_pg_database_datistemplate    6

#define Anum_pg_database_datallowconn    7

#define Anum_pg_database_datconnlimit    8

#define Anum_pg_database_datlastsysoid    9

#define Anum_pg_database_datfrozenxid    10

#define Anum_pg_database_dattablespace    11

#define Anum_pg_database_compatibility    12

#define Anum_pg_database_datacl            13

#define Anum_pg_database_datfrozenxid64    14

最后是创建数据库时的默认数值。代码中的值表示默认创建template1数据库,DATA的字段值和数据库结构体的值一一对应,对应代码如下: 


DATA(insert OID = 1 (  template1 PGUID ENCODING "LC_COLLATE" "LC_CTYPE" t t -1 0 0 1663 "DB_COMPATIBILITY" _null_ 3));

SHDESCR("default template for new databases");

#define TemplateDbOid            1

#define DEFAULT_DATABASE  "postgres"

系统表头文件的内容和格式基本类似。
3.1.2系统表的访问
定义系统表后,接下来介绍数据库在运行过程中对系统表的访问。openGauss对系统表的访问主要通过syscache机制。syscache机制是一个通用的机制,主要对系统表的数据进行缓存,提升系统表数据的访问速度,详细细节参照具体章节描述。这里主要介绍与pg_database相关的部分。pg_database在枚举类型enum SysCacheIdentifier中定义的枚举值有一个: DATABASEOID,表示根据数据库OID访问pg_database系统表,同时需要把pg_database系统表访问模式添加到“struct cachedesc cacheinfo”中。与pg_database相关的代码如下: 


{DatabaseRelationId,/*DATABASEOID*/

DatabaseOidIndexId,

1,

{ObjectIdAttributeNumber,0,0,0},

4},

这几个值与cachedesc结构体的字段对应,表示pg_database表的OID值、索引的OID值、搜索时有1个key字段、搜索key字段ID为ObjectIdAttributeNumber、初始化为4个哈希桶。相关的代码如下: 


struct cachedesc {

Oid reloid;   /*缓存的表的OID*/

Oid indoid;   /*缓存数据的索引OID*/

int nkeys;    /*缓存搜索的key的个数*/

int key[4];   /*key属性的编号*/

int nbuckets; /*缓存哈希桶的个数*/

};

系统表的定义和访问主要逻辑如上所述。与pg_database相关的SQL命令是ALTER DATABASE、CREATE DATABASE、DROP DATABASE,这些命令执行的结果是把数据库相关的信息存储到pg_database系统表中。
其他系统表的逻辑与pg_database相似,不再重复。


3.2数据库初始化



数据库正常启动时需要指定数据目录,数据目录中包含了系统表的初始化数据。数据库初始化的过程会生成这些初始系统表数据文件,该过程由initdb和openGauss进程配合生成。initdb控制执行过程,创建目录和基本的配置文件; openGauss进程负责系统表的初始化。initdb通过PG_CMD_OPEN宏启动openGauss进程,同时打开一个管道流,然后通过解析系统表文件中的SQL命令,并把命令通过PG_CMD_PUTS宏的管道流发给openGauss进程,最后通过PG_CMD_CLOSE宏关闭管道流。PG_CMD_OPEN宏是系统函数popen的封装宏,PG_CMD_PUTS宏是系统函数fputs的封装宏,PG_CMD_CLOSE宏是系统函数pclose的封装宏。初始化交互过程如图31所示。


图31初始化交互过程


initdb在创建template1模板数据库时,命令参数指定了“snprintf_s(cmd,sizeof(cmd),sizeof(cmd)1,"\"%s\" boot x1 %s%s",backend_exec,boot_options,talkargs);”,其中“boot”表示openGauss进程以一个特殊的bootstrap模式运行。在其他初始化系统表时,initdb命令参数指定了“snprintf_s(cmd,sizeof(cmd),sizeof(cmd) 1,"\"%s\" %s template1 >%s",backend_exec,backend_options,DEVNULL);  ”,“static const char* backend_options = "single "”表示openGauss进程以单用户模式运行。
下面以setup_schema函数为例详细介绍这个过程,相关代码如下: 


static void setup_schema(void)

{

PG_CMD_DECL;

char** line;

char** lines;

int nRet = 0;

char* buf_features = NULL;

fputs(_("creating information schema ... "),stdout);

(void)fflush(stdout);

lines = readfile(info_schema_file);

/*

* 使用-j 避免在information_schema.sql反斜杠处理

*/

nRet = snprintf_s(

cmd,sizeof(cmd),sizeof(cmd) - 1,"\"%s\" %s -j template1 >%s",backend_exec,backend_options,DEVNULL);

securec_check_ss_c(nRet,"\0","\0");

PG_CMD_OPEN;

for (line = lines; *line != NULL; line++) {

PG_CMD_PUTS(*line);

FREE_AND_RESET(*line);

}

FREE_AND_RESET(lines);

PG_CMD_CLOSE;

nRet = snprintf_s(

cmd,sizeof(cmd),sizeof(cmd) - 1,"\"%s\" %s template1 >%s",backend_exec,backend_options,DEVNULL);

securec_check_ss_c(nRet,"\0","\0");

PG_CMD_OPEN;

PG_CMD_PRINTF1("UPDATE information_schema.sql_implementation_info "

"  SET character_value = '%s' "

"  WHERE implementation_info_name = 'DBMS VERSION';\n",

infoversion);



buf_features = escape_quotes(features_file);

PG_CMD_PRINTF1("COPY information_schema.sql_features "

"  (feature_id,feature_name,sub_feature_id,"

"  sub_feature_name,is_supported,comments) "

" FROM E'%s';\n",

buf_features);

FREE_AND_RESET(buf_features);

PG_CMD_CLOSE;

check_ok();

}

在这个函数中,PG_CMD_DECL是一个变量定义宏,通过语句“char cmd[MAXPGPATH]”和“FILE* cmdfd = NULL”定义了两个变量。这样的作用是代码格式统一,阅读方便。
语句“readfile(info_schema_file)”表示读取info_schema_file文件,这个文件中存放了系统表初始化的SQL命令。
语句“snprintf_s(cmd,sizeof(cmd),sizeof(cmd)1,"\"%s\" %s j template1 >%s",backend_exec,backend_options,DEVNULL)”是格式化openGauss后台进程的命令。语句“PG_CMD_OPEN”是以popen的方式运行cmd命令,启动openGauss进程。
语句“for (line = lines;  *line != NULL;  line++)”表示遍历info_schema_file文件中的每条SQL命令,宏PG_CMD_PUTS把每个SQL命令发送给openGauss进程执行。
整个文件执行完毕,调用宏PG_CMD_CLOSE停止进程,关闭管道。setup_schema函数的后面代码处理是类似的,只是SQL命令是函数内生成的,使用宏PG_CMD_PRINTF1写入管道,发给openGauss进程。
setup_sysviews、setup_dictionary、setup_privileges等其他系统对象初始化函数过程都是类似的,不再重复描述。
initdb的整个初始化过程如下。
(1)  对命令行参数进行解析。
(2)  查找openGauss程序,设置$PGDATA、$PGPATH等环境变量。设置数据库初始化原始文件,这些文件在shell命令make install执行安装后,默认都在“openGaussserver/dest/share/postgresql”目录下。
(3)  数据库本地初始化,locale默认初始化为en_US.UTF8,数据库编码默认初始化为UTF8,文本搜索默认初始化为English。
(4)  检查数据库数据目录pg_data是否为空,是否需要创建,权限是否正确。
(5)  创建subdirs变量指定的子目录。
(6)  初始化conf配置文件。
(7)  创建template1数据库bootstrap_template1。这一步需要启动后台openGauss进程执行数据库的SQL语句,创建系统表。bootstrap_template1这个函数主要是读取bki文件中的SQL语句,发送到openGauss进程去执行,主要功能是创建系统表。语句create pg_type表示创建pg_type系统表,语句INSERT OID表示插入这个系统表的默认数据。这里的语法是专门为initdb定制的bootstrap解析语法,不是正式的SQL语法,语法文件也是单独的,可参照“openGaussserver\src\gausskernel\bootstrap”目录下的bootscanner.l和bootparse.y文件。pg_type系统对象在initdb初始化中的bootstrap语法相关代码如下,在初始化时就是解析下面语法格式完成pg_type系统对象的创建。


create pg_type 1247 bootstrap rowtype_oid 71

(

typname = name ,

typnamespace = oid ,

typowner = oid ,

typlen = int2 ,

typbyval = bool ,

typtype = char ,

typcategory = char ,

typispreferred = bool ,

typisdefined = bool ,

typdelim = char ,

typrelid = oid ,

typelem = oid ,

typarray = oid ,

typinput = regproc ,

typoutput = regproc ,

typreceive = regproc ,

typsend = regproc ,

typmodin = regproc ,

typmodout = regproc ,

typanalyze = regproc ,

typalign = char ,

typstorage = char ,

typnotnull = bool ,

typbasetype = oid ,

typtypmod = int4 ,

typndims = int4 ,

typcollation = oid ,

typdefaultbin = pg_node_tree ,

typdefault = text ,

typacl = aclitem[]

)

INSERT OID = 16 ( bool 11 10 1 t b B t t \054 0 0 1000 boolin boolout boolrecv boolsend --- c p f 0 -1 0 0 _null_ _null_ _null_ )

INSERT OID = 17 ( bytea 11 10 -1 f b U f t \054 0 0 1001 byteain byteaout bytearecv byteasend --- i x f 0 -1 0 0 _null_ _null_ _null_ )

…

close pg_type

(8) 使用setup_auth函数初始化pg_authid权限。该函数执行的SQL语句是在函数内静态定义“static const char* pg_authid_setup[]”。
(9) 使用setup_depend函数创建系统表依赖关系。该函数执行的SQL语句是在函数内静态定义“static const char* pg_depend_setup[]”。
(10) 使用load_plpgsql函数加载plpgsql扩展组件。该函数只执行一条SQL语句“CREATE EXTENSION plpgsql;”。
(11)  使用setup_sysviews函数创建系统视图。该函数会读取system_views.sql文件中的SQL语句,发送到openGauss去执行,主要功能是创建系统视图。
(12)  使用setup_perfviews函数创建性能视图。该函数会读取performance_views.sql文件中的SQL语句,发送到openGauss去执行,主要功能是创建性能视图。
(13)  使用setup_conversion函数创建编码转换。该函数会读取conversion_create.sql文件中的SQL语句,发送到openGauss去执行,主要功能是创建编码转换函数。
(14)  使用setup_dictionary函数创建词干数据字典。该函数会读取snowball_create.sql文件中的SQL语句,发送到openGauss去执行,主要功能是创建文本搜索函数。
(15)  使用setup_privileges函数设置权限。setup_privileges函数通过xstrdu复制SQL常量字符串到一个动态数组内,然后遍历执行指定的SQL语句。
(16)  使用load_supported_extension函数加载外表。该函数执行相应扩展组件的CREATE EXTENSION语句。
(17)  使用setup_update函数更新系统表。该函数执行语句COPY pg_cast_oid.txt到数据库中,主要功能是创建类型强制转换处理函数。
(18)  对template1进行垃圾数据清理,即执行三个SQL语句“ANALYZE;”“VACUUM FULL;”“VACUUM FREEZE;”。
(19)  创建template0数据库,即复制template1到template0。
(20)  创建默认数据库,即复制template1到默认数据库。
(21)  对template0、template1、默认数据库进行垃圾数据清理和事务ID冻结。


3.3多线程架构



openGauss内核源自PostgreSQL,但在架构上进行了大量改造,其中一个调整就是将多进程架构修改为多线程架构。openGauss在启动后只有一个进程,后台任务都是以一个进程中的线程来运行的。对于客户端的新连接,在非线程池模式下也是以启动一个业务线程来处理的。在多线程架构下更容易实现多个线程资源的共享,如并行查询、线程池等。
3.3.1openGauss主要线程
openGauss的后台线程是不对等的,其中Postmaster是主线程,其他线程都是它创建出来的。openGauss后台线程的功能介绍如表31所示。


表31openGauss后台线程的功能


后台线程
功能介绍
Postmaster
openGauss数据库主线程。主要有两个功能: 一是对连接进行监听,接收新的连接; 二是监控所有子线程的状态,并根据子线程退出状态进行处理,如果线程是FATAL退出,则重新拉起子线程,如果线程是PANIC退出,则进行整个数据库重新初始化,保证数据库的正常运行
Startup
数据库启动线程。数据库启动时Postmaster主线程拉起的第一个子线程,主要完成数据库的日志REDO(重做)操作,进行数据库的恢复。日志REDO操作结束,数据库完成恢复后,如果不是备机,Startup线程就退出了。如果是备机,那么Startup线程一直在运行,REDO备机接收到新的日志
Bgwriter
后台数据写线程。周期性地把数据库数据缓冲区的内容同步到磁盘上
Checkpointer
检查点线程。进行检查点操作,完成数据库的周期性检查点和执行检查点命令
Walwriter
后台WAL写线程。主要功能是周期性地把日志缓冲区的内容同步到磁盘上
Stat
数据库运行信息统计收集线程。主要功能是收集各个线程操作数据库的统计信息,进行汇总后写入数据库的统计文件中,供查询优化分析和垃圾清理使用
Sysloger
运行日志写线程。主要功能是把各个线程的运行日志信息写到运行日志文件中
Vacuum Launch
垃圾清理启动线程。主要有两个功能: 一是通知Postmaster 启动一个垃圾清理线程; 二是平衡各个垃圾清理线程的负载
Vacuum worker
垃圾清理线程。主要功能是对openGauss的垃圾数据进行清理
Arch
日志归档线程。主要功能是完成归档操作,把在线日志复制到归档目录
Postgres
服务线程。在非线程池模式下,每个客户端连接对应一个服务线程,主要功能是接收客户端的操作请求,代表客户端在服务器完成数据库操作

3.3.2线程间通信
openGauss后台线程之间紧密配合,共同完成数据库的数据处理任务。这些后台线程之间需要交换信息来协调彼此的行为。openGauss多线程通信使用了原来的PostgreSQL的多线程通信方式,具体如表32所示。


表32多线程通信方式


通信方式
说明
共享内存
在数据库初始化时,Postmaster线程通过OS(操作系统)申请一块大的共享内存,并完成初始化工作。openGauss使用到的所有共享内存都是这块内存的一部分。线程之间的一些信息交换就是通过共享内存完成的,共享内存的访问需要加锁保护
续表


通信方式
说明
信号
对于一些紧急任务的处理,openGauss使用信号通知作为线程间通信的手段,因为信号可以中断处理线程当前的任务,立即响应信号对应的任务
TCP
客户端连接数据库服务器时,一般使用TCP进行通信
UNIX域套接字协议
如果是本地客户端,即客户端和服务器在同一台机器上,并且是UNIX操作系统,可以使用UNIX域套接字协议建立客户端和服务器进程的通信
UDP
UDP(User Datagram Protocol,用户数据报协议)是不可靠协议,主要用于后台线程向统计线程发送统计信息时使用
管道
管道可以是双向的,也可以是单向的。在openGauss中,主要使用了单向管道,用在后台线程向运行日志守护线程发送运行日志信息时使用
文件
主要用于一些不太重要的场合,并且通信量比较大。在openGauss中,主要用在统计线程汇总统计信息,写到统计文件,供垃圾清理线程和后台服务器线程成本优化使用
全局变量
一种线程间共享信息的机制。openGauss对原来的PostgreSQL中进程内的全局变量添加THR_LOCAL定义为线程的局部变量,避免线程之间误用

3.3.3线程初始化流程
下面介绍线程的初始化流程。首先介绍openGauss进程的启动。openGauss进程的主函数入口在“\openGaussserver\src\gausskernel\process\main\main.cpp”文件中。在main.cpp文件中,主要完成实例Context(上下文)的初始化、本地化设置,根据main.cpp文件的入口参数调用BootStrapProcessMain函数、GucInfoMain函数、PostgresMain函数和PostmasterMain函数。BootStrapProcessMain函数和PostgresMain函数是在initdb场景下初始化数据库使用的。GucInfoMain函数的作用是显示GUC(Grand Unified Configuration,配置参数,在数据库中指的是运行参数)参数信息。正常的数据库启动会进入PostmasterMain函数。下面对这几个函数进行更详细的介绍。
(1)  进行Postmaster的Context初始化,初始化GUC参数,解析命令行参数。
(2)  调用StreamServerPort函数启动服务器监听和双机监听(如果配置了双机),调用reset_shared函数初始化共享内存和LWLock锁,调用gs_signal_monitor_startup函数注册信号处理线程,调用InitPostmasterDeathWatchHandle函数注册Postmaster死亡监控管道,把openGauss进程信息写入pid_file文件中,调用gspqsignal函数注册Postmaster的信号处理函数。
(3)  根据配置初始化黑匣子,调用pgstat_init函数初始化统计信息传递使用的UDP套接字通信,调用InitializeWorkloadManager函数初始化负载管理器,调用InitUniqueSQL函数初始化UniqueSQL,调用SysLogger_Start函数初始化运行日志的通信管道和SYSLOGGER线程,调用load_hba函数加载hba鉴权文件。
(4)  调用initialize_util_thread函数启动STARTUP线程,调用ServerLoop函数进入一个周期循环。在ServerLoop函数的周期循环中,进行客户端请求监听,如果有客户端连接请求,在非线程池模式下,则调用BackendStartup函数创建一个后台线程worker处理客户请求。在线程池模式下,把新的链接加入一个线程池组中。在ServerLoop函数的周期循环中,检查其他线程的运行状态。如果数据库是第一次启动,则调用initialize_util_thread函数启动其他后台线程。如果有后台线程FATAL级别错误退出,则调用initialize_util_thread函数重新启动该线程,如果是PANIC级别错误退出,则整个实例进行重新初始化。
PostmasterMain完成了线程之间的通信初始化和线程的启动,无论是后台线程的启动函数initialize_util_thread还是工作线程的启动函数initialize_worker_thread,最后都是调用initialize_thread函数完成的线程启动。下面对initialize_thread函数进行介绍。
initialize_thread函数调用gs_thread_create函数创建线程,调用InternalThreadFunc函数处理线程,它的相关代码如下所示: 


ThreadId initialize_thread(ThreadArg* thr_argv)

{

gs_thread_t thread;

if (0 != gs_thread_create(&thread,InternalThreadFunc,1,(void*)thr_argv)) {

gs_thread_release_args_slot(thr_argv);

return InvalidTid;

}

return gs_thread_id(thread);

}

InternalThreadFunc函数的代码如下。该函数根据角色调用GetThreadEntry函数,GetThreadEntry函数直接以角色为下标,返回对应GaussdbThreadEntryGate数组对应的元素。数组的元素是处理具体任务的回调函数指针,指针指向的函数为GaussDbThreadMain。相关代码如下: 


static void* InternalThreadFunc(void* args)

{

knl_thread_arg* thr_argv = (knl_thread_arg*)args;

gs_thread_exit((GetThreadEntry(thr_argv->role))(thr_argv));

return (void*)NULL;

}

GaussdbThreadEntry GetThreadEntry(knl_thread_role role)

{

Assert(role > MASTER && role < THREAD_ENTRY_BOUND);

return GaussdbThreadEntryGate[role];

}

static GaussdbThreadEntry GaussdbThreadEntryGate[] = {GaussDbThreadMain<MASTER>,

GaussDbThreadMain<WORKER>,

GaussDbThreadMain<THREADPOOL_WORKER>,

GaussDbThreadMain<THREADPOOL_LISTENER>,

…};

在GaussDbThreadMain函数中,首先初始化线程基本信息、Context和信号处理函数,然后根据thread_role角色的不同调用不同角色的处理函数,进入各个线程的MAIN函数,如GaussDbAuxiliaryThreadMain函数、AutoVacLauncherMain函数、WLMProcessThreadMain函数等。其中,GaussDbAuxiliaryThreadMain函数是后台辅助线程处理函数。该函数的处理也类似GaussDbThreadMain函数,根据thread_role角色的不同调用不同角色的处理函数,进入各个线程的MAIN函数,如StartupProcessMain函数、CheckpointerMain函数、WalWriterMain函数、walrcvWriterMain函数等。
总结上面整个过程,openGauss多线程架构主要包括3个方面: 
(1)  多线程之间的通信,由主线程在初始化阶段完成; 
(2)  多线程的启动,由主线程创建各个角色线程,调用不同角色的处理函数完成; 
(3)  主线程负责监控各个线程的运行、异常退出和重新拉起。


3.4线程池技术



openGauss在多线程架构的基础上实现了线程池。线程池机制实现了会话和处理线程分离,在大并发连接的情况下仍然能够保证系统有很好的SLA响应。另外,不同的线程组可绑到不同的NUMA(NonUniform Memory Access,非一致性内存访问)核上,天然匹配NUMA化的CPU架构,从而提升openGauss的整体性能。
3.4.1线程池原理
openGauss线程池原理如图32所示,图32中的主要对象如表33所示。


图32线程池原理




表33线程池对象


对象
说明
Postmaster
主线程。负责监听客户端发出的请求
ThreadPoolControler
线程池总控。负责线程池的初始化和资源管理
ThreadSessionControler
会话生命周期管理
ThreadPoolGroup
线程组。可以定义灵活的线程数量和绑核策略
ThreadPoolListener
线程组监听线程。负责事件的分发和管理
ThreadPoolWorker
工作线程
session
客户端连接的一个会话
NUMA NODE
NUMA节点。表示一个线程组在NUMA结构下可以映射到一个NUMA节点上

这些对象相互配合实现了线程池机制,它们的主要交互过程如下。
(1)  客户端向数据库发起连接请求,Postmaster线程接收到连接请求并被唤醒。Postmaster线程创建该连接对应的socket(套接字,用于描述IP地址和端口,是一个通信链的句柄),调用ThreadPoolControler函数创建会话(session)数据结构。ThreadPoolControler函数遍历当前所有的Thread Group(线程组),找到当前活跃会话数量最少的Thread Group,并把最新的会话分发给该Thread Group,加入该Thread Group的epoll(Linux内核为处理大批量句柄而改进的poll(轮询),能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率)列表中。
(2)  Thread Group的listener线程负责监听epoll列表中所有的客户连接。
(3)  客户端发起任务请求,listener线程被唤醒。listener线程检查当前的Thread Group是否有空闲worker线程; 如果有,则把当前会话分配给该worker线程,并唤醒该worker线程; 如果没有,则把该会话放在等待队列中。
(4)  worker线程被唤醒后,读取客户端连接上的请求,执行相应请求,并返回请求结果。在一次事务结束(提交、回滚)或者事务超时退出时,worker线程的一次任务完成。worker线程将会话返回listener线程,listener线程继续等待该会话的下一次请求。worker线程返还会话后,检查会话等待队列; 如果存在等待响应请求的会话,则直接从该队列中取出新的会话并继续工作; 如果没有等待响应的会话,则将自身标记为free(空闲)状态,等待listener线程唤醒。
(5)  客户端断开连接时,worker线程被唤醒,关闭连接,同时清理会话相关结构,释放内存和fd(文件句柄)等资源。
(6)  如果worker线程FATAL级别错误退出,退出时worker线程会从worker队列中注销掉。此时listener线程会重新启动一个新的worker线程,直到达到指定数量的worker线程。
3.4.2线程池实现
线程池功能由GUC参数enable_thread_pool控制,该变量设置为true时才能使用线程池功能。代码主要在“openGaussserver/src/gausskernel/process/threadpool”目录中,下面介绍主要代码实现流程。
Postmaster线程在ServerLoop中判断如果启用了线程池功能,则会调用“ThreadPoolControler::Init”函数进行线程池的初始化。在线程池初始化时,会判断NUMA节点的个数进行NUMA结构处理,相关代码如下: 


if (threadPoolActivated) {

bool enableNumaDistribute = (g_instance.shmem_cxt.numaNodeNum > 1);

g_threadPoolControler->Init(enableNumaDistribute);

}

“ThreadPoolControler::Init”函数的主要作用是创建m_sessCtrl成员和m_groups成员对象,根据绑核策略分配线程个数,调用“ThreadPoolGroup::init”函数进行线程组的初始化,调用“ThreadPoolGroup::WaitReady”函数等待各个线程组初始化结束。创建m_scheduler成员对象,并且调用“ThreadPoolScheduler::StartUp”函数启动线程池调度线程。在“ThreadPoolGroup::init”函数中,创建m_listener对象,启动listener线程。为ThreadWorkerSentry函数分配内存,初始化每个worker的互斥量和条件变量。调用“ThreadPoolGroup::AddWorker”函数创建worker对象,启动worker线程。
Postmaster线程在ServerLoop中如果监听到有客户端链接请求,判断启用了线程池功能,则会调用“ThreadPoolControler::DispatchSession”函数进行会话分发,相关代码如下: 


if (threadPoolActivated &&!(i < MAXLISTEN && t_thrd.postmaster_cxt.listen_sock_type[i] == HA_LISTEN_SOCKET))

result = g_threadPoolControler->DispatchSession(port);

/*ThreadPoolControler::DispatchSession的代码实现如下,找到一个会话数最少的线程组,创建会话,把会话添加到线程组的监听线程中*/ 

int ThreadPoolControler::DispatchSession(Port* port)

{

ThreadPoolGroup* grp = NULL;

knl_session_context* sc = NULL;

grp = FindThreadGroupWithLeastSession();

if (grp == NULL) {

Assert(false);

return STATUS_ERROR;

}

sc = m_sessCtrl->CreateSession(port);

if (sc == NULL)

return STATUS_ERROR;

grp->GetListener()->AddNewSession(sc);

return STATUS_OK;

}

listener线程的主函数为“TpoolListenerMain(ThreadPoolListener* listener)”。在该函数中设置线程的名字和信号处理函数,创建epoll等待事件,通知Postmaster线程已经准备好,调用t_pool_listener_loop函数(其实是调用“ThreadPoolListener::WaitTask”函数进入等待事件状态)。如果有事件到来,调用“ThreadPoolListener::HandleConnEvent”函数找到事件对应的会话。调用“ThreadPoolListener::DispatchSession”函数,如果有空闲的worker线程,通知worker线程进行处理; 如果没有空闲的worker线程,则把会话挂到等待队列中。
worker线程的主函数就是正常的SQL处理函数PostgresMain,与非线程模式相比,主要多了3处处理: 
(1)  worker线程准备就绪的通知; 
(2)  等待会话通知; 
(3)  连接退出处理; 
worker线程的相关代码如下: 


if (IS_THREAD_POOL_WORKER) {

u_sess->proc_cxt.MyProcPort->sock = PGINVALID_SOCKET;

t_thrd.threadpool_cxt.worker->NotifyReady();

}

if (IS_THREAD_POOL_WORKER) {

t_thrd.threadpool_cxt.worker->WaitMission();

Assert(u_sess->status != KNL_SESS_FAKE);

}

case 'X':

case EOF:

RemoveTempNamespace();

InitThreadLocalWhenSessionExit();

if (IS_THREAD_POOL_WORKER) {

t_thrd.threadpool_cxt.worker->CleanUpSession(false);

break;

}

“ThreadPoolWorker::WaitMission”函数的主要作用是阻塞所有系统信号,避免系统信号如SIGHUP等中断当前的处理。清除线程上的会话信息,保证没有上一个会话的内容,等待会话上新的请求,把会话给线程进行处理,允许系统信号中断。
“ThreadPoolWorker::CleanUpSession”函数的主要作用是清除会话,从Listener中去除会话,释放会话资源。
上面介绍了线程池的主要机制,综上所述,线程池主要是解决大并发的用户连接,在一定程度上可以起到流量控制的作用,即使用户的连接数很多,后端也不需要分配太多的线程。线程是OS的一种资源,如果线程太多,OS资源占用很多,并且大量线程的调度和切换会带来昂贵的开销。如果没有线程池,随着连接数的增多,系统的吞吐量会逐渐降低。另外一方面,把线程池划分为线程组,可以很好地匹配NUMA CPU架构的节点,提升多核情况下的访问性能。每个线程组一个监听者,避免了线程池的“惊群效应”。


3.5内存管理



数据库在运行过程中涉及许多对象,这些对象具有不同的生命周期,有些处理需要频繁分配内存。如一个SQL语句,在解析时需要对词法单元和语法单元分配内存,在执行过程中需要对执行状态分配内存。在事务结束时,如果不是PREPARE语句,那么SQL语句的执行计划内存和执行过程的状态内存都需要释放。如果是PREPARE语句,那么执行计划需要保存到缓冲池中,执行过程的状态内存释放即可。为了保证内存分配的高效和避免内存泄漏,openGauss设计开发了自己的内存管理,代码实现在“openGaussserver\src\common\backend\utils\mmgr”目录。
openGauss在内存管理上采用了上下文的概念,即具有同样生命周期或者属于同一个上下文语义的内存放到一个MemoryContext管理,MemoryContext的结构代码如下(结构成员参照注释): 


typedef struct MemoryContextData* MemoryContext;

typedef struct MemoryContextData {

NodeTag type;                  /*上下文类别*/

MemoryContextMethods* methods; /*虚函数表*/

MemoryContext parent;       /*父上下文。顶级上下文为 NULL*/

MemoryContext firstchild;      /*子上下文的链表头*/

MemoryContext prevchild;       /*前向子上下文*/

MemoryContext nextchild;       /*后向子上下文*/

char* name;                    /*上下文名称,方便调试*/

pthread_rwlock_t lock;         /*上下文共享时的并发控制锁*/

bool is_shared;                /*上下文是否在多个线程共享*/

bool isReset;            /*isReset为true时,表示复位后没有内存空间用于分配*/

int level;                     /*上下文层次级别*/

uint64 session_id;             /*上下文属于的会话ID*/

ThreadId thread_id;            /*上下文属于的线程ID*/

} MemoryContextData;

虚函数表就是具体的内存管理操作函数指针,具体定义代码如下(函数功能参照注释): 


typedef struct MemoryContextMethods {

/*在上下文中分配内存*/

void* (*alloc)(MemoryContext context,Size align,Size size,const char* file,int line);

/*释放pointer 内存到上下文中*/

void (*free_p)(MemoryContext context,void* pointer);

/*在上下文中重新分配内存*/

void* (*realloc)(MemoryContext context,void* pointer,Size align,Size size,const char* file,int line);

void (*init)(MemoryContext context);    /*上下文初始化*/

void (*reset)(MemoryContext context);   /*上下文复位*/

void (*delete_context)(MemoryContext context);   /*删除上下文*/

Size (*get_chunk_space)(MemoryContext context,void* pointer); /*获取上下文块大小*/

bool (*is_empty)(MemoryContext context);  /*上下文是否为空*/

void (*stats)(MemoryContext context,int level); /*上下文信息统计*/

#ifdef MEMORY_CONTEXT_CHECKING

void (*check)(MemoryContext context); /*上下文异常检查*/

#endif

} MemoryContextMethods;

这些回调函数指针初始化是在AllocSetContextSetMethods函数中调用AllocSetMethodDefinition函数完成的。AllocSetMethodDefinition函数的实现代码如下: 


template <bool enable_memoryprotect,bool is_shared,bool is_tracked>

void AlignMemoryAllocator::AllocSetMethodDefinition(MemoryContextMethods* method)

{

method->alloc = &AlignMemoryAllocator::AllocSetAlloc<enable_memoryprotect,is_shared,is_tracked>;

method->free_p = &AlignMemoryAllocator::AllocSetFree<enable_memoryprotect,is_shared,is_tracked>;

method->realloc = &AlignMemoryAllocator::AllocSetRealloc<enable_memoryprotect,is_shared,is_tracked>;

method->init = &AlignMemoryAllocator::AllocSetInit;

method->reset = &AlignMemoryAllocator::AllocSetReset<enable_memoryprotect,is_shared,is_tracked>;

method->delete_context = &AlignMemoryAllocator::AllocSetDelete<enable_memoryprotect,is_shared,is_tracked>;

method->get_chunk_space = &AlignMemoryAllocator::AllocSetGetChunkSpace;

method->is_empty = &AlignMemoryAllocator::AllocSetIsEmpty;

method->stats = &AlignMemoryAllocator::AllocSetStats;

#ifdef MEMORY_CONTEXT_CHECKING

method->check = &AlignMemoryAllocator::AllocSetCheck;

#endif

}

可以看到,这些实际操作内存管理的函数为AlignMemoryAllocator类中的AllocSetAlloc函数、AllocSetFree函数、AllocSetRealloc函数、AllocSetInit函数、AllocSetReset函数、AllocSetDelete函数、AllocSetGetChunkSpace函数、AllocSetIsEmpty函数、AllocSetStats函数和AllocSetCheck函数。在这些处理函数中,涉及的结构体代码如下: 


typedef AllocSetContext* AllocSet;

typedef struct AllocSetContext {

MemoryContextData header;/*内存上下文,存储空间是在这个内存上下文中分配的*/

AllocBlock blocks;  /*AllocSetContext所管理内存块的块链表头*/

AllocChunk freelist[ALLOCSET_NUM_FREELISTS]; /*空闲块链表*/

/*这个上下文的分配参数*/

Size initBlockSize;   /*初始块大小*/

Size maxBlockSize;    /*最大块大小*/

Size nextBlockSize;   /*下一个分配的块大小*/

Size allocChunkLimit; /*块大小上限*/

AllocBlock keeper;    /*在复位时保存的块*/

Size totalSpace;      /*这个上下文分配的总空间*/

Size freeSpace;       /*这个上下文总的空闲空间*/

Size maxSpaceSize;   /*最大内存空间*/

MemoryTrack track;   /*跟踪内存分配信息*/

} AllocSetContext;

/*AllocBlock定义如下:*/

typedef struct AllocBlockData* AllocBlock;

typedef struct AllocBlockData {

AllocSet aset;   /*哪个AllocSetContext 拥有此块,AllocBlockData 归属AllocSetContext管理*/

AllocBlock prev; /*在块链表中的前向指针*/

AllocBlock next; /*在块链表中的后向指针*/

char* freeptr;   /*这个块空闲空间的起始地址*/

char* endptr;    /*这个块空间的结束地址*/

Size allocSize;  /*分配的大小*/

#ifdef MEMORY_CONTEXT_CHECKING

uint64 magicNum; /*魔鬼数字值,用于内存校验。当前代码固定填写为DADA*/

#endif

} AllocBlockData;

typedef struct AllocChunkData* AllocChunk; /*AllocChunk 内存前面部分是一个AllocBlock结构*/

typedef struct AllocChunkData {

void* aset; /*拥有这个chunk的AllocSetContext,如果空闲,则为空闲列表链接*/

Size size; /*chunk中的使用空间*/

#ifdef MEMORY_CONTEXT_CHECKING

Size requested_size; /*实际请求大小,在空闲块中时为0*/

const char* file; /*palloc/palloc0调用时的文件名称*/

int line;         /*palloc/palloc0 调用时的行号*/

uint32 prenum;    /*前向魔鬼数字*/

#endif

} AllocChunkData;

从前面的数据结构可以看出,核心数据结构为AllocSetContext,这个数据结构有3个成员“MemoryContextData header;”“AllocBlock blocks;”和“AllocChunk freelist[ALLOCSET_NUM_FREELISTS];”。这3个成员把内存管理分为3个层次。
(1)  MemoryContext管理上下文之间的父子关系,设置MemoryContext的内存管理函数。
(2)  AllocBlock blocks把所有内存块通过双链表链接起来。
(3)  内存单元chunk是从内存块AllocBlock内部分配的,内存块和内存单元chunk的转换关系为: “AllocChunk chunk=(AllocChunk)(((char*)block) + ALLOC_BLOCKHDRSZ);”和“AllocBlock block=(AllocBlock)(((char*)chunk)-ALLOC_BLOCKHDRSZ);”。
内存单元chunk经过转换得到最终的用户指针,内存单元chunk和用户指针AllocPointer的转换关系为: ((AllocPointer)(((char*)(chk)) + ALLOC_CHUNKHDRSZ))和((AllocChunk)(((char*)(ptr)) - ALLOC_CHUNKHDRSZ))。数据结构的基本关系如图33所示。


图33数据结构的基本关系


下面先看第1层MemoryContext(内存上下文)的实现,主要实现在mcxt.cpp文件中,如表34所示。


表34MemoryContext的实现函数


函数
功能介绍
ChooseRootContext
在线程池机制下,上下文有3个类别,即实例级别、会话级别、线程级别。这个函数根据tag(标签)类型和parent(上一级)返回相应类别的根上下文(最顶层的上下文)
MemoryContextCreate
首先根据root是否为空确定是从父MemoryContext分配内存还是从操作系统调用malloc分配内存,然后对分配的MemoryContext进行初始化,如果存在父MemoryContext,则挂到父MemoryContext上
MemoryContextDelete
先删除这个MemoryContext的子节点,把这个MemoryContext的父节点置为空,回调AllocSetDelete方法释放分配的对象,最后释放上下文本身
MemoryContextIsEmpty
先看当前上下文是否有子节点,然后回调is_empty检查当前上下文是否为空
MemoryContextReset
先看当前上下文是否有子节点,如果有子节点,则遍历子节点,递归调用MemoryContextReset进行复位,最后回调reset复位当前上下文
MemoryContextSetParent
如果上下文有父节点,则从父上下文解除当前上下文,然后把上下文挂到新的父上下文
GetMemoryChunkSpace
当前指针pointer偏移STANDARDCHUNKHEADERSIZE找到标准StandardChunkHeader位置,然后根据块属于上下文的回调AllocSetGetChunkSpace获取块空间大小
MemoryContextStats
先回调AllocSetStats统计当前上下文信息,然后遍历子节点,递归调用MemoryContextStatsInternal统计上下文信息
MemoryContextAllocDebug
检查分配内存大小是否小于MaxAllocSize,回调AllocSetAlloc分配内存
pfree
根据当前指针pointer偏移STANDARDCHUNKHEADERSIZE找到标准头,根据头部StandardChunkHeader找到归属的上下文,回调AllocSetFree释放内存

再看第2层AllocSet的实现,主要实现在aset.cpp文件中,如表35所示。


表35AllocSet的实现函数


函数
功能介绍
AllocSetFreeIndex
根据请求的size(内存大小)计算应该在哪个空闲块链表freechunk中分配内存
set_sentinel
设置哨兵0x7E,用于内存越界写操作(修改了不应该修改的内存地址)检查
sentinel_ok
内存检查是否正常
MemoryContextControlSet
根据白名单设置上下文的maxSpaceSize限制大小
AllocSetContextCreate
根据contextType类型,调用不同的AllocSetContextCreate分配器分配MemoryContext
AllocSetMethodDefinition
设置MemoryContext回调处理方法
AllocSetContextSetMethods
设置不同上下文类型的分配器
AllocSetContextCreate
创建一个具体的MemoryContext。根据类型,确定内存保护函数。调用MemoryContextCreate函数创建一个AllocSetContext,设置maxSpaceSize大小,设置回调方法,设置初始块大小、下一个块大小、最大块大小。根据分配的最大块大小设置allocChunkLimit。如果上下文最值超过了“ALLOC_BLOCKHDRSZ + ALLOC_CHUNKHDRSZ”,则分配一个AllocBlock; 设置AllocBlock的上下文,空闲起始地址(freeptr)为跳过头部管理占用空间后剩余的空间,末尾地址(endptr)为块结束地址,分配大小(allocSize)为分配的块大小,魔鬼数字(magicNum)为0xDADADADADADADADA,context的总空间(totalSpace)加上这次分配的块大小,上下文的空闲空间(freeSpace)加上这个块的空闲空间,块的前向和后向指针为空,AllocSetContext的第一个块(blocks)指向这个块,保留块(keeper)指向这个块。返回AllocSetContext
AllocSetInit
特定AllocSetContext初始化函数,当前为空,没有使用
AllocSetReset
根据上下文类型选择保护函数,把空闲链表(freelist)置空,遍历内存块,如果是保留块,则对这块内存重新初始化,如果不是,则释放掉。根据是否有保留块重新初始化上下文
AllocSetDelete
如果上下文没有内存块,则直接返回。根据上下文类型选择保护函数,获取内存块首地址,把保留块和内存块头置空。遍历所有内存块,释放掉。把上下文的空闲空间(freeSpace)和总空间(totalSpace)置为0
AllocSetAlloc
根据上下文类型选择保护函数,如果申请内存大小超过了内存块大小上限,则直接调用OS(操作系统)接口分配一个内存块,初始化这个内存块。把内存块转换为一个内存单元(chunk),对这个内存单元进行初始化。把这个内存块挂到上下文上,返回内存单元chunk指针。

如果申请内存大小没有超过内存块大小上限,根据内存大小映射的空闲链表(freelist),看相应空闲链表中是否有相应大小的内存单元,如果有空闲的内存单元,则分配一个内存单元返回。

如果空闲链表(freelist)没有空闲的内存单元,看当前的块(blocks)是否有足够的内存,如果没有足够的内存,则根据块剩余的内存大小放到相应的空闲链表中。分配一个新的块,对这个新的块进行初始化,把块挂到上下文上,从这个块上分配一个内存单元返回。

如果当前的块有足够的内存,则从当前块上分配一个内存单元返回续表


函数
功能介绍
AllocSetFree
根据类型,确定内存保护函数。如果释放内存单元大小超过了内存块大小上限,把这个内存单元转换为AllocBlock,把内存块从上下文中解除,把内存块变量置空,释放内存块。

如果内存单元大小小于内存块上限,则根据内存单元大小映射的空闲链表,把释放的内存单元挂到上下文的空闲链表上
AllocSetRealloc
根据类型,确定内存保护函数。如果原来的内存单元大小能够满足请求的大小,则重新赋值新的内存检查信息后直接返回当前的内存单元。

如果旧的内存单元大小超过了内存块的上限,则调用OS重分配内存接口重新分配一个内存块,对新的内存块初始化,把新的内存块转换为内存单元AllocChunk返回。

如果旧的内存单元大小没有超过内存块的上限,则根据新的大小调用AllocSetAlloc分配一个新的内存单元,把旧的内存单元值复制到新的内存单元,调用AllocSetFree释放原来的内存单元
AllocSetGetChunkSpace
返回内存单元的大小,包括头部占用的空间
AllocSetIsEmpty
检查是否复位(isReset),如果是,则返回true; 否则返回false
AllocSetStats
显示上下文的内存消耗信息,打印到标准stderr(标准错误)输出上



3.6多维监控



数据库是企业的关键组件,数据库的性能直接决定了很多业务的吞吐量。为了简化数据库维护人员的调优,openGauss对数据库运行进行了多维度的监控,并且开发了一些维护特性,如WDR(Wordload Diagnostic Report,工作负荷诊断报告)性能诊断报告、慢SQL诊断、会话性能诊断、系统KPI(Key Performance Indicator,关键性能指标)辅助诊断等,帮助维护人员对数据库的性能进行诊断。这些监控项都以视图的方式对外呈现,集中在DBE_PERF模式下。WDR Snapshot除了自身快照的元数据,其他数据表来源也是DBE_PERF schema下的视图。WDR Snapshot数据表命名原则: snap_{源数据表},根据这个关系可以找到snap表所对应的原表。对这些视图的解释参照openGauss的官网(https://opengauss.org)中《开发者指南》手册的“DBE_PERF schema”章节。
性能视图的源代码在“openGaussserver\src\common\backend\catalog\performance_views.sql”文件中(网址为https://gitee.com/opengauss/openGaussserver/blob/master/src/common/backend/catalog/performance_views.sql,安装后会复制到安装路径的“performance_views.sql”下)。在数据库初始化阶段由initdb读取这个文件在数据库系统中创建相应的视图。这些视图遵循了openGauss通用视图的实现逻辑,即视图来自函数的封装,这些函数可能是内置函数,也可能是存储函数。OS运行的性能视图“dbe_perf.get_global_os_runtime”的相关代码如下: 


CREATE OR REPLACE FUNCTION dbe_perf.get_global_os_runtime

(OUT node_name name,OUT id integer,OUT name text,OUT value numeric,OUT comments text,OUT cumulative boolean)

RETURNS setof record

AS $$

DECLARE

row_data dbe_perf.os_runtime%rowtype;

query_str := 'SELECT * FROM dbe_perf.os_runtime';

FOR row_data IN EXECUTE(query_str) LOOP

…

END LOOP;

return;

END; $$

LANGUAGE 'plpgsql' NOT FENCED;

CREATE VIEW dbe_perf.global_os_runtime AS

SELECT DISTINCT * FROM dbe_perf.get_global_os_runtime();

global_os_runtime视图来自存储函数get_global_os_runtime的封装,在存储函数内访问dbe_perf.os_runtime视图、os_runtime视图的SQL语句为“CREATE VIEW dbe_perf.os_runtime AS SELECT * FROM pv_os_run_info();”。pv_os_run_info是内置函数,而内置函数负责读取数据库系统的监控指标,pv_os_run_info函数的相关代码如下: 


Datum pv_os_run_info(PG_FUNCTION_ARGS)

{

FuncCallContext* func_ctx = NULL;

/*判断是不是第一次调用*/

if (SRF_IS_FIRSTCALL()) {

MemoryContext old_context;

TupleDesc tup_desc;

/*创建函数上下文*/

func_ctx = SRF_FIRSTCALL_INIT();

/*

* 切换内存上下文到多次调用上下文

*/

old_context = MemoryContextSwitchTo(func_ctx->multi_call_memory_ctx);

/*创建一个包含5列的元组描述模板*/

tup_desc = CreateTemplateTupleDesc(5,false);

TupleDescInitEntry(tup_desc,(AttrNumber)1,"id",INT4OID,-1,0);

TupleDescInitEntry(tup_desc,(AttrNumber)2,"name",TEXTOID,-1,0);

TupleDescInitEntry(tup_desc,(AttrNumber)3,"value",NUMERICOID,-1,0);

TupleDescInitEntry(tup_desc,(AttrNumber)4,"comments",TEXTOID,-1,0);

TupleDescInitEntry(tup_desc,(AttrNumber)5,"cumulative",BOOLOID,-1,0);



/*填充元组描述模板*/

func_ctx->tuple_desc = BlessTupleDesc(tup_desc);

/*收集系统信息*/

getCpuNums();

getCpuTimes();

getVmStat();

getTotalMem();

getOSRunLoad();

(void)MemoryContextSwitchTo(old_context);

}



/*设置函数的上下文,每次函数调用都需要*/

func_ctx = SRF_PERCALL_SETUP();

while (func_ctx->call_cntr < TOTAL_OS_RUN_INFO_TYPES) {

/*填充所有元组每个字段的值*/

Datum values[5];

bool nulls[5] = {false};

HeapTuple tuple = NULL;

errno_t rc = 0;

rc = memset_s(values,sizeof(values),0,sizeof(values));

securec_check(rc,"\0","\0");

rc = memset_s(nulls,sizeof(nulls),0,sizeof(nulls));

securec_check(rc,"\0","\0");

if (!u_sess->stat_cxt.osStatDescArray[func_ctx->call_cntr].got) {

ereport(DEBUG3,

(errmsg("the %s stat has not got on this plate.",

u_sess->stat_cxt.osStatDescArray[func_ctx->call_cntr].name)));

func_ctx->call_cntr++;

continue;

}

values[0] = Int32GetDatum(func_ctx->call_cntr);

values[1] = CStringGetTextDatum(u_sess->stat_cxt.osStatDescArray[func_ctx->call_cntr].name);

values[2] = u_sess->stat_cxt.osStatDescArray[func_ctx->call_cntr].getDatum(

u_sess->stat_cxt.osStatDataArray[func_ctx->call_cntr]);

values[3] = CStringGetTextDatum(u_sess->stat_cxt.osStatDescArray[func_ctx->call_cntr].comments);

values[4] = BoolGetDatum(u_sess->stat_cxt.osStatDescArray[func_ctx->call_cntr].cumulative);



tuple = heap_form_tuple(func_ctx->tuple_desc,values,nulls);

SRF_RETURN_NEXT(func_ctx,HeapTupleGetDatum(tuple));

}

/*填充结束,返回结果*/

SRF_RETURN_DONE(func_ctx);

}

pv_os_run_info函数可以分为三段: 
(1)  调用CreateTemplateTupleDesc函数和TupleDescInitEntry函数定义元组描述信息。
(2)  调用getCpuNums函数、getCpuTimes函数、getVmStat函数、getTotalMem函数、getOSRunLoad函数收集系统信息。
(3)  把收集的u_sess信息填充到元组数据中,最后返回给调用者。openGauss提供了实现返回结果的通用SQL函数的实现步骤和方法,它们是SRF_IS_FIRSTCALL、SRF_PERCALL_SETUP、SRF_RETURN_NEXT和SRF_RETURN_DONE。从代码可以看出,pv_os_run_info的实现流程也遵循openGauss通用的SQL函数实现方法。
系统指标的收集来自读取系统信息,对数据库系统中一些模块进行打点(打点就是按照规格采集指定数据,用以记录系统运行的一些关键点)。很多打点集中在两个方面:  事务执行次数和执行时间,从而推断最大时间、最小时间、平均时间。这些比较分散,代码逻辑相对简单,这里不再进行介绍,只需要根据内置函数读取的变量查看这些变量赋值的地方就可以追踪具体的实现位置。
openGauss主要维护特性的实现代码在“openGaussserver\src\gausskernel\cbb\instruments”目录中,如WDR、SQL百分位计算,这里不再进行介绍。
性能统计对openGauss的正常运行也会带来一定的性能损耗,所以这些特性都有开关控制,具体说明如下。
(1)  等待事件信息实时收集功能的开关为enable_instr_track_wait。
(2)  Unique SQL信息实时收集功能的开关为enable_instr_unique_sql、enable_instr_rt_percentile。
(3)  数据库监控快照功能的开关为enable_wdr_snapshot。
其他功能也都有相应的GUC参数进行调节,根据平常使用的需要,可以打开具体维护项查看系统的运行情况。


3.7模拟信号机制



信号是Linux进程/线程之间的一种通信机制,向一个进程发送信号的系统函数是kill,向一个线程发送信号的系统函数是pthread_kill。在openGauss中既有gs_ctl向openGauss进程发送的进程间信号,也有openGauss进程中线程间的信号。
信号是一种有限的资源,OS提供的信号有SIGINT、SIGQUIT、SIGTERM、SIGALRM、SIGPIPE、SIGFPE、SIGUSR1、SIGUSR2、SIGCHLD、SIGTTIN、SIGTTOU、SIGXFSZ等。这些信号一般都是系统专用的,每个信号都有专门的用途,比如SIGALRM是系统定时器的通知信号。留给应用的信号主要是SIGUSR1、SIGUSR2。
在系统信号有限的情况下,为了在openGauss中表达不同的丰富的通信语义,openGauss额外增加了新的变量表示具体的语义。openGauss是多线程架构,在同一个进程内如果不同的线程注册了不同的处理函数,则后者会覆盖前者的信号处理。为了不同线程能够注册不同的处理函数,需要自己管理信号对应的注册函数。为了解决这些问题,openGauss实现了信号的模拟机制。信号模拟的基本原理是每个线程注册管理自己的信号处理函数,信号枚举值仍然使用系统的信号值,线程使用自己的变量记录信号和回调函数对应关系。线程之间发送信号时,先设置变量为具体的信号值,然后使用系统调用pthread_kill发送信号,线程收到通知后再根据额外的变量表示的具体信号值,回调对应的信号处理函数。
信号处理涉及的数据结构代码如下。每个线程有一个GsSignalSlot结构,保存了线程ID、线程名称和GsSignal结构,而GsSignal结构保存了每个信号对应的处理函数数组和每个线程相关的信号池。信号池struct SignalPool包含了使用的信号列表和空闲的信号列表,当一个模拟信号到达时,找一个空闲信号GsNode,然后放到使用的列表中。GsNode中存放了信号值结构GsSndSignal sig_data。GsSndSignal结构中保存了发送的信号具体值和发送的线程ID。当需要设置一些额外检查信息时,设置GsSignalCheck内容。相关代码如下: 


typedef struct GsSignalSlot {

ThreadId thread_id;

char* thread_name;

GsSignal* gssignal;

} GsSignalSlot;

typedef struct GsSignal {

gs_sigfunc handlerList[GS_SIGNAL_COUNT];

sigset_t masksignal;

SignalPool sig_pool;

volatile unsigned int bitmapSigProtectFun;

} GsSignal;

typedef struct SignalPool {

GsNode* free_head; /*空闲信号列表头部*/

GsNode* free_tail; /*空闲信号列表尾部*/

GsNode* used_head; /*使用信号列表头部*/

GsNode* used_tail; /*使用信号列表尾部*/

int pool_size;     /*数组列表大小*/

pthread_mutex_t sigpool_lock;

} SignalPool;

typedef struct GsNode {

GsSndSignal sig_data;

struct GsNode* next;

} GsNode;

typedef struct GsSndSignal {

unsigned int signo;  /*需要处理的信号*/

gs_thread_t thread;  /*发送信号的线程ID*/

GsSignalCheck check; /*信号发送线程需要检查的信息*/

} GsSndSignal;

typedef struct GsSignalCheck {

GsSignalCheckType check_type;

uint64 debug_query_id;

uint64 session_id;

} GsSignalCheck;

信号处理的几个主要流程为初始化模拟信号机制、注册信号处理函数、发送信号和处理信号。具体的处理逻辑如下。
(1)  初始化模拟信号机制函数gs_signal_slots_init。在gs_signal_slots_init处理函数中完成以下功能。
① 根据传入的槽位个数,分配内存。遍历每个槽位,进行初始化,初始化时调用gs_signal_init函数对每个槽位的GsSignal(GsSignal是openGauss封装的模拟信号结构体,里面包含了信号掩码和信号处理函数等成员)进行初始化。
② 在gs_signal_init函数中,对GsSignal分配内存和初始化,初始化时调用gs_signal_sigpool_init函数对信号池初始化。
③ 在gs_signal_sigpool_init函数中,对信号池进行分配内存和初始化。
(2)  注册信号处理函数gspqsignal。在gspqsignal处理函数中完成以下功能。
① 调用“gs_signal_register_handler(t_thrd.signal_slot>gssignal,signo,func);”函数把信号对应的处理函数注册到GsSignal中。在注册之前,需要为线程分配一个signal_slot,这个是在gs_signal_startup_siginfo函数中完成的。
② 在gs_signal_startup_siginfo函数中,调用gs_signal_alloc_slot_for_new_thread函数为线程分配一个signal_slot。该函数的功能是遍历“g_instance.signal_base>slots”,找到一个未使用的slot(thread_id为0表示未使用),然后设置本线程ID和线程名称。
(3)  发送信号函数gs_signal_send。在gs_signal_send处理函数中完成以下功能。
① 调用函数gs_signal_find_slot找到要发送线程所在的GsSignalSlot。
② 调用函数gs_signal_set_signal_by_threadid设置模拟信号。该函数首先检查信号在使用列表中是否已经存在,如果已经存在,则直接返回; 如果不存在,则在空闲列表中找到一个空闲GsNode,设置信号值,发送线程ID、check_type到sig_data中,最后把空闲GsNode移到使用列表中。
③ 调用函数gs_signal_thread_kill发送信号通知。该函数遍历GsSignalSlot,找到匹配的线程ID,然后调用“gs_signal_thread_kill(thread_id,RES_SIGNAL);”函数给具体线程发送信号通知。语句“#define RES_SIGNAL SIGUSR2”表示内部统一都使用SIGUSR2发送通知。
(4)  处理信号函数gs_signal_handle。在函数gs_signal_handle中完成以下功能。
① 遍历信号池使用列表,找到一个需要处理的信号。
② 找到这个信号对应的信号处理函数,把GsNode移到空闲列表中。
③ 调用gs_signal_handle_check函数检查当前的条件是否仍然满足,如果仍然有效,回调处理函数。


3.8本章小结



本章主要介绍了openGauss的一些公共组件机制,每个内容都比较独立。系统表是openGauss的元数据,本章主要介绍了系统表的定义和syscache访问机制; 数据库初始化是数据库安装后的第一步,它负责创建数据库的模板数据库和数据目录; 多线程架构是openGauss启动后的运行机制,本章介绍了主线程的初始化流程、后台线程的启动、各个线程的功能和线程之间的通信机制; 线程池技术是解决大并发链接的有效方法,本章介绍了线程池机制的原理,各个类之间的关系和设计原因; 内存管理是openGauss的内存资源管理组件,本章介绍了openGauss的三级内存管理机制; 多维监控是openGauss性能调优手段的基础,本章介绍了性能视图的基本实现原理; 模拟信号机制是openGauss多线程处理紧急事件的机制,本章介绍了这个机制的实现原理。