openGauss的模拟信号机制

从多线程下的信号机制讲起,在Linux下所有线程共享进程的signal action,如何做到每个线程对相同信号有不同处理方式呢?一起看看OpenGauss是如何设计的。

多线程下的信号机制

在linux下,每个进程都有自己的signal mask,这个信号掩码指定哪个信号被阻塞,哪个不会被阻塞,通常用调用sigmask来处理。同时一个进程还有自己的signal action,这个行为集合指定了信号在进程内的所有线程该如何处理,通常调用sigaction来处理。

使用了多线程后有以下几个关键问题:

  1. 信号发生时,哪个线程会收到?
  2. 每个线程都有自己的mask及action吗?
  3. 每个线程能按自己的方式处理信号吗?
  • 如果是异常产生的信号(比如程序错误,像SIGPIPE、SIGEGV这些),则只有产生异常的线程收到并处理。

  • 如果是用pthread_kill产生的内部信号,则只有pthread_kill参数中指定的目标线程收到并处理。

  • 如果是外部使用kill命令产生的信号,通常是SIGINT、SIGHUP等job control信号,则会遍历所有线程,直到找到一个不阻塞该信号的线程,然后调用它来处理。(一般从主线程找起),注意只有一个线程能收到。

第二个问题:每个线程都有自己独立的signal mask,但所有线程共享进程的signal action。这意味着进程内的所有线程共享进程的信号处理函数,当进程内的一个线程为某个信号注册了处理函数,另一个线程可以更改这个处理函数。

第三个问题:由第二点可知,所有线程处理信号的方式都是一样的。

模拟信号的基本思想

openGauss是多线程架构,在同一个进程内的不同的线程注册不同的处理函数,则后者会覆盖前者的信号处理。如何做到对于相同信号,不同线程有不同处理方式呢?

答案是:绕开操作系统,由oG自己实现信号机制。让线程定义“信号”、定义“信号”的处理函数。

信号模拟的基本原理是每个线程注册管理自己的信号处理函数,信号枚举值仍然使用系统的信号值,线程使用自己的变量记录信号和回调函数对应关系。

结构体

具体到代码细节中,我们会有一个保存所有线程对于信号处理和信号的结构体GsSignalSlot的数组:g_instance.signal_base->slots

GsSignalSlot中重要的是 GsSignal ,它描述了一个线程怎么处理信号 gs_sigfunc 以及线程要处理的信号 SignalPool

到这里,我们大致可以猜出信号发送和处理的逻辑流程了。

线程A给线程B发送信号:

  1. 遍历g_instance.signal_base->slots,找到B的slot
  2. 构造信号GsNode,然后添加到B的信号池中

线程B接收信号:

  1. 遍历自己信号池,取出未处理的信号进行处理

但是线程B怎么知道有信号到来?如何在合适的时机进行信号池的遍历?后续将一一揭晓。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 定义一个结构体,用于表示信号槽,即信号与处理线程之间的关联  
typedef struct GsSignalSlot {
ThreadId thread_id; // 线程ID,表示哪个线程将处理这个信号
char* thread_name; // 线程名称,用于标识或调试
GsSignal* gssignal; // 指向GsSignal结构体的指针,包含信号处理的具体信息
} GsSignalSlot;

// 定义一个结构体,用于管理信号相关的信息和处理函数
typedef struct GsSignal {
gs_sigfunc handlerList[GS_SIGNAL_COUNT]; // 信号处理函数数组,GS_SIGNAL_COUNT定义了可处理的信号数量
sigset_t masksignal; // 信号掩码,用于阻塞某些信号
SignalPool sig_pool; // 信号池,用于管理信号资源
volatile unsigned int bitmapSigProtectFun;// 位图,用于保护或标记某些信号处理函数的状态
} GsSignal;

// 定义一个结构体,用于管理信号池中的信号节点
typedef struct SignalPool {
GsNode* free_head; // 指向空闲信号节点链表的头部
GsNode* free_tail; // 指向空闲信号节点链表的尾部
GsNode* used_head; // 指向已使用信号节点链表的头部
GsNode* used_tail; // 指向已使用信号节点链表的尾部
int pool_size; // 信号池的大小,即能容纳的信号节点数量
pthread_mutex_t sigpool_lock; // 信号池访问的互斥锁
} SignalPool;

然后就是具体信号相关的结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 信号链表
typedef struct GsNode {
GsSndSignal sig_data; // 信号数据,包含信号编号、发送信号的线程ID等信息
struct GsNode* next; // 指向下一个节点的指针
} GsNode;

// 定义一个结构体,表示需要发送的信号及其相关信息
typedef struct GsSndSignal {
unsigned int signo; // 需要处理的信号编号
gs_thread_t thread; // 发送信号的线程ID
GsSignalCheck check; // 发送信号时需要检查的信息,如检查类型、调试查询ID、会话ID等
} GsSndSignal;

// 定义一个结构体,表示信号发送时需要检查的信息
typedef struct GsSignalCheck {
GsSignalCheckType check_type; // 检查类型,可能是一个枚举,定义了不同类型的检查
uint64 debug_query_id; // 调试查询ID,用于调试或跟踪
uint64 session_id; // 会话ID,标识特定的会话或操作
} GsSignalCheck;

基本思想

在进程环境中,对信号的处理是异步的:先注册信号处理函数,当信号异步发生时,由信号处理函数来处理信号。

但在多线程中处理信号的原则是:将信号的异步处理转换成同步处理。也就是说用一个线程专门的来“同步等待”信号的到来,而其它的线程可以完全不被该信号中断/打断(interrupt)。这样就在相当程度上简化了在多线程环境中对信号的处理,且可以保证其它的线程不受信号的影响。

模拟信号初始化

1
2
3
4
5
6
7
8
if (isBoot) {
IsInitdb = true;
gs_signal_monitor_startup(); // 信号监听线程启动
gs_signal_slots_init(1); // 初始化信号槽
(void)gs_signal_unblock_sigusr2(); // 停止阻塞SIGUSR2信号
gs_signal_startup_siginfo("AuxiliaryProcessMain"); // 为当前线程分配信号槽位
BootStrapProcessMain(argc, argv);
}

在main.cpp的启动流程中,关于信号处理有以下几个步骤:

  1. 启动信号监听器线程
  2. 初始化信号槽
  3. 停止阻塞SIGUSR2信号
  4. 为当前线程分配信号槽位

信号监听线程和信号槽前面已经介绍过,这里SIGUSR2是操作系统留给用户的自定义信号。

在oG中,这个信号被用于线程间通讯。也就是说线程互相发“信号”中的“信号”,都是SIGUSR2。线程收到SIGUSR2后的行为,统一都是遍历自己的信号池,对待处理的信号进行处理。

信号监听器线程启动

1
2
3
4
5
6
void gs_signal_monitor_startup(void){
(void)gs_signal_block_sigusr2(); // 阻塞SIGUSR2信号
(void)gs_signal_install_handler(); // 加载SIGUSR2信号的handler
errCode = gs_thread_create(&thread, gs_signal_receiver_thread, 0, NULL); // 启动信号监听线程
return;
}

前述在main.cpp中启动了监听线程gs_signal_monitor_startup(),实际上做的事有:

  1. 阻塞SIGUSR2信号
  2. 加载SIGUSR2信号的处理函数
  3. 启动信号监听线程

阻塞SIGUSR2信号是为了让监听线程启动不受该信号影响。

gs_signal_install_handler()在进程级别设置SIGUSR2信号的处理函数为gs_res_signal_handlerRES_SIGNAL 被定义为 SIGUSR2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
STATIC gs_sigaction_func gs_signal_install_handler(void){
struct sigaction act, oact;
/*
* It is important to set SA_NODEFER flag, so this signal is not added
* to the signal mask of the calling process on entry to the signal handler.
*/
sigemptyset(&act.sa_mask);
// 设置RES_SIGNAL的处理函数为gs_res_signal_handler
act.sa_sigaction = gs_res_signal_handler;
act.sa_flags = 0;
act.sa_flags |= SA_SIGINFO;
act.sa_flags |= SA_RESTART;
// 调用sigaction进行信号的注册,`RES_SIGNA`L 被定义为 `SIGUSR2`
if (sigaction(RES_SIGNAL, &act, &oact) < 0) {
ereport(PANIC, (errcode(ERRCODE_INSUFFICIENT_RESOURCES), errmsg("not able to set up signal action handler")));
}

/* Return previously installed handler */
return oact.sa_sigaction;
}

然后启动了信号监听线程:我们看到这个监听线程的作用就是等待信号到来,然后由它将信号发送给具体的线程,将异步的过程改为同步的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void* gs_signal_receiver_thread(void* args){
/*
* Listen on all signals. We explicitely include the list of signals that we
* are interested in to reduce exposure.
*/
sigset_t waitMask;

/* wait below signals: SIGINT, SIGTERM, SIGQUIT, SIGHUP */
sigemptyset(&waitMask);
sigaddset(&waitMask, SIGINT);
sigaddset(&waitMask, SIGTERM);
sigaddset(&waitMask, SIGQUIT);
sigaddset(&waitMask, SIGHUP);
sigaddset(&waitMask, SIGUSR1);
sigaddset(&waitMask, SIGURG);

gs_signal_block_sigusr2();

/* add just for memcheck */
gs_thread_args_free();

for (;;) {
int signo;

/* Wait for signals arrival. */
sigwait(&waitMask, &signo);

/* send signal to thread */
(void)gs_signal_send(PostmasterPid, signo);
}
return NULL;
}

volatile ThreadId PostmasterPid = 0;

这里 gs_signal_send 下面会介绍,目前只要知道它是发送信号的函数。

1
2
3
4
5
6
7
8
9
10
/*
* standalone backend). IsUnderPostmaster is true in postmaster child
* processes. Note that "child process" includes all children, not only
* regular backends. These should be set correctly as early as possible
* in the execution of a process, so that error handling will do the right
* things if an error should occur during process initialization.
*
* These are initialized for the bootstrap/standalone case.
*/
THR_LOCAL bool IsUnderPostmaster = false;

而这个PostmasterPid在 !IsUnderPostmaster 下,为自身线程的ID。也就是说,Postmaster的子线程的PostmasterPid是固定的,它代表postmaster的线程ID。

这样我们知道,当有外部信号发生时,它被信号监听线程捕获,然后发送给postmaster主线程处理!

信号槽初始化

xx

模拟信号使用

信号注册

在每一个线程中,都需要单独为信号注册处理函数,比如线程池的监听器组件:

1
2
3
4
5
6
7
8
9
10
11
void TpoolListenerMain(ThreadPoolListener* listener)
{
t_thrd.proc_cxt.MyProgName = "ThreadPoolListener";
pgstat_report_appname("ThreadPoolListener");
(void)gspqsignal(SIGURG, print_stack);
(void)gspqsignal(SIGHUP, SIG_IGN);
(void)gspqsignal(SIGINT, SIG_IGN);
// die with pm
(void)gspqsignal(SIGTERM, SIG_IGN);
(void)gspqsignal(SIGQUIT, SIG_IGN);
(void)gspqsignal(SIGPIPE, SIG_IGN);

注册信号处理函数gspqsignal调用gs_signal_register_handler为信号设置处理函数,其实就是往handlerList里填处理函数的地址。

1
2
3
4
5
static gs_sigfunc gs_signal_register_handler(GsSignal* gs_signal, int signo, gs_sigfunc fun){
prefun = gs_signal->handlerList[signo];
gs_signal->handlerList[signo] = fun;
return prefun;
}

信号发送

gs_signal_send 发送信号主要包括两步:

  • 包装模拟信号 code = gs_signal_set_signal_by_threadid(thread_id, signo);
  • 向线程发送信号 code = gs_signal_thread_kill(thread_id, RES_SIGNAL);

gs_signal_set_signal_by_threadid 包装模拟信号的逻辑比较简单,主要为:

  1. 根据thread id 找到对应的信号槽slot
  2. 在slot的信号池中找到空闲的gsnode
  3. 将信号注册到gsnode中

gs_signal_thread_kill 的逻辑也很简单,其实就是对pthread_kill的封装。但是有一点,它遍历了slot,这可能是为了验证线程存在:通过遍历信号槽数组,gs_signal_thread_kill可以验证指定的线程ID是否确实存在于当前进程的上下文中。如果线程ID不存在于数组中,那么可能意味着该线程已经退出或从未被创建,因此发送信号没有意义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int gs_signal_thread_kill(ThreadId tid, int signo){
unsigned int loop = 0;
GsSignalSlot* slots_index = g_instance.signal_base->slots;
int ret = 0;

Assert(NULL != g_instance.signal_base->slots);
Assert(g_instance.signal_base->slots_size > 0);

(void)pthread_mutex_lock(&(g_instance.signal_base->slots_lock));
gs_signal_location_base_signal_lock_info(__func__, 0);

for (loop = 0; loop < g_instance.signal_base->slots_size; loop++) {
if (slots_index->thread_id == tid) {
// 发送信号
ret = gs_thread_kill(tid, signo); // pthread_kill的封装

gs_signal_unlocation_base_signal_lock_info();
(void)pthread_mutex_unlock(&(g_instance.signal_base->slots_lock));

return ret;
}
slots_index++;
}

gs_signal_unlocation_base_signal_lock_info();
(void)pthread_mutex_unlock(&(g_instance.signal_base->slots_lock));

return ESRCH;
}

注意:gs_signal_send 发送的信号始终是 RES_SIGNAL 即 SIGUSR2 ,那么对应线程的处理则由gs_signal_handle函数进行!

信号接收

前述oG的模拟信号初始化的过程中阻塞了SIGUSR2信号,等到信号监听线程启动后,又停止阻塞SIGUSR2信号。由于新建的线程都继承了父线程的sigmask,它们都不会阻塞SIGUSR2信号。并且,所有线程共享sigaction,对于SIGUSR2信号处理行为是一致,就是gs_signal_handle函数。

它的行为是:

①遍历信号池使用列表,找到一个需要处理的信号。

②找到这个信号对应的信号处理函数。把GsNode移到空闲列表中。

③调用gs_signal_handle_check函数检查当前的条件是否仍然满足。如果仍然有效,回调处理函数。

总结

监听线程同步等待信号,但是对于其他线程来说,处理信号的过程还是异步的(接收SIGUSR2信号,然后在自己的信号池里取出模拟信号处理)。

作者

Desirer

发布于

2024-08-25

更新于

2024-11-15

许可协议