PG Storage: An Analysis of astore, Part 1

A simple walkthrough of insert, delete, and update on astore, PG's heap table storage format, followed by a code-level deep dive into the single-table query flow.

Overview

A PG table is called a heap: its tuples are simply piled up one after another, with no relationship between them. Each page is laid out as follows:

  • Page header: PageHeaderData
  • Line pointer array: linp
  • Free space
  • Tuple array
  • Special space
 * +----------------+---------------------------------+
 * | PageHeaderData | linp1 linp2 linp3 ...           |
 * +-----------+----+---------------------------------+
 * | ... linpN |                                      |
 * +-----------+--------------------------------------+
 * |           ^ pd_lower                             |
 * |                                                  |
 * |             v pd_upper                           |
 * +-------------+------------------------------------+
 * |             | tupleN ...                         |
 * +-------------+------------------+-----------------+
 * |       ... tuple3 tuple2 tuple1 | "special space" |
 * +--------------------------------+-----------------+
 *                                  ^ pd_special

The page header records pd_lower and pd_upper, the offsets where the free space starts and ends; pd_upper - pd_lower is therefore the amount of free space. pd_linp is an array of ItemIdData entries (line pointers), and a record is located through its pd_linp entry.
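
For reference, each pd_linp element is a 4-byte line pointer. Its definition (from PostgreSQL's storage/itemid.h) packs the tuple's offset, state flags, and length:

typedef struct ItemIdData
{
    unsigned    lp_off:15,      /* offset to tuple (from start of page) */
                lp_flags:2,     /* state of line pointer, see below */
                lp_len:15;      /* byte length of tuple */
} ItemIdData;

typedef ItemIdData *ItemId;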

typedef struct {
    /* XXX LSN is member of *any* block, not only page-organized ones */
    PageXLogRecPtr pd_lsn;      /* LSN: next byte after last byte of xlog
                                 * record for last change to this page */
    uint16 pd_checksum;         /* checksum */
    uint16 pd_flags;            /* flag bits, see below */
    LocationIndex pd_lower;     /* offset to start of free space */
    LocationIndex pd_upper;     /* offset to end of free space */
    LocationIndex pd_special;   /* offset to start of special space */
    uint16 pd_pagesize_version;
    ShortTransactionId pd_prune_xid;            /* oldest prunable XID, or zero if none */
    ItemIdData pd_linp[FLEXIBLE_ARRAY_MEMBER];  /* beginning of line pointer array */
} PageHeaderData;


typedef struct {
    /* XXX LSN is member of *any* block, not only page-organized ones */
    PageXLogRecPtr pd_lsn;      /* LSN: next byte after last byte of xlog
                                 * record for last change to this page */
    uint16 pd_checksum;         /* checksum */
    uint16 pd_flags;            /* flag bits, see below */
    LocationIndex pd_lower;     /* offset to start of free space */
    LocationIndex pd_upper;     /* offset to end of free space */
    LocationIndex pd_special;   /* offset to start of special space */
    uint16 pd_pagesize_version;
    ShortTransactionId pd_prune_xid;            /* oldest prunable XID, or zero if none */
    TransactionId pd_xid_base;                  /* base value for transaction IDs on page */
    TransactionId pd_multi_base;                /* base value for multixact IDs on page */
    ItemIdData pd_linp[FLEXIBLE_ARRAY_MEMBER];  /* beginning of line pointer array */
} HeapPageHeaderData;
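
To make the pd_upper - pd_lower arithmetic concrete, this is essentially how PostgreSQL's PageGetFreeSpace computes the space available for a new tuple; note that it also reserves room for the new tuple's line pointer:

Size
PageGetFreeSpace(Page page)
{
    int         space;

    /* free space is the gap between the line pointer array and the tuples */
    space = (int) ((PageHeader) page)->pd_upper -
            (int) ((PageHeader) page)->pd_lower;

    /* a new tuple also needs a new line pointer; reserve room for it */
    if (space < (int) sizeof(ItemIdData))
        return 0;

    return (Size) (space - sizeof(ItemIdData));
}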

Tuples

A tuple consists of a tuple header followed by the data.

t_xmin: the xid of the transaction that inserted this tuple;

t_xmax: the xid of the transaction that updated or deleted this tuple; if the tuple has not been updated or deleted since insertion, t_xmax = 0;

t_cid: the command id, i.e. how many SQL commands had already run inside the current transaction; the first command runs with cid = 0, the second with cid = 1, and so on;

t_ctid: the tuple identifier (tid) of this tuple itself or of its newer version, made up of two numbers: the first is the physical block number (page number), the second the tuple number. (For an old row version, this field points to the newer version.)

t_infomask2: the low 11 bits hold the tuple's attribute count; the remaining bits carry flags used by HOT and by tuple-visibility checks.

t_infomask: flags describing the tuple's current state, e.g. whether it has an OID or any null attributes; each of the 16 bits corresponds to a different state.

t_hoff: the size of the tuple header.

t_bits: a bitmap marking which attributes of the tuple are null.

typedef struct HeapTupleFields
{
    TransactionId t_xmin;       /* inserting xact ID */
    TransactionId t_xmax;       /* deleting or locking xact ID */

    union
    {
        CommandId t_cid;        /* inserting or deleting command ID, or both */
        TransactionId t_xvac;   /* old-style VACUUM FULL xact ID */
    } t_field3;
} HeapTupleFields;

typedef struct HeapTupleHeaderData {
    union {
        HeapTupleFields t_heap;
        DatumTupleFields t_datum;
    } t_choice;

    ItemPointerData t_ctid;     /* current TID of this or newer tuple */

    /* Fields below here must match MinimalTupleData! */
    uint16 t_infomask2;         /* number of attributes + various flags */
    uint16 t_infomask;          /* various flag bits, see below */
    uint8 t_hoff;               /* sizeof header incl. bitmap, padding */
    /* ^ - 23 bytes - ^ */
    bits8 t_bits[FLEXIBLE_ARRAY_MEMBER];    /* bitmap of NULLs -- VARIABLE LENGTH */
    /* MORE DATA FOLLOWS AT END OF STRUCT */
} HeapTupleHeaderData;
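
As an illustration of how t_bits is consulted, PostgreSQL tests the null bitmap with the att_isnull macro; the usage line below is a hypothetical sketch (the variable 'tup' and the column number are assumptions):

/* true if the ATT'th (0-based) attribute's bit is unset in bitmap BITS */
#define att_isnull(ATT, BITS) (!((BITS)[(ATT) >> 3] & (1 << ((ATT) & 0x07))))

/* sketch: is the third column of tuple header 'tup' null? */
bool third_is_null = (tup->t_infomask & HEAP_HASNULL) != 0 &&
                     att_isnull(2, tup->t_bits);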

Look closely at the structure of ItemPointerData:

  • ip_blkid: the block number within the file.
  • ip_posid: the index of this tuple's entry in the page's ItemIdData array (see pd_linp in PageHeaderData).
typedef struct ItemPointerData
{
    BlockIdData ip_blkid;
    OffsetNumber ip_posid;
} ItemPointerData;
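
A small sketch of how a tid such as (0,1) is taken apart and used to reach the tuple, via the accessor macros from storage/itemptr.h (the variables 'tuple' and 'page' are assumed to be at hand):

BlockNumber  blkno  = ItemPointerGetBlockNumber(&tuple->t_self);   /* ip_blkid, e.g. 0 */
OffsetNumber offnum = ItemPointerGetOffsetNumber(&tuple->t_self);  /* ip_posid, e.g. 1 */

/* ip_posid is a 1-based index into the page's line pointer array */
ItemId          lpp  = PageGetItemId(page, offnum);
HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lpp);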

typedef struct HeapTupleData {
    uint32 t_len;               /* length of *t_data */
    uint1 tupTableType = HEAP_TUPLE;
    uint1 tupInfo;
    int2 t_bucketId;
    ItemPointerData t_self;     /* SelfItemPointer */
    Oid t_tableOid;             /* table the tuple came from */
    TransactionId t_xid_base;
    TransactionId t_multi_base;
#ifdef PGXC
    uint32 t_xc_node_id;        /* Data node the tuple came from */
#endif
    Page page = nullptr;
    Relation rel = nullptr;
    HeapTupleHeader t_data;     /* -> tuple header and data */
} HeapTupleData;

typedef void* Tuple;

Basics of Table Operations

Insert

Insertion is straightforward: the corresponding linp entry and tuple slot in the page are filled in. What deserves attention is how the new tuple's header fields are set:

  • t_xmin is set to the id of the transaction executing the current command
  • t_xmax is set to 0, and t_cid is set from the count of commands already run inside the transaction
  • t_ctid is set to the tuple's own block number and position

In the page diagram above, tuple1's ctid would be (0,1). This can be checked with the pageinspect extension:

SELECT lp as tuple, t_xmin, t_xmax, t_field3 as t_cid, t_ctid FROM heap_page_items(get_raw_page(tbl, 0));
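
For comparison with the bullets above, here is a simplified extract of how heap_prepare_insert() (called from heap_insert) stamps a new tuple's header; WAL logging, TOAST, and page placement are omitted:

/* simplified from heap_prepare_insert() */
tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
tup->t_data->t_infomask |= HEAP_XMAX_INVALID;   /* xmax is not valid: never deleted */
HeapTupleHeaderSetXmin(tup->t_data, xid);       /* inserting transaction's id */
HeapTupleHeaderSetCmin(tup->t_data, cid);       /* command counter within the xact */
HeapTupleHeaderSetXmax(tup->t_data, 0);         /* for cleanliness */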

Delete

Deletion in astore is a mark-only delete: the tuple itself is not removed, it is merely flagged, and the actual reclamation is left to the vacuum process. On delete, the tuple header is changed as follows (a simplified extract follows the list):

  • t_xmax is set to the deleting transaction's id
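
A heavily simplified extract of what heap_delete() does to the victim tuple's header (locking, WAL logging, and multixact handling omitted):

/* simplified from heap_delete(): mark the tuple as deleted by xid */
tp.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
tp.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
HeapTupleHeaderSetXmax(tp.t_data, xid);         /* deleting transaction's id */
HeapTupleHeaderSetCmax(tp.t_data, cid, false);  /* command id of the DELETE */
/* make sure there is no forward chain link in t_ctid */
tp.t_data->t_ctid = tp.t_self;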

Update

astore's storage format is designed around append-style writes. When an update turns a v0 tuple into a v1 tuple, and the page holding v0 still has free space, the new v1 tuple is inserted straight into that page and v0's tuple pointer is made to point at v1's.

New tuple versions are thus appended and stored intermixed with the old versions they replace, which keeps the I/O cost of updates down; the flip side is that cleaning the old versions out of that mix is expensive.

An update therefore executes as a delete followed by an insert, and the drawback is obvious: table bloat. Worse, when the table has indexes, a new index entry is inserted even if no indexed column changed. PG mitigates this with the HOT (heap-only tuple) technique.

HOT is covered in detail in the index-related articles.

The figure (omitted here) updates the same row twice; it shows that an update does two things (a simplified extract follows the list):

  • For the old tuple: set its t_xmax to the transaction id and point its t_ctid at the new tuple.
  • For the new tuple: perform a normal insert.
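
A correspondingly simplified extract from heap_update(), under the same caveats: the old version is stamped like a delete and then chained to the new version:

/* simplified from heap_update() */
HeapTupleHeaderSetXmax(oldtup.t_data, xid);        /* old version: "deleted" by us */
HeapTupleHeaderSetCmax(oldtup.t_data, cid, false);

/* ... the new version 'heaptup' is put on a page like a plain insert ... */

/* link the old version to the new one */
oldtup.t_data->t_ctid = heaptup->t_self;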

Single-Table Queries

Queries involve a great deal: SQL parsing, cost estimation, index selection, and more. Here we discuss only the simplest case, a sequential scan of a single table with no index involved.

The call chain when a query executes:

main
PostmasterMain
ServerLoop
BackendStartup
BackendRun
PostgresMain
exec_simple_query    # lexing, parsing, optimization
PortalRun            # the plan has been generated; execution starts here
PortalRunSelect      # the plan is a SELECT; an INSERT plan takes a different path
standard_ExecutorRun
ExecutePlan
ExecProcNode
ExecScan
ExecScanFetch
SeqNext

The call order that matters here:

ExecutePlan
ExecProcNode
ExecSeqScan
ExecScan

As its comment says, ExecutePlan processes the query plan until the specified number of tuples, numberTuples, has been retrieved.

The function itself is easy to follow: the body is a loop that fetches one tuple per iteration via ExecProcNode into a slot, sends it to the destination if sendTuples is set, and bumps current_tuple_count until it reaches numberTuples.

/* ----------------------------------------------------------------
 * ExecutePlan
 *
 * Processes the query plan until we have retrieved 'numberTuples' tuples,
 * moving in the specified direction.
 *
 * Runs to completion if numberTuples is 0
 *
 * Note: the ctid attribute is a 'junk' attribute that is removed before the
 * user can see it
 * ----------------------------------------------------------------
 */
static void
ExecutePlan(EState *estate,
            PlanState *planstate,
            bool use_parallel_mode,
            CmdType operation,
            bool sendTuples,
            uint64 numberTuples,
            ScanDirection direction,
            DestReceiver *dest,
            bool execute_once)
{
    TupleTableSlot *slot;
    uint64      current_tuple_count;

    /*
     * initialize local variables
     */
    current_tuple_count = 0;

    /*
     * Set the direction.
     */
    estate->es_direction = direction;

    /*
     * If the plan might potentially be executed multiple times, we must force
     * it to run without parallelism, because we might exit early.  Also
     * disable parallelism when writing into a relation, because no database
     * changes are allowed in parallel mode.
     */
    if (!execute_once || dest->mydest == DestIntoRel)
        use_parallel_mode = false;

    estate->es_use_parallel_mode = use_parallel_mode;
    if (use_parallel_mode)
        EnterParallelMode();

    /*
     * Loop until we've processed the proper number of tuples from the plan.
     */
    for (;;)
    {
        /* Reset the per-output-tuple exprcontext */
        ResetPerTupleExprContext(estate);

        /*
         * Execute the plan and obtain a tuple
         */
        slot = ExecProcNode(planstate);

        /*
         * if the tuple is null, then we assume there is nothing more to
         * process so we just end the loop...
         */
        if (TupIsNull(slot))
        {
            /* Allow nodes to release or shut down resources. */
            (void) ExecShutdownNode(planstate);
            break;
        }

        /*
         * If we have a junk filter, then project a new tuple with the junk
         * removed.
         *
         * Store this new "clean" tuple in the junkfilter's resultSlot.
         * (Formerly, we stored it back over the "dirty" tuple, which is WRONG
         * because that tuple slot has the wrong descriptor.)
         */
        if (estate->es_junkFilter != NULL)
            slot = ExecFilterJunk(estate->es_junkFilter, slot);

        /*
         * If we are supposed to send the tuple somewhere, do so. (In
         * practice, this is probably always the case at this point.)
         */
        if (sendTuples)
        {
            /*
             * If we are not able to send the tuple, we assume the destination
             * has closed and no more tuples can be sent. If that's the case,
             * end the loop.
             */
            if (!((*dest->receiveSlot) (slot, dest)))
                break;
        }

        /*
         * Count tuples processed, if this is a SELECT. (For other operation
         * types, the ModifyTable plan node must count the appropriate
         * events.)
         */
        if (operation == CMD_SELECT)
            (estate->es_processed)++;

        /*
         * check our tuple count.. if we've processed the proper number then
         * quit, else loop again and process more tuples. Zero numberTuples
         * means no limit.
         */
        current_tuple_count++;
        if (numberTuples && numberTuples == current_tuple_count)
        {
            /* Allow nodes to release or shut down resources. */
            (void) ExecShutdownNode(planstate);
            break;
        }
    }

    if (use_parallel_mode)
        ExitParallelMode();
}

ExecProcNode calls ExecSeqScan, which is merely a wrapper around ExecScan, so let's focus on ExecScan. ExecScan scans with whatever access method it is handed, which is a nice abstraction: an index scan also runs through ExecScan, just with the index-scan methods passed in.

TupleTableSlot *
ExecSeqScan(SeqScanState *node)
{
    return ExecScan((ScanState *) node,
                    (ExecScanAccessMtd) SeqNext,
                    (ExecScanRecheckMtd) SeqRecheck);
}

The body of ExecScan is again a loop: it fetches a tuple via the ExecScanAccessMtd passed in (here SeqNext) and checks whether the tuple satisfies the qual with ExecQual. The qual checking is a topic for another time.

TupleTableSlot *
ExecScan(ScanState *node,
         ExecScanAccessMtd accessMtd,   /* function returning a tuple */
         ExecScanRecheckMtd recheckMtd)
{
    ExprContext *econtext;
    ExprState  *qual;
    ProjectionInfo *projInfo;

    /*
     * Fetch data from node
     */
    qual = node->ps.qual;
    projInfo = node->ps.ps_ProjInfo;
    econtext = node->ps.ps_ExprContext;

    /* interrupt checks are in ExecScanFetch */

    /*
     * If we have neither a qual to check nor a projection to do, just skip
     * all the overhead and return the raw scan tuple.
     */
    if (!qual && !projInfo)
    {
        ResetExprContext(econtext);
        return ExecScanFetch(node, accessMtd, recheckMtd);
    }

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    ResetExprContext(econtext);

    /*
     * get a tuple from the access method.  Loop until we obtain a tuple that
     * passes the qualification.
     */
    for (;;)
    {
        TupleTableSlot *slot;

        slot = ExecScanFetch(node, accessMtd, recheckMtd);

        /*
         * if the slot returned by the accessMtd contains NULL, then it means
         * there is nothing more to scan so we just return an empty slot,
         * being careful to use the projection result slot so it has correct
         * tupleDesc.
         */
        if (TupIsNull(slot))
        {
            if (projInfo)
                return ExecClearTuple(projInfo->pi_state.resultslot);
            else
                return slot;
        }

        /*
         * place the current tuple into the expr context
         */
        econtext->ecxt_scantuple = slot;

        /*
         * check that the current tuple satisfies the qual-clause
         *
         * check for non-null qual here to avoid a function call to ExecQual()
         * when the qual is null ... saves only a few cycles, but they add up
         * ...
         */
        if (qual == NULL || ExecQual(qual, econtext))
        {
            /*
             * Found a satisfactory scan tuple.
             */
            if (projInfo)
            {
                /*
                 * Form a projection tuple, store it in the result tuple slot
                 * and return it.
                 */
                return ExecProject(projInfo);
            }
            else
            {
                /*
                 * Here, we aren't projecting, so just return scan tuple.
                 */
                return slot;
            }
        }
        else
            InstrCountFiltered1(node, 1);

        /*
         * Tuple fails qual, so free per-tuple memory and try again.
         */
        ResetExprContext(econtext);
    }
}

static inline TupleTableSlot *
ExecScanFetch(ScanState *node,
              ExecScanAccessMtd accessMtd,
              ExecScanRecheckMtd recheckMtd)
{
    EState     *estate = node->ps.state;

    CHECK_FOR_INTERRUPTS();

    if (estate->es_epqTuple != NULL)
    {
        /*
         * We are inside an EvalPlanQual recheck.  Return the test tuple if
         * one is available, after rechecking any access-method-specific
         * conditions.
         */
        Index       scanrelid = ((Scan *) node->ps.plan)->scanrelid;

        if (scanrelid == 0)
        {
            TupleTableSlot *slot = node->ss_ScanTupleSlot;

            /*
             * This is a ForeignScan or CustomScan which has pushed down a
             * join to the remote side.  The recheck method is responsible not
             * only for rechecking the scan/join quals but also for storing
             * the correct tuple in the slot.
             */
            if (!(*recheckMtd) (node, slot))
                ExecClearTuple(slot);   /* would not be returned by scan */
            return slot;
        }
        else if (estate->es_epqTupleSet[scanrelid - 1])
        {
            TupleTableSlot *slot = node->ss_ScanTupleSlot;

            /* Return empty slot if we already returned a tuple */
            if (estate->es_epqScanDone[scanrelid - 1])
                return ExecClearTuple(slot);
            /* Else mark to remember that we shouldn't return more */
            estate->es_epqScanDone[scanrelid - 1] = true;

            /* Return empty slot if we haven't got a test tuple */
            if (estate->es_epqTuple[scanrelid - 1] == NULL)
                return ExecClearTuple(slot);

            /* Store test tuple in the plan node's scan slot */
            ExecStoreTuple(estate->es_epqTuple[scanrelid - 1],
                           slot, InvalidBuffer, false);

            /* Check if it meets the access-method conditions */
            if (!(*recheckMtd) (node, slot))
                ExecClearTuple(slot);   /* would not be returned by scan */

            return slot;
        }
    }

    /*
     * Run the node-type-specific access method function to get the next tuple
     */
    return (*accessMtd) (node);
}

SeqNext

Parameters

An important parameter of SeqNext is the SeqScanState (sequential scan state); its key member is the heap scan descriptor, HeapScanDesc.

typedef struct ScanState
{
    PlanState ps;               /* its first field is NodeTag */
    Relation ss_currentRelation;
    HeapScanDesc ss_currentScanDesc;
    TupleTableSlot *ss_ScanTupleSlot;
} ScanState;

/* ----------------
 *   SeqScanState information
 * ----------------
 */
typedef struct SeqScanState
{
    ScanState ss;               /* its first field is NodeTag */
    Size pscan_len;             /* size of parallel heap scan descriptor */
} SeqScanState;

HeapScanDescData falls into four parts: the scan parameters; fields set up at initscan time; the current scan state; and fields used only by the page-at-a-time mode and bitmap scans.

The second and third parts matter most: they record the scan's progress, such as the current block, tuple, and buffer. Because only one tuple is returned per call, the descriptor has to remember which tuple on which buffered page the scan has reached.

typedef struct HeapScanDescData
{
    /* scan parameters */
    Relation rs_rd;             /* heap relation descriptor */
    Snapshot rs_snapshot;       /* snapshot to see */
    int rs_nkeys;               /* number of scan keys */
    ScanKey rs_key;             /* array of scan key descriptors */
    bool rs_bitmapscan;         /* true if this is really a bitmap scan */
    bool rs_samplescan;         /* true if this is really a sample scan */
    bool rs_pageatatime;        /* verify visibility page-at-a-time? */
    bool rs_allow_strat;        /* allow or disallow use of access strategy */
    bool rs_allow_sync;         /* allow or disallow use of syncscan */
    bool rs_temp_snap;          /* unregister snapshot at scan end? */

    /* state set up at initscan time */
    BlockNumber rs_nblocks;     /* total number of blocks in rel */
    BlockNumber rs_startblock;  /* block # to start at */
    BlockNumber rs_numblocks;   /* max number of blocks to scan */
    /* rs_numblocks is usually InvalidBlockNumber, meaning "scan whole rel" */
    BufferAccessStrategy rs_strategy;   /* access strategy for reads */
    bool rs_syncscan;           /* report location to syncscan logic? */

    /* scan current state */
    bool rs_inited;             /* false = scan not init'd yet */
    HeapTupleData rs_ctup;      /* current tuple in scan, if any */
    BlockNumber rs_cblock;      /* current block # in scan, if any */
    Buffer rs_cbuf;             /* current buffer in scan, if any */
    /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
    ParallelHeapScanDesc rs_parallel;   /* parallel scan information */

    /* these fields only used in page-at-a-time mode and for bitmap scans */
    int rs_cindex;              /* current tuple's index in vistuples */
    int rs_ntuples;             /* number of visible tuples on page */
    OffsetNumber rs_vistuples[MaxHeapTuplesPerPage];    /* their offsets */
} HeapScanDescData;

/* struct definitions appear in relscan.h */
typedef struct HeapScanDescData *HeapScanDesc;

The SeqNext flow

The logic of SeqNext is fairly simple:

  • call heap_beginscan to initialize the scan descriptor if it does not exist yet
  • call heap_getnext to fetch one tuple
  • call ExecStoreTuple to put the tuple into the slot, or ExecClearTuple to empty the slot when the scan is exhausted
static TupleTableSlot *
SeqNext(SeqScanState *node)
{
    HeapTuple tuple;
    HeapScanDesc scandesc;
    EState *estate;
    ScanDirection direction;
    TupleTableSlot *slot;

    /*
     * get information from the estate and scan state
     */
    scandesc = node->ss.ss_currentScanDesc;
    estate = node->ss.ps.state;
    direction = estate->es_direction;
    slot = node->ss.ss_ScanTupleSlot;

    /* if there is no scandesc yet, create one */
    if (scandesc == NULL)
    {
        /*
         * We reach here if the scan is not parallel, or if we're serially
         * executing a scan that was planned to be parallel.
         */
        scandesc = heap_beginscan(node->ss.ss_currentRelation,
                                  estate->es_snapshot,
                                  0, NULL);
        node->ss.ss_currentScanDesc = scandesc;
    }

    /*
     * get the next visible tuple from the table
     */
    tuple = heap_getnext(scandesc, direction);

    /*
     * save the tuple and the buffer returned to us by the access methods in
     * our scan tuple slot and return the slot.  Note: we pass 'false' because
     * tuples returned by heap_getnext() are pointers onto disk pages and were
     * not created with palloc() and so should not be pfree()'d.  Note also
     * that ExecStoreTuple will increment the refcount of the buffer; the
     * refcount will not be dropped until the tuple table slot is cleared.
     */
    if (tuple)
        ExecStoreTuple(tuple,               /* tuple to store */
                       slot,                /* slot to store in */
                       scandesc->rs_cbuf,   /* buffer associated with this tuple */
                       false);              /* don't pfree this pointer */
    else
        ExecClearTuple(slot);

    return slot;
}

heap_beginscan

heap_beginscan calls heap_beginscan_internal to fill in the HeapScanDesc scan descriptor (see the wrapper after this list). heap_beginscan_internal proceeds in two steps:

  • First it sets the scan parameters of HeapScanDescData from the function's arguments: the relation to scan, the snapshot, the scan keys, rs_allow_strat, rs_allow_sync, and so on.
  • Then it calls initscan to initialize the second group of fields.
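
For reference, in the PG 10-era sources heap_beginscan itself is just a thin wrapper; the boolean tail of the internal call corresponds to allow_strat, allow_sync, allow_pagemode, is_bitmapscan, is_samplescan, and temp_snap:

HeapScanDesc
heap_beginscan(Relation relation, Snapshot snapshot,
               int nkeys, ScanKey key)
{
    return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                   true, true, true, false, false, false);
}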

heap_getnext

Note that rs_pageatatime selects between two scan modes. Page-at-a-time is the lighter-weight mode: it checks visibility for the whole page under one short buffer lock, so individual tuple fetches afterwards need no further locking.

HeapTuple
heap_getnext(HeapScanDesc scan, ScanDirection direction)
{
    /* Note: no locking manipulations needed */

    HEAPDEBUG_1;                /* heap_getnext( info ) */

    if (scan->rs_pageatatime)
        heapgettup_pagemode(scan, direction, scan->rs_nkeys, scan->rs_key);
    else
        heapgettup(scan, direction, scan->rs_nkeys, scan->rs_key);

    if (scan->rs_ctup.t_data == NULL)
    {
        HEAPDEBUG_2;            /* heap_getnext returning EOS */
        return NULL;
    }

    /*
     * if we get here it means we have a new current scan tuple, so point to
     * the proper return buffer and return the tuple.
     */
    HEAPDEBUG_3;                /* heap_getnext returning tuple */

    pgstat_count_heap_getnext(scan->rs_rd);

    return &(scan->rs_ctup);
}

A few HeapScanDesc members to keep in mind:

  • rs_rd: the relation being scanned

  • rs_snapshot: the snapshot the scan uses

  • rs_startblock: the block number the scan starts at

  • rs_nblocks: the total number of blocks (not tuples) in the relation

  • rs_numblocks: the maximum number of blocks to scan

  • rs_cblock: the block currently being scanned

  • rs_cindex: the index of the current tuple within rs_vistuples

heapgettup and heapgettup_pagemode follow much the same logic: first check whether the scan descriptor is initialized, and if not, call heapgetpage to read in a page's worth of tuples; then walk through that page's tuples, placing the current one into rs_ctup.

heapgetpage stores the offsets (ItemIds) of all visible tuples into rs_vistuples. Note that heapgetpage also calls heap_page_prune_opt, which opportunistically prunes HOT chains.

The flow of heapgetpage is as follows (a condensed sketch follows the list):

  1. Read the physical block into a buffer page.

  2. Take a lightweight share lock on the buffer page.

     A lightweight lock (lwlock) is PostgreSQL's own lock implementation, essentially a latch: it synchronizes concurrent processes around shared resources, has a wait queue, and does no deadlock detection.

  3. Walk all tuples on the page, judge each tuple's visibility, and record the ItemId of every visible tuple in rs_vistuples.

  4. Release the lightweight share lock.
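
A condensed sketch of heapgetpage() matching the four steps above (PostgreSQL flavor; error paths and the all-visible optimization omitted):

static void
heapgetpage(HeapScanDesc scan, BlockNumber page)
{
    Buffer      buffer;
    Page        dp;
    OffsetNumber lineoff;
    int         ntup = 0;

    /* 1. read the physical block into a buffer page */
    buffer = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page,
                                RBM_NORMAL, scan->rs_strategy);

    /* opportunistically prune HOT chains before scanning */
    heap_page_prune_opt(scan->rs_rd, buffer);

    /* 2. lightweight share lock on the buffer */
    LockBuffer(buffer, BUFFER_LOCK_SHARE);
    dp = BufferGetPage(buffer);

    /* 3. test visibility of every tuple, remembering the visible ones */
    for (lineoff = FirstOffsetNumber;
         lineoff <= PageGetMaxOffsetNumber(dp);
         lineoff++)
    {
        ItemId      lpp = PageGetItemId(dp, lineoff);
        HeapTupleData loctup;

        if (!ItemIdIsNormal(lpp))
            continue;
        loctup.t_data = (HeapTupleHeader) PageGetItem(dp, lpp);
        loctup.t_len = ItemIdGetLength(lpp);
        ItemPointerSet(&(loctup.t_self), page, lineoff);
        if (HeapTupleSatisfiesVisibility(&loctup, scan->rs_snapshot, buffer))
            scan->rs_vistuples[ntup++] = lineoff;
    }

    /* 4. release the share lock; the pin is kept */
    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

    scan->rs_cbuf = buffer;
    scan->rs_cblock = page;
    scan->rs_ntuples = ntup;
}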

/* skeleton of heapgettup() (openGauss flavor, forward scan) */
if (!scan->rs_base.rs_inited) {
    /* first call: load the scan's first page */
    heapgetpage((TableScanDesc) scan, page);
} else {
    /* continue from the previously returned page/tuple */
    page = scan->rs_base.rs_cblock;     /* current page */
    line_off = OffsetNumberNext(ItemPointerGetOffsetNumber(&(tuple->t_self)));
}

lpp = HeapPageGetItemId(dp, line_off);
for (;;) {
    while (lines_left > 0) {
        /* ... examine each line pointer, return the first suitable tuple ... */
    }
    /* ... otherwise advance to the next page and loop ... */
}

Heap Table Access Management

Taking a sequential scan of an astore heap table as the example, the flow is as follows.

(1) Call heap_open to open the heap table being scanned and obtain its metadata, for example that its row-store sub-format is astore. This step normally takes an AccessShare table lock, the weakest level, to guard against concurrent DDL.

(2) Call tableam_scan_begin, which looks up astore's scan-initialization routine in the g_tableam_routines array, namely heap_beginscan, and sets up the structures for the sequential scan.

(3) Call tableam_scan_getnexttuple in a loop; through g_tableam_routines it dispatches to astore's tuple-fetch routine, heap_getnext, which returns one astore tuple per call until the scan completes. A sequential scan fetches the next page first, then returns that page's tuples one by one. There are two possible moments for judging tuple visibility:

  • heapgettup_pagemode. When the next page is first loaded, take the page share lock, judge the visibility of every tuple on the page, save the positions of the visible ones, and release the lock. Each later call simply returns the next entry from the saved list, with no further page lock. Queries that use a snapshot default to this batch mode, since visibility under one snapshot cannot change.

  • heapgettup. Apart from the batch visibility check when the next page is first loaded, every subsequent return of the page's next tuple re-takes the page share lock and re-judges that tuple's visibility. This mode is somewhat slower than the batch mode and is used for sequential scans of system catalogs (whose visibility follows the real-time transaction commit status rather than the query snapshot).

(4) Call tableam_scan_end, which dispatches through g_tableam_routines to astore's scan-teardown routine, heap_endscan, ending the sequential scan and freeing the scan structures.

(5) Call heap_close to release the table lock or reference count.
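
Putting the five steps together, a minimal sketch of the call sequence; the tableam_* dispatchers are the openGauss names quoted above, but their exact signatures here are assumptions for illustration:

/* sketch only: signatures are illustrative */
Relation rel = heap_open(relid, AccessShareLock);                 /* (1) */
TableScanDesc scan = tableam_scan_begin(rel, snapshot, 0, NULL);  /* (2) -> heap_beginscan */

HeapTuple tup;
while ((tup = tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL)
{
    /* (3) one visible astore tuple per call, page by page */
}

tableam_scan_end(scan);                                           /* (4) -> heap_endscan */
heap_close(rel, AccessShareLock);                                 /* (5) */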

References:

https://blog.csdn.net/obvious__/article/details/120706222

https://pg-internal.vonng.com/#/
