An Analysis of the Latch in PG

The latch in PG is odd: despite the name, it does not have lock semantics. It is a wait/wakeup mechanism, and there is a deeper idea behind it.

Prologue

Consider this scenario: a process is blocked in an I/O multiplexing call such as select() when a signal arrives. What happens?

The answer: select() returns -1 and sets errno to EINTR (4, "Interrupted system call"). The call was interrupted by the signal, and code needs to account for this case.

Why does this happen?

  1. The user-defined signal handler runs in user space, while the I/O multiplexing call executes in the kernel.
  2. To deliver the signal, the kernel transfers control back to user space, and the system call that was in progress is interrupted.
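
A minimal sketch of the usual defensive pattern (my illustration, not code from the post): retry the call when errno is EINTR. On Linux, select() rewrites the timeout to the time remaining, so the naive retry is approximately correct there.

#include <errno.h>
#include <sys/select.h>

/* Retry select() when it is interrupted by a signal handler. */
static int select_eintr_retry(int nfds, fd_set *readfds, struct timeval *timeout)
{
    int rc;

    do {
        rc = select(nfds, readfds, NULL, NULL, timeout);
    } while (rc < 0 && errno == EINTR);   /* interrupted by a signal: try again */
    return rc;
}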

It is not just select(): sleep(), for example, also returns early when a signal arrives. Here is an example of a sleep that cannot be cut short by SIGINT:

#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>

void sig_handler(int num)
{
    printf("\nreceived signal %d\n", num);
}

int main()
{
    int time = 20;

    signal(SIGINT, sig_handler);
    printf("enter to the sleep.\n");

    do {
        time = sleep(time); /* sleep() returns the unslept remainder */
    } while (time > 0);

    printf("sleep is over, main over.\n");

    exit(0);
}

So, is there a way to unify signal handling with I/O event handling (say, to observe signal events from inside select())? There is: the self-pipe trick.

self-pipe trick

The self-pipe trick is very close to the "unified event source" idea from network programming. The recipe:

  1. Create an anonymous pipe (both ends non-blocking) used only by the current process itself, hence "self-pipe".
  2. Have select() watch both the business I/O fds and the read end, self-pipe[0].
  3. When a signal arrives, the signal handler writes one byte to self-pipe[1] and returns.
  4. select() wakes up on self-pipe[0]; check FD_ISSET(self-pipe[0], ...) and run the signal-handling logic.

Roughly, the flow is: the handler writes a byte, the multiplexing call wakes up, and the ordinary control flow then deals with the signal. Here is a minimal, self-contained sketch of the trick (my illustration, not code from the original post), watching only the self-pipe and handling only SIGINT:
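
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>
#include <unistd.h>

static int selfpipe[2];

static void sig_handler(int signo)
{
    char b = (char) signo;
    (void) write(selfpipe[1], &b, 1);  /* async-signal-safe; fd is non-blocking */
}

static void set_nonblock(int fd)
{
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
}

int main(void)
{
    fd_set rfds;
    char buf[64];

    if (pipe(selfpipe) < 0)
        exit(1);
    set_nonblock(selfpipe[0]);         /* read end */
    set_nonblock(selfpipe[1]);         /* write end */
    signal(SIGINT, sig_handler);

    for (;;) {
        FD_ZERO(&rfds);
        FD_SET(selfpipe[0], &rfds);
        /* business I/O fds would be FD_SET here as well */

        if (select(selfpipe[0] + 1, &rfds, NULL, NULL, NULL) < 0)
            continue;                  /* EINTR: the byte is already in the pipe */

        if (FD_ISSET(selfpipe[0], &rfds)) {
            while (read(selfpipe[0], buf, sizeof(buf)) > 0)
                ;                      /* drain the pipe */
            printf("signal observed via self-pipe\n");
        }
    }
}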

What does this buy us?

In my view: signal handling is now serialized behind the I/O multiplexing call rather than racing with it, which removes a class of subtle races. And the byte sitting in the pipe reliably wakes the I/O call even when the signal arrived before the multiplexing call was entered.

Reference: https://cloud.tencent.com/developer/article/2001066?from_column=20421&from=20421

Latch

So, after all of that, what does the self-pipe have to do with the Latch?

The answer: the Latch is essentially a wrapper around the self-pipe trick.

Data structures

The main latch-related functions are:

InitLatch(): initialize a non-shared latch
InitSharedLatch(): initialize a shared latch
WaitLatch(): wait on a latch
ResetLatch(): reset a latch
SetLatch(): set the latch flag and wake up any process waiting on the latch
OwnLatch(): make the current process the owner of the latch
DisownLatch(): release the current process's ownership of the latch

The Latch struct:

typedef struct {
    sig_atomic_t is_set;  /* has the latch been set? */
    bool is_shared;       /* is this latch shared across processes? */
    ThreadId owner_pid;   /* thread ID of the owner */
} Latch;

InitializeLatchSupport

The InitializeLatchSupport() function initializes the process-local latch facility. It must be called during process startup, before any call to InitLatch() or OwnLatch().

Its main job: create the pipe and store the read and write ends in global variables.

void InitializeLatchSupport(void)
{
    int pipefd[2];

    Assert(selfpipe_readfd == -1);

    /*
     * Set up the self-pipe that allows a signal handler to wake up the
     * select() in WaitLatch. Make the write-end non-blocking, so that
     * SetLatch won't block if the event has already been set many times
     * filling the kernel buffer. Make the read-end non-blocking too, so that
     * we can easily clear the pipe by reading until EAGAIN or EWOULDBLOCK.
     */
    if (pipe(pipefd) < 0)
        ereport(FATAL, (errmsg("pipe() failed: %m")));
    if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0)
        ereport(FATAL, (errmsg("fcntl() failed on read-end of self-pipe: %m")));
    if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0)
        ereport(FATAL, (errmsg("fcntl() failed on write-end of self-pipe: %m")));

    selfpipe_readfd = pipefd[0];
    selfpipe_writefd = pipefd[1];
}

InitLatch

InitLatch() initializes a process-local latch (each process initializes its own latch, setting the latch's owner to the current process's PID).

void InitLatch(volatile Latch *latch)
{
    /* Assert InitializeLatchSupport has been called in this process */
    Assert(selfpipe_readfd >= 0);

    latch->is_set = false;
    latch->owner_pid = t_thrd.proc_cxt.MyProcPid; /* owner = current process's PID */
    latch->is_shared = false;
}

Waiting on a latch

WaitLatch() simply calls WaitLatchOrSocket(), which takes one extra parameter, a socket; WaitLatch() passes PGINVALID_SOCKET for it.

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
{
    return WaitLatchOrSocket(latch, wakeEvents, PGINVALID_SOCKET, timeout, wait_event_info);
}

Two key data structures:

  • WaitEventSet, the set of events being waited on
  • WaitEvent, a single wait event
/* typedef in latch.h */
struct WaitEventSet
{
    int nevents;       /* number of registered events */
    int nevents_space; /* maximum number of events in this set */

    /*
     * Array, of nevents_space length, storing the definition of events this
     * set is waiting for.
     */
    WaitEvent *events;

    /*
     * If WL_LATCH_SET is specified in any wait event, latch is a pointer to
     * said latch, and latch_pos the offset in the ->events array. This is
     * useful because we check the state of the latch before performing
     * syscalls related to waiting.
     */
    Latch *latch;   /* the latch registered in this set, as one of its events */
    int latch_pos;

    int epoll_fd;
    /* epoll_wait returns events in a user provided array, allocate once */
    struct epoll_event *epoll_ret_events;
};

/* one per epoll event; the latch registration also maps to one of these */
typedef struct WaitEvent
{
    int pos;          /* position in the event data structure */
    uint32 events;    /* triggered events */
    pgsocket fd;      /* socket fd associated with event */
    void *user_data;  /* pointer provided in AddWaitEventToSet */
} WaitEvent;

How is waiting on a latch implemented? Under the hood it is still I/O multiplexing, waiting for fd events.

When called through WaitLatch(), the socket argument is PGINVALID_SOCKET, so no real socket is registered; what poll/select/epoll actually watches for the latch event is the self-pipe's read end.

int
WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
                  long timeout, uint32 wait_event_info)
{
    int ret = 0;
    int rc;
    WaitEvent event;
    WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
    /******************************************************************
    CreateWaitEventSet expanded: builds the WaitEventSet.

    WaitEventSet *set
    // Memory layout (one allocation):
    set->
        sz += MAXALIGN(sizeof(WaitEventSet))                  // the WaitEventSet itself
    set->events->
        sz += MAXALIGN(sizeof(WaitEvent) * nevents)           // one WaitEvent per event
    set->epoll_ret_events->
        sz += MAXALIGN(sizeof(struct epoll_event) * nevents)  // the 3 events to watch
    set->latch = NULL
    set->nevents_space = nevents

    set->epoll_fd = epoll_create1(EPOLL_CLOEXEC)              // create the epoll instance
    ******************************************************************/

    if (wakeEvents & WL_TIMEOUT)
        Assert(timeout >= 0);
    else
        timeout = -1;

    if (wakeEvents & WL_LATCH_SET)
        AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET,
                          (Latch *) latch, NULL);
    /******************************************************************
    WL_LATCH_SET takes this branch:
    AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)

    1. At this point latch = {is_set = 0, is_shared = 1 '\001', owner_pid = 30877},
       where 30877 is the pid of the startup process.
    2. Assemble the WaitEvent *event:
        event = &set->events[set->nevents]
        ...
        event->fd = selfpipe_readfd  // *** note: what gets watched is the pipe's read end
        ...
    // set: {nevents = 1, nevents_space = 3, events = 0xf81dd8, latch = 0x2aaaaac0d254,
    //       latch_pos = 0, epoll_fd = 7, epoll_ret_events = 0xf81e20}
    // event: {pos = 0, events = 1, fd = 13, user_data = 0x0}
    WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD)
        epoll_event epoll_ev:
            EPOLLERR: an error occurred on the fd;
            EPOLLHUP: the fd was hung up;
            EPOLLIN:  the fd is readable (including orderly shutdown by the peer);

        epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev)
    ******************************************************************/
    if (wakeEvents & WL_POSTMASTER_DEATH && IsUnderPostmaster)
        AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
                          NULL, NULL);
    /******************************************************************
    WL_POSTMASTER_DEATH takes this branch.

    Same flow as above, except event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH]
    ******************************************************************/

    if (wakeEvents & WL_SOCKET_MASK)
    {
        int ev;

        ev = wakeEvents & WL_SOCKET_MASK;
        AddWaitEventToSet(set, ev, sock, NULL, NULL);
    }

    rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info);
    /******************************************************************
    Start waiting:
    an epoll-style call: pass in the set built above (which may hold
    several events); the triggered event comes back through &event.

    Internally:
        rc = WaitEventSetWaitBlock(set, cur_timeout, occurred_events, nevents);
            epoll_wait(set->epoll_fd, set->epoll_ret_events, nevents, cur_timeout)
            if it wakes on the timeout (say 5s) with rc == 0, it returns -1
    ******************************************************************/
    if (rc == 0)
        ret |= WL_TIMEOUT;
    else
    {
        ret |= event.events & (WL_LATCH_SET |
                               WL_POSTMASTER_DEATH |
                               WL_SOCKET_MASK);
    }

    FreeWaitEventSet(set);
    /******************************************************************
    Close the epoll_fd created by epoll_create1 above:

        close(set->epoll_fd)

    then free the whole allocation:

        pfree(set)
    ******************************************************************/
    return ret;
}

The heavy lifting is done by three functions:

  • AddWaitEventToSet
  • WaitEventSetWait
  • FreeWaitEventSet

Waking a latch

Waking a latch is done by calling SetLatch(), which sets the latch flag and then wakes the waiting process:

  • If the current process is itself the waiter, we must have set the latch from inside a signal handler, so the self-pipe is used to wake the poll()/epoll_wait().
  • If another process is the waiter, it is sent a SIGUSR1 signal.
/*
 * Sets a latch and wakes up anyone waiting on it.
 *
 * This is cheap if the latch is already set, otherwise not so much.
 *
 * NB: when calling this in a signal handler, be sure to save and restore
 * errno around it. (That's standard practice in most signal handlers, of
 * course, but we used to omit it in handlers that only set a flag.)
 */
void SetLatch(volatile Latch *latch)
{
    ThreadId owner_pid;

    /*
     * The memory barrier has to be placed here to ensure that any flag
     * variables possibly changed by this process have been flushed to main
     * memory, before we check/set is_set.
     */
    pg_memory_barrier();

    /* Quick exit if already set */
    if (latch->is_set)
        return;

    latch->is_set = true;

    /*
     * See if anyone's waiting for the latch. It can be the current process if
     * we're in a signal handler. We use the self-pipe to wake up the select()
     * in that case. If it's another process, send a signal.
     *
     * Fetch owner_pid only once, in case the latch is concurrently getting
     * owned or disowned. XXX: This assumes that pid_t is atomic, which isn't
     * guaranteed to be true! In practice, the effective range of pid_t fits
     * in a 32 bit integer, and so should be atomic. In the worst case, we
     * might end up signaling the wrong process. Even then, you're very
     * unlucky if a process with that bogus pid exists and belongs to
     * openGauss; and PG database processes should handle excess SIGUSR1
     * interrupts without a problem anyhow.
     *
     * Another sort of race condition that's possible here is for a new
     * process to own the latch immediately after we look, so we don't signal
     * it. This is okay so long as all callers of ResetLatch/WaitLatch follow
     * the standard coding convention of waiting at the bottom of their loops,
     * not the top, so that they'll correctly process latch-setting events
     * that happen before they enter the loop.
     */
    owner_pid = latch->owner_pid;
    if (owner_pid == 0)
        return;
    else if (owner_pid == t_thrd.proc_cxt.MyProcPid) {
        if (waiting)
            sendSelfPipeByte();
    } else
        gs_signal_send(owner_pid, SIGUSR1);
}

sendSelfPipeByte() simply writes one byte into the pipe.

static void sendSelfPipeByte(void)
{
    int rc;
    char dummy = 0;
retry:
    rc = write(selfpipe_writefd, &dummy, 1);
    if (rc < 0) {
        /* If interrupted by signal, just retry */
        if (errno == EINTR)
            goto retry;
        /* If the pipe is full, no need to retry: the bytes already in it
         * are enough to wake the waiter. Other errors are ignored too. */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return;
        return;
    }
}

And the SIGUSR1 handler's behavior is: if this process is currently waiting, write one byte into the pipe.

void latch_sigusr1_handler(void)
{
    if (waiting)
        sendSelfPipeByte();
}

Putting it all together: waking a latch amounts to writing one byte into a pipe, and any process watching the pipe's read end wakes up.
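
This is also why latch users follow a fixed loop shape, the one documented in PostgreSQL's latch.h: reset at the top, check for work, wait at the bottom. Because SetLatch() marks is_set and WaitLatch() checks it before actually sleeping, a wakeup that lands between the work check and the wait is not lost. A sketch (work_available() and do_work() are hypothetical placeholders):

for (;;)
{
    ResetLatch(latch);
    if (work_available())   /* hypothetical: is there work queued? */
        do_work();          /* hypothetical: process it */
    (void) WaitLatch(latch, WL_LATCH_SET | WL_TIMEOUT, 5000L /* ms */, 0);
}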

Summary

The usual wait/wakeup mechanism is the condition variable, but a condition variable must be paired with a mutex. In scenarios where no critical section genuinely needs lock protection, reaching for a condition variable just to get wait/wakeup feels forced.
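
For contrast, a minimal sketch of the condition-variable pattern just described (mu, cv, and pred are illustrative names): even when pred is the only shared state, POSIX requires the mutex.

pthread_mutex_lock(&mu);
while (!pred)                       /* guard against spurious wakeups */
    pthread_cond_wait(&cv, &mu);    /* atomically releases mu and sleeps */
pred = 0;
pthread_mutex_unlock(&mu);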

The latch mechanism gives you lock-free sleep-and-wait: it avoids the overhead of a lock, as well as the many problems that introducing a lock brings.

