Interrupts and Signals in openGauss

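An "interrupt" in openGauss (OG below) is not a real OS-level interrupt. It is a cooperative mechanism inside the server, used mainly to handle the logic for cancelling a running query (Cancel Query).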

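Introduction

Reading the code, you will notice that CHECK_FOR_INTERRUPTS(); appears again and again throughout statement execution, together with operations that hold and resume interrupts. Let's look at what these macros do.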

#define CHECK_FOR_INTERRUPTS() \
    do { \
        if (InterruptPending) \
            ProcessInterrupts(); \
    } while (0)

#define HOLD_INTERRUPTS() (t_thrd.int_cxt.InterruptHoldoffCount++)

#define RESUME_INTERRUPTS() \
    do { \
        if (t_thrd.int_cxt.InterruptCountResetFlag && t_thrd.int_cxt.InterruptHoldoffCount == 0) { \
            t_thrd.int_cxt.InterruptCountResetFlag = false; \
        } else { \
            Assert(t_thrd.int_cxt.InterruptHoldoffCount > 0); \
            t_thrd.int_cxt.InterruptHoldoffCount--; \
        } \
    } while (0)

CHECK_FOR_INTERRUPTS

/*
 * ProcessInterrupts: out-of-line portion of CHECK_FOR_INTERRUPTS() macro
 *
 * If an interrupt condition is pending, and it's safe to service it,
 * then clear the flag and accept the interrupt. Called only when
 * InterruptPending is true.
 */
void ProcessInterrupts(void)
{
    /* OK to accept interrupt now? */
    if (t_thrd.int_cxt.InterruptHoldoffCount != 0 || t_thrd.int_cxt.CritSectionCount != 0)
        return;

    if (t_thrd.bn && ((unsigned int)(t_thrd.bn->flag) & THRD_SIGTERM)) {
        t_thrd.int_cxt.ProcDiePending = true;
        t_thrd.bn->flag = ((unsigned int)(t_thrd.bn->flag)) & ~THRD_SIGTERM;
    }

    // The 'u_sess->stream_cxt.in_waiting_quit' flag is set to true to enable signal handling when waiting sub stream
    // threads quit. At the same time, if we get a SIGTERM signal, this signal should be held and the 'InterruptPending'
    // flag should not be set to false immediately. After all sub thread quit and the top consumer goes back to
    // ReadCommand again, the pending interrupt can be safely handled in function prepare_for_client_read.
    //
    if (t_thrd.int_cxt.ProcDiePending && u_sess->stream_cxt.in_waiting_quit) {
        // It's more efficient to notify all stream threads to cancel the query first
        // and then top consumer can quit quickly.
        //
        StreamNodeGroup::cancelStreamThread();
        return;
    }

    if (StreamThreadAmI() && u_sess->debug_query_id == 0) {
        Assert(0);
    }

    InterruptPending = false;
    if (t_thrd.wlm_cxt.wlmalarm_pending) {...}
    if (t_thrd.int_cxt.ProcDiePending && !u_sess->stream_cxt.in_waiting_quit) {...}
    if (t_thrd.int_cxt.ClientConnectionLost && !u_sess->stream_cxt.in_waiting_quit) {...}
    if (t_thrd.int_cxt.QueryCancelPending) {...}
}

Only part of the code is listed here. The header comment says "Called only when InterruptPending is true.", so the next step is to find out what InterruptPending is.

InterruptPending

THR_LOCAL volatile bool InterruptPending = false;

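InterruptPending is a thread-local boolean. When does it become true? A global search for InterruptPending = true in the postgres server process code turns up the functions below, and all of them are signal handlers.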

void die(SIGNAL_ARGS)
void StatementCancelHandler(SIGNAL_ARGS)
void PoolValidateCancelHandler(SIGNAL_ARGS)

On a quick read, these functions do little more than set InterruptPending to true; none of them calls ProcessInterrupts directly. So when does ProcessInterrupts actually get called?
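The division of labor described above (the handler only records the request, and the work happens later at a polling point) can be sketched in plain C. This is a minimal illustration, not openGauss code: interrupt_pending, query_cancel_pending, cancel_handler, process_interrupts and run_query are hypothetical names, and the "query" is just a counting loop that sends SIGINT to itself.

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

/* Hypothetical stand-ins for InterruptPending and QueryCancelPending. */
static volatile sig_atomic_t interrupt_pending = 0;
static volatile sig_atomic_t query_cancel_pending = 0;

/* The signal handler only records that a cancel was requested. */
static void cancel_handler(int signo)
{
    (void)signo;
    query_cancel_pending = 1;
    interrupt_pending = 1;
}

/* Out-of-line part of the check: runs in normal (non-signal) context.
 * Returns true when a cancel request was accepted. */
static bool process_interrupts(void)
{
    assert(interrupt_pending);      /* called only when the flag is set */
    interrupt_pending = 0;
    if (query_cancel_pending) {
        query_cancel_pending = 0;
        return true;
    }
    return false;
}

/* Simulated executor loop that polls once per row, at a "safe" point.
 * cancel_at is the row at which SIGINT is raised (negative: never).
 * Returns the number of rows processed before the cancel was accepted. */
int run_query(int total_rows, int cancel_at)
{
    int done = 0;

    signal(SIGINT, cancel_handler);
    for (int row = 0; row < total_rows; row++) {
        /* Safe point: same shape as CHECK_FOR_INTERRUPTS(). */
        if (interrupt_pending && process_interrupts())
            break;
        if (row == cancel_at)
            raise(SIGINT);          /* handler runs here, but only sets flags */
        done++;                     /* the row in progress still completes */
    }
    return done;
}
```

Calling run_query(10, 3) finishes the row during which the signal arrived and stops at the next polling point, which is the point of the design: the signal never tears the loop apart mid-row.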

ProcessInterrupts

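ProcessInterrupts is reached only through CHECK_FOR_INTERRUPTS. Once it decides that servicing is safe, it clears InterruptPending and works through the individual flags in the interrupt context one by one (ProcDiePending, ClientConnectionLost, QueryCancelPending). Beyond that there is no special logic.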

/*
 * ProcessInterrupts: out-of-line portion of CHECK_FOR_INTERRUPTS() macro
 *
 * If an interrupt condition is pending, and it's safe to service it,
 * then clear the flag and accept the interrupt. Called only when
 * InterruptPending is true.
 */
void ProcessInterrupts(void)
{
    /* OK to accept interrupt now? */
    if (t_thrd.int_cxt.InterruptHoldoffCount != 0 || t_thrd.int_cxt.CritSectionCount != 0)
        return;

    if (t_thrd.bn && ((unsigned int)(t_thrd.bn->flag) & THRD_SIGTERM)) {
        t_thrd.int_cxt.ProcDiePending = true;
        t_thrd.bn->flag = ((unsigned int)(t_thrd.bn->flag)) & ~THRD_SIGTERM;
    }

    // The 'u_sess->stream_cxt.in_waiting_quit' flag is set to true to enable signal handling when waiting sub stream
    // threads quit. At the same time, if we get a SIGTERM signal, this signal should be held and the 'InterruptPending'
    // flag should not be set to false immediately. After all sub thread quit and the top consumer goes back to
    // ReadCommand again, the pending interrupt can be safely handled in function prepare_for_client_read.
    //
    if (t_thrd.int_cxt.ProcDiePending && u_sess->stream_cxt.in_waiting_quit) {
        // It's more efficient to notify all stream threads to cancel the query first
        // and then top consumer can quit quickly.
        //
        StreamNodeGroup::cancelStreamThread();
        return;
    }

    if (StreamThreadAmI() && u_sess->debug_query_id == 0) {
        Assert(0);
    }

    InterruptPending = false;

    if (t_thrd.wlm_cxt.wlmalarm_pending) {
        t_thrd.wlm_cxt.wlmalarm_pending = false;
        (void)WLMProcessWorkloadManager();
    }

    if (t_thrd.int_cxt.ProcDiePending && !u_sess->stream_cxt.in_waiting_quit) {
        ...
    }
    if (t_thrd.int_cxt.ClientConnectionLost && !u_sess->stream_cxt.in_waiting_quit) {
        ...
    }
    if (t_thrd.int_cxt.QueryCancelPending) {
        ...
    }
    /* If we get here, do nothing (probably, t_thrd.int_cxt.QueryCancelPending was reset) */
}

HOLD_INTERRUPTS and RESUME_INTERRUPTS

#define HOLD_INTERRUPTS() (t_thrd.int_cxt.InterruptHoldoffCount++)

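What does HOLD_INTERRUPTS() do? It merely increments a counter. Now look closely at how ProcessInterrupts begins: it services an interrupt only when the holdoff count (and the critical section count) is zero. That is what "disabling interrupts" means here: after HOLD_INTERRUPTS is called, ProcessInterrupts is guaranteed to return without servicing anything, and the pending flag stays set. Once the critical code has finished, RESUME_INTERRUPTS re-enables interrupts and the next check can service the request.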

void ProcessInterrupts(void)
{
    /* OK to accept interrupt now? */
    if (t_thrd.int_cxt.InterruptHoldoffCount != 0 || t_thrd.int_cxt.CritSectionCount != 0)
        return;
    ...
}
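To make the counter semantics concrete, here is a small self-contained model of the holdoff behavior. It is a sketch under simplifying assumptions: all names are hypothetical, the state lives in plain globals rather than t_thrd.int_cxt, and the InterruptCountResetFlag branch of the real RESUME_INTERRUPTS is left out.

```c
#include <assert.h>
#include <stdbool.h>

static bool interrupt_pending = false;  /* models InterruptPending */
static int holdoff_count = 0;           /* models InterruptHoldoffCount */
static int serviced = 0;                /* number of interrupts accepted */

static void hold_interrupts(void)
{
    holdoff_count++;
}

static void resume_interrupts(void)
{
    assert(holdoff_count > 0);          /* every resume needs a matching hold */
    holdoff_count--;
}

static void check_for_interrupts(void)
{
    if (!interrupt_pending)
        return;
    if (holdoff_count != 0)
        return;                         /* not safe now: leave the flag set */
    interrupt_pending = false;
    serviced++;
}

/* A low-level helper that polls internally, as many subroutines do. */
static void low_level_step(void)
{
    check_for_interrupts();
}

/* Returns how many interrupts were serviced while held;
 * *after receives the count once the hold has been fully released. */
int demo_holdoff(int *after)
{
    int during;

    holdoff_count = 0;
    serviced = 0;
    interrupt_pending = true;           /* pretend a signal handler fired */

    hold_interrupts();
    hold_interrupts();                  /* holds nest */
    low_level_step();                   /* deferred, count is 2 */
    resume_interrupts();
    low_level_step();                   /* still deferred, count is 1 */
    during = serviced;

    resume_interrupts();
    check_for_interrupts();             /* count is 0: accepted now */
    *after = serviced;
    return during;
}
```

Because the macro pair is a counter rather than a boolean, nested sections compose: an inner RESUME_INTERRUPTS does not re-enable servicing while an outer hold is still in effect, and the request is deferred rather than lost.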

Conclusion

/*****************************************************************************
* System interrupt and critical section handling
*
* There are two types of interrupts that a running backend needs to accept
* without messing up its state: QueryCancel (SIGINT) and ProcDie (SIGTERM).
* In both cases, we need to be able to clean up the current transaction
* gracefully, so we can't respond to the interrupt instantaneously ---
* there's no guarantee that internal data structures would be self-consistent
* if the code is interrupted at an arbitrary instant. Instead, the signal
* handlers set flags that are checked periodically during execution.
*
* The CHECK_FOR_INTERRUPTS() macro is called at strategically located spots
* where it is normally safe to accept a cancel or die interrupt. In some
* cases, we invoke CHECK_FOR_INTERRUPTS() inside low-level subroutines that
* might sometimes be called in contexts that do *not* want to allow a cancel
* or die interrupt. The HOLD_INTERRUPTS() and RESUME_INTERRUPTS() macros
* allow code to ensure that no cancel or die interrupt will be accepted,
* even if CHECK_FOR_INTERRUPTS() gets called in a subroutine. The interrupt
* will be held off until CHECK_FOR_INTERRUPTS() is done outside any
* HOLD_INTERRUPTS() ... RESUME_INTERRUPTS() section.
*
* Special mechanisms are used to let an interrupt be accepted when we are
* waiting for a lock or when we are waiting for command input (but, of
* course, only if the interrupt holdoff counter is zero). See the
* related code for details.
*
* A lost connection is handled similarly, although the loss of connection
* does not raise a signal, but is detected when we fail to write to the
* socket. If there was a signal for a broken connection, we could make use of
* it by setting t_thrd.int_cxt.ClientConnectionLost in the signal handler.
*
* A related, but conceptually distinct, mechanism is the "critical section"
* mechanism. A critical section not only holds off cancel/die interrupts,
* but causes any ereport(ERROR) or ereport(FATAL) to become ereport(PANIC)
* --- that is, a system-wide reset is forced. Needless to say, only really
* *critical* code should be marked as a critical section! Currently, this
* mechanism is only used for XLOG-related code.
*
*****************************************************************************/

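It was only when I looked closely at the definition of knl_t_interrupt_context that I noticed the long comment above. After reading it, the interrupt mechanism becomes clear.

The so-called "interrupt" is really an interruption of command execution, or more precisely of a running transaction. It covers both the active interrupt, SIGINT (query cancel), and the passive one, SIGTERM (process die). OG cannot guarantee that the interrupt logic is safe to run at an arbitrary point during a transaction, because internal data structures may not be self-consistent at that instant. So the signal handlers only set flags, and the code calls CHECK_FOR_INTERRUPTS at relatively "safe" places to check whether an interrupt has arrived. Holding and resuming interrupts guarantees that a stretch of code runs without being affected by a cancel or die interrupt, even if a subroutine it calls happens to invoke CHECK_FOR_INTERRUPTS.

A related mechanism is the "critical section". It also holds off cancel/die interrupts, and in addition any ereport(ERROR) or ereport(FATAL) raised inside a critical section is escalated to PANIC.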
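The escalation rule can be modelled in a few lines. This is only an illustration of the behavior the source comment describes: the level enum, crit_section_count, start_crit_section, end_crit_section and effective_level are hypothetical names, and leaving a warning untouched is an assumption carried over from how PostgreSQL-style error levels usually behave.

```c
#include <assert.h>

/* Hypothetical severity levels, ordered from least to most severe. */
enum level { LVL_WARNING, LVL_ERROR, LVL_FATAL, LVL_PANIC };

static int crit_section_count = 0;      /* models CritSectionCount */

void start_crit_section(void)
{
    crit_section_count++;
}

void end_crit_section(void)
{
    assert(crit_section_count > 0);     /* must pair with a start */
    crit_section_count--;
}

/* Returns the level that would actually be reported:
 * inside a critical section, ERROR and FATAL are promoted to PANIC. */
enum level effective_level(enum level requested)
{
    if (crit_section_count > 0 && requested >= LVL_ERROR)
        return LVL_PANIC;
    return requested;
}
```

In this model, effective_level(LVL_ERROR) stays LVL_ERROR outside a critical section and becomes LVL_PANIC between start_crit_section() and end_crit_section(), which is why the source comment warns that only really critical code should be marked this way.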

Author

Desirer

Published on

2024-09-16

Updated on

2024-11-15
