PG Storage: astore Analysis (Part 2)

This part walks through the insert and delete flows alongside the code.

Insert Flow

Call Chain

main
PostmasterMain
ServerLoop
BackendStartup
BackendRun
PostgresMain
exec_simple_query # lexer / parser / optimizer
PortalRun
PortalRunMulti
ProcessQuery
standard_ExecutorRun
ExecutePlan
ExecProcNode
ExecModifyTable
ExecInsert
table_tuple_insert # dispatched via default_table_access_method
heapam_tuple_insert # the heap access method is selected
heap_insert # insert the tuple

Take insert into t1 values(4,4); as an example; the logic inside exec_simple_query is as follows:

exec_simple_query
--> pg_parse_query // syntax parsing
--> raw_parser
--> base_yyparse
--> pg_analyze_and_rewrite // semantic analysis and rewriting
--> parse_analyze
--> transformStmt
--> transformInsertStmt

--> pg_plan_queries // query optimization, build the execution plan
--> pg_plan_query
--> standard_planner
--> subquery_planner
--> grouping_planner
--> query_planner
--> build_simple_rel
--> make_one_rel
--> create_modifytable_path
--> create_plan
--> PortalStart
--> PortalRun
--> ProcessQuery
--> ExecutorStart
--> InitPlan
--> ExecInitModifyTable
--> ExecutorRun
--> ExecutePlan
--> ExecModifyTable
--> planSlot = ExecProcNode(subplanstate); // run the Result subplan to obtain the tuple to insert, i.e. values(4,4)
--> ExecInitInsertProjection
--> ExecGetInsertNewTuple
--> ExecInsert // ----- perform the insert -----
--> table_tuple_insert
--> ExecutorEnd
--> PortalDrop

ExecInsert

heap_insert consists of four main steps:

  1. Build the tuple to insert from the values: heap_prepare_insert
  2. Get a page from the buffer pool: RelationGetBufferForTuple
  3. Insert the tuple into the buffered page and mark the buffer dirty: RelationPutHeapTuple
  4. Write the WAL record

ExecInsert  // perform the insert
--> table_tuple_insert
--> heap_insert
/******** insert one tuple into a page in the buffer pool ***********/
--> GetCurrentTransactionId()
--> heap_prepare_insert // 1. prepare the tuple
--> RelationGetBufferForTuple // 2. get a buffered page with enough free space for the tuple
--> GetPageWithFreeSpace
--> fsm_search // search the FSM for a page with enough free space
--> CheckForSerializableConflictIn
--> RelationPutHeapTuple // 3. put the tuple into the page
--> BufferGetPage(buffer);
--> PageAddItemExtended
--> MarkBufferDirty
/******** 4. write the WAL record ***********/
--> XLogBeginInsert
--> XLogRegisterData
--> XLogRegisterBuffer
--> XLogRegisterBufData
--> XLogSetRecordFlags
--> XLogInsert
--> XLogRecordAssemble // assemble the WAL record from the registered data
--> XLogInsertRecord // insert it into the WAL
--> CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata, StartPos, EndPos);
--> GetXLogBuffer(CurrPos)
--> XLogFlush(EndPos)
--> XLogWrite // write to the WAL segment file

--> PageSetLSN(page, recptr);

heap_prepare_insert mainly fills in the tuple header: oid, xmin, xmax, cid and so on. If the tuple is TOAST-able, it goes through toast_insert_or_update.
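
To make that header bookkeeping concrete, here is a minimal, self-contained sketch of what preparing a tuple for insert amounts to. The struct and field names below are simplified stand-ins invented for illustration, not the real HeapTupleHeaderData, and TOAST handling is omitted entirely:

#include <stdio.h>
#include <stdint.h>

/* Simplified stand-in for HeapTupleHeaderData; not the real layout. */
typedef struct ToyTupleHeader {
    uint32_t xmin;        /* inserting transaction id */
    uint32_t xmax;        /* deleting/locking transaction id, 0 = invalid */
    uint32_t cid;         /* command id within the inserting transaction */
    uint16_t infomask;    /* hint bits, e.g. "xmax is invalid" */
} ToyTupleHeader;

#define TOY_XMAX_INVALID 0x0001

/* Roughly the shape of what heap_prepare_insert does to the header. */
static void toy_prepare_insert(ToyTupleHeader *tup, uint32_t xid, uint32_t cid)
{
    tup->infomask = TOY_XMAX_INVALID;  /* no deleter yet */
    tup->xmin = xid;                   /* stamped with the inserting transaction */
    tup->cid  = cid;                   /* command id of the INSERT */
    tup->xmax = 0;                     /* invalid */
}

int main(void)
{
    ToyTupleHeader t;
    toy_prepare_insert(&t, 742 /* current xid */, 0 /* first command */);
    printf("xmin=%u xmax=%u cid=%u infomask=0x%04x\n",
           t.xmin, t.xmax, t.cid, t.infomask);
    return 0;
}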

RelationGetBufferForTuple:

  1. Compute the space to reserve for the tuple; this plus the tuple size must not exceed the maximum tuple size
  2. Prefer the page that was just inserted into; failing that, get a page from the FSM; if there is no FSM information or nothing in the FSM is big enough, take the relation's last page
  3. Lock the chosen page
  4. Check that the pages were pinned and locked successfully, and make sure they are (the pin is taken on the VM page)
  5. Check whether there is enough free space: if so, return this buffer; if not, pick a page again
  6. If nothing in the FSM is big enough, extend the relation
  7. For the extended relation, initialize the new page and then return its buffer (a simplified sketch of this decision flow follows)
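
A compressed, self-contained sketch of that decision flow. The helpers (fsm_lookup, extend_relation, choose_target_block) and the in-memory "FSM" array are made up for illustration; the real logic lives in RelationGetBufferForTuple, GetPageWithFreeSpace and the relation-extension code, and all pinning/locking is omitted:

#include <stdio.h>
#include <stdint.h>

#define NBLOCKS       8          /* toy relation size */
#define INVALID_BLOCK UINT32_MAX

/* Toy "FSM": free bytes per block. */
static uint32_t fsm_free[NBLOCKS] = {64, 128, 4096, 32, 0, 512, 2048, 96};
static uint32_t nblocks = NBLOCKS;

/* Hypothetical stand-in for GetPageWithFreeSpace / fsm_search. */
static uint32_t fsm_lookup(uint32_t needed)
{
    for (uint32_t blk = 0; blk < nblocks; blk++)
        if (fsm_free[blk] >= needed)
            return blk;
    return INVALID_BLOCK;
}

/* Hypothetical stand-in for extending the relation by one page. */
static uint32_t extend_relation(void)
{
    return nblocks;              /* caller would PageInit() the new page */
}

/* Mirrors the shape of RelationGetBufferForTuple:
 * cached target page first, then FSM search, then extend. */
static uint32_t choose_target_block(uint32_t len, uint32_t cached_target)
{
    if (cached_target != INVALID_BLOCK && fsm_free[cached_target] >= len)
        return cached_target;                 /* step 2: reuse last insertion target */

    uint32_t blk = fsm_lookup(len);           /* step 2: FSM search */
    if (blk != INVALID_BLOCK)
        return blk;

    return extend_relation();                 /* steps 6/7: extend and init a new page */
}

int main(void)
{
    printf("target for 100-byte tuple : block %u\n", choose_target_block(100, 0));
    printf("target for 3000-byte tuple: block %u\n", choose_target_block(3000, 0));
    printf("target for 5000-byte tuple: block %u\n", choose_target_block(5000, INVALID_BLOCK));
    return 0;
}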

RelationPutHeapTuple:

  1. Get the Page from the buffer
  2. Call PageAddItemExtended
  3. Update the tuple's ctid from the returned item pointer (itemid), as illustrated below
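
Step 3 just packs the block number and the offset number returned by PageAddItemExtended into the tuple's TID (cf. ItemPointerSet on tuple->t_self). A tiny self-contained illustration with a simplified TID type (not the real ItemPointerData):

#include <stdio.h>
#include <stdint.h>

/* Simplified TID: block number + line pointer (offset) number. */
typedef struct ToyItemPointer {
    uint32_t block;
    uint16_t offset;   /* 1-based, like OffsetNumber */
} ToyItemPointer;

int main(void)
{
    uint32_t block  = 7;   /* block the buffer maps to */
    uint16_t offnum = 3;   /* value PageAddItemExtended would return */

    ToyItemPointer t_self = { block, offnum };   /* cf. ItemPointerSet(&tuple->t_self, ...) */
    printf("tuple ctid = (%u,%u)\n", t_self.block, t_self.offset);
    return 0;
}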

PageAddItemExtended

There are four main steps:

  1. If an offsetNumber was passed in, overwrite the data at that offset and return
  2. Otherwise, look for a free line pointer (linp / slot)
  3. If there is no free line pointer, place the item right after the last one
  4. Point the new item at upper, copy the data to upper, and update lower and upper

OffsetNumber
PageAddItemExtended(Page page,
                    Item item,
                    Size size,
                    OffsetNumber offsetNumber,
                    int flags)
{
    PageHeader  phdr = (PageHeader) page;
    Size        alignedSize;
    int         lower;
    int         upper;
    ItemId      itemId;
    OffsetNumber limit;
    bool        needshuffle = false;

    /*
     * Be wary about corrupted page pointers
     */
    if (phdr->pd_lower < SizeOfPageHeaderData ||
        phdr->pd_lower > phdr->pd_upper ||
        phdr->pd_upper > phdr->pd_special ||
        phdr->pd_special > BLCKSZ)
        ereport(PANIC,
                (errcode(ERRCODE_DATA_CORRUPTED),
                 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
                        phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

    /*
     * Select offsetNumber to place the new item at
     */
    limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));

    /* was offsetNumber passed in? */
    if (OffsetNumberIsValid(offsetNumber))
    {
        /* yes, check it */
        if ((flags & PAI_OVERWRITE) != 0)
        {
            if (offsetNumber < limit)
            {
                itemId = PageGetItemId(phdr, offsetNumber);
                if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId))
                {
                    elog(WARNING, "will not overwrite a used ItemId");
                    return InvalidOffsetNumber;
                }
            }
        }
        else
        {
            if (offsetNumber < limit)
                needshuffle = true; /* need to move existing linp's */
        }
    }
    else
    {
        /* offsetNumber was not passed in, so find a free slot */
        /* if no free slot, we'll put it at limit (1st open slot) */
        if (PageHasFreeLinePointers(phdr))
        {
            /*
             * Look for "recyclable" (unused) ItemId.  We check for no storage
             * as well, just to be paranoid --- unused items should never have
             * storage.
             */
            for (offsetNumber = 1; offsetNumber < limit; offsetNumber++)
            {
                itemId = PageGetItemId(phdr, offsetNumber);
                if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
                    break;
            }
            if (offsetNumber >= limit)
            {
                /* the hint is wrong, so reset it */
                PageClearHasFreeLinePointers(phdr);
            }
        }
        else
        {
            /* don't bother searching if hint says there's no free slot */
            offsetNumber = limit;
        }
    }

    /* Reject placing items beyond the first unused line pointer */
    if (offsetNumber > limit)
    {
        elog(WARNING, "specified item offset is too large");
        return InvalidOffsetNumber;
    }

    /* Reject placing items beyond heap boundary, if heap */
    if ((flags & PAI_IS_HEAP) != 0 && offsetNumber > MaxHeapTuplesPerPage)
    {
        elog(WARNING, "can't put more than MaxHeapTuplesPerPage items in a heap page");
        return InvalidOffsetNumber;
    }

    /*
     * Compute new lower and upper pointers for page, see if it'll fit.
     *
     * Note: do arithmetic as signed ints, to avoid mistakes if, say,
     * alignedSize > pd_upper.
     */
    if (offsetNumber == limit || needshuffle)
        lower = phdr->pd_lower + sizeof(ItemIdData);
    else
        lower = phdr->pd_lower;

    alignedSize = MAXALIGN(size);

    upper = (int) phdr->pd_upper - (int) alignedSize;

    if (lower > upper)
        return InvalidOffsetNumber;

    /*
     * OK to insert the item.  First, shuffle the existing pointers if needed.
     */
    itemId = PageGetItemId(phdr, offsetNumber);

    if (needshuffle)
        memmove(itemId + 1, itemId,
                (limit - offsetNumber) * sizeof(ItemIdData));

    /* set the item pointer */
    ItemIdSetNormal(itemId, upper, size);

    /*
     * Items normally contain no uninitialized bytes.  Core bufpage consumers
     * conform, but this is not a necessary coding rule; a new index AM could
     * opt to depart from it.  However, data type input functions and other
     * C-language functions that synthesize datums should initialize all
     * bytes; datumIsEqual() relies on this.  Testing here, along with the
     * similar check in printtup(), helps to catch such mistakes.
     *
     * Values of the "name" type retrieved via index-only scans may contain
     * uninitialized bytes; see comment in btrescan().  Valgrind will report
     * this as an error, but it is safe to ignore.
     */
    VALGRIND_CHECK_MEM_IS_DEFINED(item, size);

    /* copy the item's data onto the page */
    memcpy((char *) page + upper, item, size);

    /* adjust page header */
    phdr->pd_lower = (LocationIndex) lower;
    phdr->pd_upper = (LocationIndex) upper;

    return offsetNumber;
}
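
The free-space test above is plain pointer arithmetic on the page header: each new line pointer grows pd_lower upward, each MAXALIGNed tuple grows pd_upper downward, and the insert fits only while lower <= upper. A self-contained toy that mimics just this arithmetic (the constants approximate an 8 kB page; ToyPage is not the real PageHeaderData):

#include <stdio.h>
#include <stdbool.h>
#include <stddef.h>

#define BLCKSZ_TOY        8192
#define PAGE_HEADER_SIZE  24      /* roughly SizeOfPageHeaderData */
#define ITEMID_SIZE       4       /* sizeof(ItemIdData) */
#define MAXALIGN_TOY(s)   (((s) + 7) & ~((size_t) 7))

/* Simplified page header: only the two pointers PageAddItemExtended moves. */
typedef struct ToyPage {
    size_t pd_lower;   /* end of the line pointer array */
    size_t pd_upper;   /* start of tuple data */
} ToyPage;

static bool toy_page_add_item(ToyPage *p, size_t size)
{
    size_t aligned = MAXALIGN_TOY(size);          /* tuple body is MAXALIGNed */
    size_t lower   = p->pd_lower + ITEMID_SIZE;   /* one more line pointer */

    if (aligned > p->pd_upper || lower > p->pd_upper - aligned)
        return false;                             /* InvalidOffsetNumber in the real code */

    p->pd_lower = lower;
    p->pd_upper -= aligned;
    return true;
}

int main(void)
{
    ToyPage page = { PAGE_HEADER_SIZE, BLCKSZ_TOY };   /* empty page, no special space */
    size_t inserted = 0;

    while (toy_page_add_item(&page, 121))              /* 121 bytes -> MAXALIGNed to 128 */
        inserted++;

    printf("fit %zu 121-byte tuples; lower=%zu upper=%zu free=%zu\n",
           inserted, page.pd_lower, page.pd_upper, page.pd_upper - page.pd_lower);
    return 0;
}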

Delete Flow

The following is taken from the blog posts in the references; they explain this well, so there is no need to reinvent the wheel.

heap_delete
buffer = ReadBuffer(relation, block);
// determine the tuple's state with respect to concurrent updates
result = HeapTupleSatisfiesUpdate(&tp, cid, buffer, allow_delete_self);

/*
* heap_delete - delete a tuple
*
* See table_tuple_delete() for an explanation of the parameters, except that
* this routine directly takes a tuple rather than a slot.
*
* In the failure cases, the routine fills *tmfd with the tuple's t_ctid,
* t_xmax (resolving a possible MultiXact, if necessary), and t_cmax (the last
* only for TM_SelfModified, since we cannot obtain cmax from a combo CID
* generated by another transaction).
*/
TM_Result heap_delete(Relation relation, ItemPointer tid,
CommandId cid, Snapshot crosscheck, bool wait,
TM_FailureData *tmfd, bool changingPart)
{
TM_Result result;
TransactionId xid = GetCurrentTransactionId(); // every write transaction must obtain a transaction id
ItemId lp;
HeapTupleData tp;
Page page;
BlockNumber block;
Buffer buffer;
Buffer vmbuffer = InvalidBuffer;
TransactionId new_xmax;
uint16 new_infomask,
new_infomask2;
bool have_tuple_lock = false;
bool iscombo;
bool all_visible_cleared = false;
HeapTuple old_key_tuple = NULL; /* replica identity of the tuple */
bool old_key_copied = false;

Assert(ItemPointerIsValid(tid)); // tid of the tuple to delete, passed in by the caller

/*
* Forbid this during a parallel operation, lest it allocate a combo CID.
* Other workers might need that combo CID for visibility checks, and we
* have no provision for broadcasting it to them.
*/
// deleting a tuple is not allowed in parallel mode
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot delete tuples during a parallel operation")));

// determine the block number from the tuple's tid, read that block of the relation into a shared buffer, and get the page
block = ItemPointerGetBlockNumber(tid);
buffer = ReadBuffer(relation, block);
page = BufferGetPage(buffer);

/*
* Before locking the buffer, pin the visibility map page if it appears to
* be necessary. Since we haven't got the lock yet, someone else might be
* in the middle of changing this, so we'll need to recheck after we have
* the lock.
*/
// deleting the tuple will have to clear the corresponding VM bit, so if the data page carries the all-visible flag,
// pin the matching visibility map page into vmbuffer
if (PageIsAllVisible(page))
visibilitymap_pin(relation, block, &vmbuffer);

// take an exclusive lock on the buffer
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

/*
* If we didn't pin the visibility map page and the page has become all
* visible while we were busy locking the buffer, we'll have to unlock and
* re-lock, to avoid holding the buffer lock across an I/O. That's a bit
* unfortunate, but hopefully shouldn't happen often.
*/
// while we were acquiring the exclusive lock above, another process may have marked the page all-visible;
// in that case release the buffer lock, pin the vmbuffer, then re-acquire the exclusive lock. The point is to
// avoid doing I/O (loading the VM page into vmbuffer) while holding the buffer's exclusive lock
if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
visibilitymap_pin(relation, block, &vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
}

// fetch the item at the offset given by tid
lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
Assert(ItemIdIsNormal(lp));

tp.t_tableOid = RelationGetRelid(relation);
tp.t_data = (HeapTupleHeader) PageGetItem(page, lp);
tp.t_len = ItemIdGetLength(lp);
tp.t_self = *tid;

l1:
// visibility check: its result decides whether the tuple is eligible to be updated/deleted
result = HeapTupleSatisfiesUpdate(&tp, cid, buffer);

// the tuple is invisible: report an error
if (result == TM_Invisible)
{
UnlockReleaseBuffer(buffer);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("attempted to delete invisible tuple")));
}
// the tuple is being modified by another transaction, and we were asked to wait
else if (result == TM_BeingModified && wait)
{
TransactionId xwait;
uint16 infomask;

/* must copy state data before unlocking buffer */
// fetch the tuple's xmax and infomask; at this point we do not yet know whether xmax is a plain XID or a MultiXactId
xwait = HeapTupleHeaderGetRawXmax(tp.t_data);
infomask = tp.t_data->t_infomask;

/*
* Sleep until concurrent transaction ends -- except when there's a
* single locker and it's our own transaction. Note we don't care
* which lock mode the locker has, because we need the strongest one.
*
* Before sleeping, we need to acquire tuple lock to establish our
* priority for the tuple (see heap_lock_tuple). LockTuple will
* release us when we are next-in-line for the tuple.
*
* If we are forced to "start over" below, we keep the tuple lock;
* this arranges that we stay at the head of the line while rechecking
* tuple state.
*/
// xmax is a MultiXactId
if (infomask & HEAP_XMAX_IS_MULTI)
{
bool current_is_member = false;

// check whether any lock mode recorded in the MultiXactId conflicts with LockTupleExclusive; if so, wait for it
if (DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
LockTupleExclusive, &current_is_member))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

/*
* Acquire the lock, if necessary (but skip it when we're
* requesting a lock and already have one; avoids deadlock).
*/
// if the current transaction is not a member of the MultiXactId, acquire the tuple-level regular lock;
// if it already is a member, skip the lock to avoid deadlock
if (!current_is_member)
heap_acquire_tuplock(relation, &(tp.t_self), LockTupleExclusive,
LockWaitBlock, &have_tuple_lock);

/* wait for multixact */
// wait for the conflicting transactions to finish
MultiXactIdWait((MultiXactId) xwait, MultiXactStatusUpdate, infomask,
relation, &(tp.t_self), XLTW_Delete,
NULL);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

/*
* If xwait had just locked the tuple then some other xact
* could update this tuple before we get to this point. Check
* for xmax change, and start over if so.
*/
// if another transaction changed the tuple's infomask/xmax in the meantime, redo the steps above
if (xmax_infomask_changed(tp.t_data->t_infomask, infomask) ||
!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tp.t_data),
xwait))
goto l1;
}

/*
* You might think the multixact is necessarily done here, but not
* so: it could have surviving members, namely our own xact or
* other subxacts of this backend. It is legal for us to delete
* the tuple in either case, however (the latter case is
* essentially a situation of upgrading our former shared lock to
* exclusive). We don't bother changing the on-disk hint bits
* since we are about to overwrite the xmax altogether.
*/
}
// xwait is a plain transaction id and not the current transaction
else if (!TransactionIdIsCurrentTransactionId(xwait))
{
/*
* Wait for regular transaction to end; but first, acquire tuple
* lock.
*/
// acquire the tuple-level regular lock and wait for the other transaction's update to finish
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
heap_acquire_tuplock(relation, &(tp.t_self), LockTupleExclusive,
LockWaitBlock, &have_tuple_lock);
XactLockTableWait(xwait, relation, &(tp.t_self), XLTW_Delete);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

/*
* xwait is done, but if xwait had just locked the tuple then some
* other xact could update this tuple before we get to this point.
* Check for xmax change, and start over if so.
*/
// if another transaction updated the tuple, go back to l1 and redo the checks
if (xmax_infomask_changed(tp.t_data->t_infomask, infomask) ||
!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tp.t_data),
xwait))
goto l1;

/* Otherwise check if it committed or aborted */
// update the tuple's hint bits
UpdateXmaxHintBits(tp.t_data, buffer, xwait);
}

/*
* We may overwrite if previous xmax aborted, or if it committed but
* only locked the tuple without updating it.
*/
// xmax aborted, or it committed but only locked the tuple (did not update it)
if ((tp.t_data->t_infomask & HEAP_XMAX_INVALID) ||
HEAP_XMAX_IS_LOCKED_ONLY(tp.t_data->t_infomask) ||
HeapTupleHeaderIsOnlyLocked(tp.t_data))
result = TM_Ok;
else if (!ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid))
result = TM_Updated;
else
result = TM_Deleted;
}

if (crosscheck != InvalidSnapshot && result == TM_Ok)
{
/* Perform additional check for transaction-snapshot mode RI updates */
if (!HeapTupleSatisfiesVisibility(&tp, crosscheck, buffer))
result = TM_Updated;
}

if (result != TM_Ok)
{
Assert(result == TM_SelfModified ||
result == TM_Updated ||
result == TM_Deleted ||
result == TM_BeingModified);
Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID));
Assert(result != TM_Updated ||
!ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid));
tmfd->ctid = tp.t_data->t_ctid;
tmfd->xmax = HeapTupleHeaderGetUpdateXid(tp.t_data);
if (result == TM_SelfModified)
tmfd->cmax = HeapTupleHeaderGetCmax(tp.t_data);
else
tmfd->cmax = InvalidCommandId;
UnlockReleaseBuffer(buffer);
if (have_tuple_lock)
UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
return result;
}

/*
* We're about to do the actual delete -- check for conflict first, to
* avoid possibly having to roll back work we've just done.
*
* This is safe without a recheck as long as there is no possibility of
* another process scanning the page between this check and the delete
* being visible to the scan (i.e., an exclusive buffer content lock is
* continuously held from this point until the tuple delete is visible).
*/
// serializable conflict check
CheckForSerializableConflictIn(relation, tid, BufferGetBlockNumber(buffer));

/* replace cid with a combo CID if necessary */
HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);

/*
* Compute replica identity tuple before entering the critical section so
* we don't PANIC upon a memory allocation failure.
*/
old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);

/*
* If this is the first possibly-multixact-able operation in the current
* transaction, set my per-backend OldestMemberMXactId setting. We can be
* certain that the transaction will never become a member of any older
* MultiXactIds than that. (We have to do this even if we end up just
* using our own TransactionId below, since some other backend could
* incorporate our XID into a MultiXact immediately afterwards.)
*/
MultiXactIdSetOldestMember();
// compute the new xmax + infomask + infomask2
compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(tp.t_data),
tp.t_data->t_infomask, tp.t_data->t_infomask2,
xid, LockTupleExclusive, true,
&new_xmax, &new_infomask, &new_infomask2);

START_CRIT_SECTION();
// critical section
/*
* If this transaction commits, the tuple will become DEAD sooner or
* later. Set flag that this page is a candidate for pruning once our xid
* falls below the OldestXmin horizon. If the transaction finally aborts,
* the subsequent page pruning will be a no-op and the hint will be
* cleared.
*/
PageSetPrunable(page, xid);

// first clear the all-visible flag on the data page and the corresponding VM bit
if (PageIsAllVisible(page))
{
all_visible_cleared = true;
PageClearAllVisible(page);
visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
vmbuffer, VISIBILITYMAP_VALID_BITS);
}
// update the tuple header
/* store transaction information of xact deleting the tuple */
tp.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
tp.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
tp.t_data->t_infomask |= new_infomask;
tp.t_data->t_infomask2 |= new_infomask2;
HeapTupleHeaderClearHotUpdated(tp.t_data);
HeapTupleHeaderSetXmax(tp.t_data, new_xmax);
HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
/* Make sure there is no forward chain link in t_ctid */
tp.t_data->t_ctid = tp.t_self;

/* Signal that this is actually a move into another partition */
if (changingPart)
HeapTupleHeaderSetMovedPartitions(tp.t_data);

MarkBufferDirty(buffer);

/*
* XLOG stuff
*
* NB: heap_abort_speculative() uses the same xlog record and replay
* routines.
*/
// write the XLOG record for crash recovery
if (RelationNeedsWAL(relation))
{
xl_heap_delete xlrec;
xl_heap_header xlhdr;
XLogRecPtr recptr;

/*
* For logical decode we need combo CIDs to properly decode the
* catalog
*/
if (RelationIsAccessibleInLogicalDecoding(relation))
log_heap_new_cid(relation, &tp);

xlrec.flags = 0;
if (all_visible_cleared)
xlrec.flags |= XLH_DELETE_ALL_VISIBLE_CLEARED;
if (changingPart)
xlrec.flags |= XLH_DELETE_IS_PARTITION_MOVE;
xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
tp.t_data->t_infomask2);
xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
xlrec.xmax = new_xmax;

if (old_key_tuple != NULL)
{
if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE;
else
xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
}

XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);

XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

/*
* Log replica identity of the deleted tuple if there is one
*/
if (old_key_tuple != NULL)
{
xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;

XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
XLogRegisterData((char *) old_key_tuple->t_data
+ SizeofHeapTupleHeader,
old_key_tuple->t_len
- SizeofHeapTupleHeader);
}

/* filtering by origin on a row level is much more efficient */
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);

PageSetLSN(page, recptr);
}

END_CRIT_SECTION();
// leave the critical section, then release locks and memory
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);

/*
* If the tuple has toasted out-of-line attributes, we need to delete
* those items too. We have to do this before releasing the buffer
* because we need to look at the contents of the tuple, but it's OK to
* release the content lock on the buffer first.
*/
if (relation->rd_rel->relkind != RELKIND_RELATION &&
relation->rd_rel->relkind != RELKIND_MATVIEW)
{
/* toast table entries should never be recursively toasted */
Assert(!HeapTupleHasExternal(&tp));
}
else if (HeapTupleHasExternal(&tp))
heap_toast_delete(relation, &tp, false);

/*
* Mark tuple for invalidation from system caches at next command
* boundary. We have to do this before releasing the buffer because we
* need to look at the contents of the tuple.
*/
// mark the deleted tuple's cache entries invalid ==> a no-op for ordinary table tuples; catalog tuples do get flagged
CacheInvalidateHeapTuple(relation, &tp, NULL);

/* Now we can release the buffer */
ReleaseBuffer(buffer);

/*
* Release the lmgr tuple lock, if we had it.
*/
if (have_tuple_lock)
UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);

pgstat_count_heap_delete(relation);

if (old_key_tuple != NULL && old_key_copied)
heap_freetuple(old_key_tuple);

return TM_Ok;
}
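
The branching around HeapTupleSatisfiesUpdate above ultimately asks: who set xmax, and did that transaction commit? A minimal self-contained model of those outcomes, assuming a toy commit log and ignoring MultiXacts, hint bits, subtransactions and cmax (the TM_* names are reused only for readability and are not the real enum from tableam.h):

#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

typedef enum { TM_Ok, TM_Invisible, TM_BeingModified, TM_Deleted, TM_SelfModified } ToyTMResult;

typedef struct ToyTuple {
    uint32_t xmin;
    uint32_t xmax;        /* 0 = invalid (no deleter) */
} ToyTuple;

/* Toy commit log: which small xids committed / are still running. */
static bool committed[16]   = { [3] = true, [5] = true };
static bool in_progress[16] = { [7] = true };

/* Very rough shape of the decision heap_delete makes via HeapTupleSatisfiesUpdate. */
static ToyTMResult toy_satisfies_update(const ToyTuple *t, uint32_t my_xid)
{
    if (!committed[t->xmin] && t->xmin != my_xid)
        return TM_Invisible;            /* inserter never committed and is not us */
    if (t->xmax == 0)
        return TM_Ok;                   /* nobody has deleted or locked it */
    if (t->xmax == my_xid)
        return TM_SelfModified;         /* we already deleted it in this transaction */
    if (in_progress[t->xmax])
        return TM_BeingModified;        /* must wait for the other transaction (the 'wait' path) */
    if (committed[t->xmax])
        return TM_Deleted;              /* a concurrent delete already committed */
    return TM_Ok;                       /* deleter aborted: xmax may be overwritten */
}

int main(void)
{
    ToyTuple live      = { .xmin = 3, .xmax = 0 };
    ToyTuple contended = { .xmin = 3, .xmax = 7 };
    ToyTuple deleted   = { .xmin = 3, .xmax = 5 };

    printf("live      -> %d (expect TM_Ok=0)\n",            toy_satisfies_update(&live, 9));
    printf("contended -> %d (expect TM_BeingModified=2)\n", toy_satisfies_update(&contended, 9));
    printf("deleted   -> %d (expect TM_Deleted=3)\n",       toy_satisfies_update(&deleted, 9));
    return 0;
}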

Update Flow

Write-write conflicts are handled with the two-phase locking (2PL) protocol.

A transaction is split into two phases: a growing phase for acquiring locks and a shrinking phase for releasing them. During the growing phase the transaction may only acquire locks (while still operating on data) and may not release any; as soon as it releases its first lock it enters the shrinking phase, in which it may only release locks (while still operating on data) and may not acquire new ones. Two-phase locking allows fairly high concurrency, because unlocking does not have to wait until the end of the transaction.

Its weakness is that it does not prevent deadlocks, because the growing phase imposes no ordering on lock acquisition. For example, if two transactions acquire locks A and B respectively and then each requests the other's lock, they are deadlocked.

To deal with deadlocks in PostgreSQL you can inspect the SQL currently being executed, find the transactions waiting on locks, and manually cancel or roll back a transaction; in addition, once a lock wait exceeds deadlock_timeout the deadlock detector aborts one of the transactions involved.
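
To relate this back to the A/B example above, here is a small pthread sketch. Both workers deliberately take the locks in the same global order, which is the ordering discipline that rules the deadlock out; if worker_b locked B before A instead, the program could hang in exactly the way described, which is the cycle PostgreSQL's deadlock detector breaks by aborting one transaction after deadlock_timeout. This illustrates the locking idea only and is not PostgreSQL code (build with -pthread):

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

/* Two "rows"/locks that both transactions want. */
static pthread_mutex_t lock_a = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lock_b = PTHREAD_MUTEX_INITIALIZER;

static void *worker_a(void *arg)
{
    (void) arg;
    pthread_mutex_lock(&lock_a);          /* growing phase: acquire A */
    usleep(1000);
    pthread_mutex_lock(&lock_b);          /* then B */
    puts("txn A holds A and B");
    pthread_mutex_unlock(&lock_b);        /* shrinking phase */
    pthread_mutex_unlock(&lock_a);
    return NULL;
}

static void *worker_b(void *arg)
{
    (void) arg;
    /* Same order as worker_a (A then B): no deadlock is possible.
     * Locking B first and then A would allow the deadlock described above. */
    pthread_mutex_lock(&lock_a);
    usleep(1000);
    pthread_mutex_lock(&lock_b);
    puts("txn B holds A and B");
    pthread_mutex_unlock(&lock_b);
    pthread_mutex_unlock(&lock_a);
    return NULL;
}

int main(void)
{
    pthread_t ta, tb;
    pthread_create(&ta, NULL, worker_a, NULL);
    pthread_create(&tb, NULL, worker_b, NULL);
    pthread_join(ta, NULL);
    pthread_join(tb, NULL);
    return 0;
}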

heap_update
block = ItemPointerGetBlockNumber(otid);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
oldtup.t_data = (HeapTupleHeader)PageGetItem(page, lp);
HeapSatisfiesHOTUpdate(relation, hot_attrs, key_attrs, id_attrs, &satisfies_hot,
&satisfies_key, &satisfies_id, &oldtup, newtup, page);
result = HeapTupleSatisfiesUpdate(&oldtup, cid, buffer, allow_update_self);

CheckForSerializableConflictIn(relation, &oldtup, buffer);
HeapTupleSetOid(newtup, HeapTupleGetOid(&oldtup));
heap_page_prepare_for_xid(relation, buffer, xid, false);
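
The heap_update fragment above follows the heap_delete skeleton plus an insert of the new version: the old tuple gets xmax, the new tuple gets xmin, and the old tuple's t_ctid is redirected to the new version. A minimal self-contained model of that version chain (simplified structs invented for illustration; HOT, TOAST and locking are ignored):

#include <stdio.h>
#include <stdint.h>

typedef struct ToyTid { uint32_t block; uint16_t off; } ToyTid;

typedef struct ToyVersion {
    uint32_t xmin;    /* creator */
    uint32_t xmax;    /* deleter/updater, 0 = none */
    ToyTid   ctid;    /* points to itself, or to the newer version */
    int      value;
} ToyVersion;

/* A non-HOT update in miniature: mark the old version dead as of xid,
 * create the new version, and link old -> new through ctid. */
static void toy_update(ToyVersion *oldv, ToyVersion *newv,
                       ToyTid new_tid, uint32_t xid, int new_value)
{
    newv->xmin  = xid;
    newv->xmax  = 0;
    newv->ctid  = new_tid;        /* a live version points at itself */
    newv->value = new_value;

    oldv->xmax  = xid;            /* what the delete half would do */
    oldv->ctid  = new_tid;        /* forward link followed when chasing versions */
}

int main(void)
{
    ToyVersion v1 = { .xmin = 100, .xmax = 0, .ctid = {0, 1}, .value = 4 };
    ToyVersion v2;

    toy_update(&v1, &v2, (ToyTid){0, 2}, 105, 5);

    printf("old: xmin=%u xmax=%u ctid=(%u,%u) value=%d\n",
           v1.xmin, v1.xmax, v1.ctid.block, v1.ctid.off, v1.value);
    printf("new: xmin=%u xmax=%u ctid=(%u,%u) value=%d\n",
           v2.xmin, v2.xmax, v2.ctid.block, v2.ctid.off, v2.value);
    return 0;
}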

References

https://blog.csdn.net/s_lisheng/article/details/139782641

https://blog.csdn.net/qq_37517281/article/details/104399535

https://blog.csdn.net/qq_52668274/article/details/128575448
