十年網(wǎng)站開發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶 + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營(yíng)維護(hù)+專業(yè)推廣+無(wú)憂售后,網(wǎng)站問題一站解決
本篇內(nèi)容主要講解“PostgreSQL中插入數(shù)據(jù)時(shí)與WAL相關(guān)的處理邏輯是什么”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“PostgreSQL中插入數(shù)據(jù)時(shí)與WAL相關(guān)的處理邏輯是什么”吧!

雁峰網(wǎng)站建設(shè)公司成都創(chuàng)新互聯(lián),雁峰網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為雁峰上千提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站制作要多少錢,請(qǐng)找那個(gè)售后服務(wù)好的雁峰做網(wǎng)站的公司定做!
靜態(tài)變量
進(jìn)程中全局共享
static int num_rdatas; /* entries currently used */ //已分配的空間大小 static int max_rdatas; /* allocated size */ //是否調(diào)用XLogBeginInsert函數(shù) static bool begininsert_called = false; static XLogCtlData *XLogCtl = NULL; /* flags for the in-progress insertion */ static uint8 curinsert_flags = 0; /* * ProcLastRecPtr points to the start of the last XLOG record inserted by the * current backend. It is updated for all inserts. XactLastRecEnd points to * end+1 of the last record, and is reset when we end a top-level transaction, * or start a new one; so it can be used to tell if the current transaction has * created any XLOG records. * ProcLastRecPtr指向當(dāng)前后端插入的最后一條XLOG記錄的開頭。 * 它針對(duì)所有插入進(jìn)行更新。 * XactLastRecEnd指向最后一條記錄的末尾位置 + 1, * 并在結(jié)束頂級(jí)事務(wù)或啟動(dòng)新事務(wù)時(shí)重置; * 因此,它可以用來(lái)判斷當(dāng)前事務(wù)是否創(chuàng)建了任何XLOG記錄。 * * While in parallel mode, this may not be fully up to date. When committing, * a transaction can assume this covers all xlog records written either by the * user backend or by any parallel worker which was present at any point during * the transaction. But when aborting, or when still in parallel mode, other * parallel backends may have written WAL records at later LSNs than the value * stored here. The parallel leader advances its own copy, when necessary, * in WaitForParallelWorkersToFinish. * 在并行模式下,這可能不是完全是最新的。 * 在提交時(shí),事務(wù)可以假定覆蓋了用戶后臺(tái)進(jìn)程或在事務(wù)期間出現(xiàn)的并行worker進(jìn)程的所有xlog記錄。 * 但是,當(dāng)中止時(shí),或者仍然處于并行模式時(shí),其他并行后臺(tái)進(jìn)程可能在較晚的LSNs中寫入了WAL記錄, * 而不是存儲(chǔ)在這里的值。 * 當(dāng)需要時(shí),并行處理進(jìn)程的leader在WaitForParallelWorkersToFinish中會(huì)推進(jìn)自己的副本。 */ XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr; /* For WALInsertLockAcquire/Release functions */ //用于WALInsertLockAcquire/Release函數(shù) static int MyLockNo = 0; static bool holdingAllLocks = false;
宏定義
typedef char* Pointer;//指針 typedef Pointer Page;//Page #define XLOG_HEAP_INSERT 0x00 /* * Pointer to a location in the XLOG. These pointers are 64 bits wide, * because we don't want them ever to overflow. * 指向XLOG中的位置. * 這些指針大小為64bit,以確保指針不會(huì)溢出. */ typedef uint64 XLogRecPtr; /* * Additional macros for access to page headers. (Beware multiple evaluation * of the arguments!) */ #define PageGetLSN(page) \ PageXLogRecPtrGet(((PageHeader) (page))->pd_lsn) #define PageSetLSN(page, lsn) \ PageXLogRecPtrSet(((PageHeader) (page))->pd_lsn, lsn) /* Buffer size required to store a compressed version of backup block image */ //存儲(chǔ)壓縮會(huì)后的塊鏡像所需要的緩存空間大小 #define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ) //-------------------------------------------------- 鎖相關(guān) /* * Fake spinlock implementation using semaphores --- slow and prone * to fall foul of kernel limits on number of semaphores, so don't use this * unless you must! The subroutines appear in spin.c. * 使用信號(hào)量的偽自旋鎖實(shí)現(xiàn)——很慢而且容易與內(nèi)核對(duì)信號(hào)量的限制相沖突, * 所以除非必須,否則不要使用它!子例程出現(xiàn)在spin.c中。 */ typedef int slock_t; typedef uint32 pg_crc32c; #define SpinLockInit(lock) S_INIT_LOCK(lock) #define SpinLockAcquire(lock) S_LOCK(lock) #define SpinLockRelease(lock) S_UNLOCK(lock) #define SpinLockFree(lock) S_LOCK_FREE(lock) #define XLogSegmentOffset(xlogptr, wal_segsz_bytes) \ ((xlogptr) & ((wal_segsz_bytes) - 1)) #define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30) #define LW_FLAG_RELEASE_OK ((uint32) 1 << 29) #define LW_FLAG_LOCKED ((uint32) 1 << 28) #define LW_VAL_EXCLUSIVE ((uint32) 1 << 24) #define LW_VAL_SHARED 1 #define LW_LOCK_MASK ((uint32) ((1 << 25)-1)) /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */ #define LW_SHARED_MASK ((uint32) ((1 << 24)-1))
LWLock
lwlock.c外的代碼不應(yīng)直接操作這個(gè)結(jié)構(gòu)的內(nèi)容,但我們必須聲明該結(jié)構(gòu)體以便將LWLocks合并到其他數(shù)據(jù)結(jié)構(gòu)中。
/*
* Code outside of lwlock.c should not manipulate the contents of this
* structure directly, but we have to declare it here to allow LWLocks to be
* incorporated into other data structures.
* lwlock.c外的代碼不應(yīng)直接操作這個(gè)結(jié)構(gòu)的內(nèi)容,
* 但我們必須聲明該結(jié)構(gòu)體以便將LWLocks合并到其他數(shù)據(jù)結(jié)構(gòu)中。
*/
typedef struct LWLock
{
uint16 tranche; /* tranche ID */
//獨(dú)占/非獨(dú)占locker的狀態(tài)
pg_atomic_uint32 state; /* state of exclusive/nonexclusive lockers */
//正在等待的PGPROCs鏈表
proclist_head waiters; /* list of waiting PGPROCs */
#ifdef LOCK_DEBUG//用于DEBUG
//waiters的數(shù)量
pg_atomic_uint32 nwaiters; /* number of waiters */
//鎖的最后獨(dú)占者
struct PGPROC *owner; /* last exclusive owner of the lock */
#endif
} LWLock;heap_insert
主要實(shí)現(xiàn)邏輯是插入元組到堆中,其中存在對(duì)WAL(XLog)進(jìn)行處理的部分.
參見PostgreSQL 源碼解讀(104)- WAL#1(Insert & WAL-heap_insert函數(shù)#1)
XLogInsert/XLogInsertRecord
插入一個(gè)具有指定的RMID和info字節(jié)的XLOG記錄,該記錄的主體是先前通過XLogRegister*調(diào)用注冊(cè)的數(shù)據(jù)和緩沖區(qū)引用。
參見PostgreSQL 源碼解讀(106)- WAL#3(Insert & WAL-heap_insert函數(shù)#3)
WALInsertLockXXX
包括WALInsertLockAcquireExclusive、WALInsertLockAcquire和WALInsertLockRelease等
//----------------------------------------------------------- WALInsertLockAcquireExclusive
/*
* Acquire all WAL insertion locks, to prevent other backends from inserting
* to WAL.
* 請(qǐng)求所有的WAL insertion鎖,以避免其他后臺(tái)進(jìn)程插入數(shù)據(jù)到WAL中
*/
static void
WALInsertLockAcquireExclusive(void)
{
int i;
/*
* When holding all the locks, all but the last lock's insertingAt
* indicator is set to 0xFFFFFFFFFFFFFFFF, which is higher than any real
* XLogRecPtr value, to make sure that no-one blocks waiting on those.
* 在持有所有的locks時(shí),除了最后一個(gè)鎖的insertingAt指示器外,
* 其余均設(shè)置為0xFFFFFFFFFFFFFFFF,
* 該值比所有實(shí)際的XLogRecPtr都要大,以確保沒有阻塞這些鎖。.
*/
for (i = 0; i < NUM_XLOGINSERT_LOCKS - 1; i++)//NUM_XLOGINSERT_LOCKS
{
LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
LWLockUpdateVar(&WALInsertLocks[i].l.lock,
&WALInsertLocks[i].l.insertingAt,
PG_UINT64_MAX);
}
/* Variable value reset to 0 at release */
//在釋放時(shí),變量值重置為0
LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
//設(shè)置標(biāo)記
holdingAllLocks = true;
}
/*
* LWLockAcquire - acquire a lightweight lock in the specified mode
* LWLockAcquire - 申請(qǐng)指定模式的輕量級(jí)鎖
*
* If the lock is not available, sleep until it is. Returns true if the lock
* was available immediately, false if we had to sleep.
* 如果鎖不可用,休眠直至可用.
* 如果鎖馬上可用則返回T,需要休眠則返回F
*
* Side effect: cancel/die interrupts are held off until lock release.
* 副作用:在鎖釋放的時(shí)候才能允許中斷/終止
*/
bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
PGPROC *proc = MyProc;//PGPROC數(shù)據(jù)結(jié)構(gòu)
bool result = true;
int extraWaits = 0;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
lwstats = get_lwlock_stats_entry(lock);//獲得鎖的統(tǒng)計(jì)入口
#endif
//模式驗(yàn)證
AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
PRINT_LWDEBUG("LWLockAcquire", lock, mode);
#ifdef LWLOCK_STATS
/* Count lock acquisition attempts */
if (mode == LW_EXCLUSIVE)
lwstats->ex_acquire_count++;
else
lwstats->sh_acquire_count++;
#endif /* LWLOCK_STATS */
/*
* We can't wait if we haven't got a PGPROC. This should only occur
* during bootstrap or shared memory initialization. Put an Assert here
* to catch unsafe coding practices.
* 如果還沒有得到PGPROC則不能等待.
* 這種情況可能出現(xiàn)在bootstrap或者共享內(nèi)存初始化時(shí).
* 在這里加入Assert代碼以確保捕獲不安全的編碼實(shí)踐.
*/
Assert(!(proc == NULL && IsUnderPostmaster));
/* Ensure we will have room to remember the lock */
//確保我們有足夠的地方存儲(chǔ)鎖
if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
elog(ERROR, "too many LWLocks taken");
/*
* Lock out cancel/die interrupts until we exit the code section protected
* by the LWLock. This ensures that interrupts will not interfere with
* manipulations of data structures in shared memory.
* 退出使用LWLock鎖保護(hù)的實(shí)現(xiàn)邏輯時(shí)才能允許取消或者中斷.
* 這樣可以確保中斷不會(huì)與共享內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)管理邏輯發(fā)現(xiàn)關(guān)系.
*/
HOLD_INTERRUPTS();
/*
* Loop here to try to acquire lock after each time we are signaled by
* LWLockRelease.
* 循環(huán),在每次LWLockRelease信號(hào)產(chǎn)生時(shí)獲取鎖.
*
* NOTE: it might seem better to have LWLockRelease actually grant us the
* lock, rather than retrying and possibly having to go back to sleep. But
* in practice that is no good because it means a process swap for every
* lock acquisition when two or more processes are contending for the same
* lock. Since LWLocks are normally used to protect not-very-long
* sections of computation, a process needs to be able to acquire and
* release the same lock many times during a single CPU time slice, even
* in the presence of contention. The efficiency of being able to do that
* outweighs the inefficiency of sometimes wasting a process dispatch
* cycle because the lock is not free when a released waiter finally gets
* to run. See pgsql-hackers archives for 29-Dec-01.
* 注意:看起來(lái)相對(duì)于不斷的重入和休眠而言LWLockRelease的實(shí)際持有者授予我們鎖會(huì)更好,
* 但在工程實(shí)踐上來(lái)看,
* 這樣的做法并不好因?yàn)檫@意味著當(dāng)兩個(gè)或多個(gè)進(jìn)程爭(zhēng)用同一個(gè)鎖時(shí)對(duì)每個(gè)鎖都會(huì)出現(xiàn)進(jìn)程交換.
* 由于LWLocks通常來(lái)說用于保護(hù)并不是太長(zhǎng)時(shí)間的計(jì)算邏輯,
* 甚至在出現(xiàn)爭(zhēng)用的時(shí)候,一個(gè)進(jìn)程需要能夠在一個(gè)CPU時(shí)間片期間獲取和釋放同樣的鎖很多次.
* 那樣子做的收獲會(huì)導(dǎo)致有時(shí)候進(jìn)程調(diào)度的低效,
* 因?yàn)楫?dāng)一個(gè)已釋放的進(jìn)程終于可以運(yùn)行時(shí),鎖卻沒有獲取.
*/
for (;;)
{
bool mustwait;
/*
* Try to grab the lock the first time, we're not in the waitqueue
* yet/anymore.
* 第一次試著獲取鎖,我們已經(jīng)不在等待隊(duì)列中了。
*/
mustwait = LWLockAttemptLock(lock, mode);
if (!mustwait)
{
LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
break; /* 成功!got the lock */
}
/*
* Ok, at this point we couldn't grab the lock on the first try. We
* cannot simply queue ourselves to the end of the list and wait to be
* woken up because by now the lock could long have been released.
* Instead add us to the queue and try to grab the lock again. If we
* succeed we need to revert the queuing and be happy, otherwise we
* recheck the lock. If we still couldn't grab it, we know that the
* other locker will see our queue entries when releasing since they
* existed before we checked for the lock.
* 在這個(gè)點(diǎn),我們不能在第一次就獲取鎖.
* 我們不能在鏈表的末尾進(jìn)行簡(jiǎn)單的排隊(duì)然后等待喚醒,因?yàn)殒i可能已經(jīng)釋放很長(zhǎng)時(shí)間了.
* 相反,我們需要重新加入到隊(duì)列中再次嘗試獲取鎖.
* 如果成功了,我們需要翻轉(zhuǎn)隊(duì)列,否則的話需要重新檢查鎖.
* 如果還是不能獲取鎖,我們知道其他locker在釋放時(shí)可以看到我們的隊(duì)列入口,
* 因?yàn)樵谖覀儥z查鎖時(shí)它們已經(jīng)存在了.
*/
/* add to the queue */
//添加到隊(duì)列中
LWLockQueueSelf(lock, mode);
/* we're now guaranteed to be woken up if necessary */
//在需要的時(shí)候,確??梢员粏拘?
mustwait = LWLockAttemptLock(lock, mode);
/* ok, grabbed the lock the second time round, need to undo queueing */
//第二次嘗試獲取鎖,需要取消排隊(duì)
if (!mustwait)
{
LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
LWLockDequeueSelf(lock);//出列
break;
}
/*
* Wait until awakened.
* 等待直至被喚醒
*
* Since we share the process wait semaphore with the regular lock
* manager and ProcWaitForSignal, and we may need to acquire an LWLock
* while one of those is pending, it is possible that we get awakened
* for a reason other than being signaled by LWLockRelease. If so,
* loop back and wait again. Once we've gotten the LWLock,
* re-increment the sema by the number of additional signals received,
* so that the lock manager or signal manager will see the received
* signal when it next waits.
* 由于我們使用常規(guī)的鎖管理和ProcWaitForSignal信號(hào)共享進(jìn)程等待信號(hào)量,
* 我們可能需要在其中一個(gè)掛起時(shí)獲取LWLock,
* 原因是有可能是由于其他的原因而不是通過LWLockRelease信號(hào)被喚醒.
* 如果是這種情況,則繼續(xù)循環(huán)等待.
* 一旦我們獲得LWLock,根據(jù)接收到的額外信號(hào)數(shù)目,再次增加信號(hào)量,
* 以便鎖管理器或者信號(hào)管理器在下次等待時(shí)可以看到已接收的信號(hào).
*/
LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
#ifdef LWLOCK_STATS
lwstats->block_count++;//統(tǒng)計(jì)
#endif
LWLockReportWaitStart(lock);//報(bào)告等待
TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
for (;;)
{
PGSemaphoreLock(proc->sem);
if (!proc->lwWaiting)//如果不是LWLock等待,跳出循環(huán)
break;
extraWaits++;//額外的等待
}
/* Retrying, allow LWLockRelease to release waiters again. */
//重試,允許LWLockRelease再次釋放waiters
pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
#ifdef LOCK_DEBUG
{
/* not waiting anymore */
//無(wú)需等待
uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
Assert(nwaiters < MAX_BACKENDS);
}
#endif
TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
LWLockReportWaitEnd();
LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
//再次循環(huán)以再次請(qǐng)求鎖
/* Now loop back and try to acquire lock again. */
result = false;
}
TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
//獲取成功!
/* Add lock to list of locks held by this backend */
//在該后臺(tái)進(jìn)程持有的鎖鏈表中添加鎖
held_lwlocks[num_held_lwlocks].lock = lock;
held_lwlocks[num_held_lwlocks++].mode = mode;
/*
* Fix the process wait semaphore's count for any absorbed wakeups.
* 修正進(jìn)程咋等待信號(hào)量計(jì)數(shù)的其他absorbed喚醒。
*/
while (extraWaits-- > 0)
PGSemaphoreUnlock(proc->sem);
return result;
}
/*
* Internal function that tries to atomically acquire the lwlock in the passed
* in mode.
* 嘗試使用指定模式原子性獲取LWLock鎖的內(nèi)部函數(shù).
*
* This function will not block waiting for a lock to become free - that's the
* callers job.
* 該函數(shù)不會(huì)阻塞等待鎖釋放的進(jìn)程 -- 這是調(diào)用者的工作.
*
* Returns true if the lock isn't free and we need to wait.
* 如果鎖仍未釋放,仍需要等待,則返回T
*/
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
uint32 old_state;
AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
/*
* Read once outside the loop, later iterations will get the newer value
* via compare & exchange.
* 在循環(huán)外先讀取一次,后續(xù)可以通過比較和交換獲得較新的值
*/
old_state = pg_atomic_read_u32(&lock->state);
/* loop until we've determined whether we could acquire the lock or not */
//循環(huán)指針我們確定是否可以獲得鎖位置
while (true)
{
uint32 desired_state;
bool lock_free;
desired_state = old_state;
if (mode == LW_EXCLUSIVE)//獨(dú)占
{
lock_free = (old_state & LW_LOCK_MASK) == 0;
if (lock_free)
desired_state += LW_VAL_EXCLUSIVE;
}
else
{
//非獨(dú)占
lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
if (lock_free)
desired_state += LW_VAL_SHARED;
}
/*
* Attempt to swap in the state we are expecting. If we didn't see
* lock to be free, that's just the old value. If we saw it as free,
* we'll attempt to mark it acquired. The reason that we always swap
* in the value is that this doubles as a memory barrier. We could try
* to be smarter and only swap in values if we saw the lock as free,
* but benchmark haven't shown it as beneficial so far.
* 嘗試在我們期望的狀態(tài)下進(jìn)行交換。
* 如果沒有看到鎖被釋放,那么這回是舊的值.
* 如果鎖已釋放,嘗試標(biāo)記鎖已被獲取.
* 我們通常交換值的理由是會(huì)使用雙倍的內(nèi)存barrier.
* 我們嘗試變得更好:只交換我們看到已釋放的鎖,但壓力測(cè)試顯示并沒有什么性能改善.
*
* Retry if the value changed since we last looked at it.
* 在最后一次查找后如果值改變,則重試
*/
if (pg_atomic_compare_exchange_u32(&lock->state,
&old_state, desired_state))
{
if (lock_free)
{
/* Great! Got the lock. */
//很好,獲取鎖!
#ifdef LOCK_DEBUG
if (mode == LW_EXCLUSIVE)
lock->owner = MyProc;
#endif
return false;
}
else
return true; /* 某人還持有鎖.somebody else has the lock */
}
}
pg_unreachable();//正常來(lái)說,程序邏輯不應(yīng)到這里
}
//----------------------------------------------------------- WALInsertLockAcquire
/*
* Acquire a WAL insertion lock, for inserting to WAL.
* 在寫入WAL前獲取wAL insertion鎖
*/
static void
WALInsertLockAcquire(void)
{
bool immed;
/*
* It doesn't matter which of the WAL insertion locks we acquire, so try
* the one we used last time. If the system isn't particularly busy, it's
* a good bet that it's still available, and it's good to have some
* affinity to a particular lock so that you don't unnecessarily bounce
* cache lines between processes when there's no contention.
* 我們請(qǐng)求獲取哪個(gè)WAL insertion鎖無(wú)關(guān)緊要,因此獲取最后使用的那個(gè).
* 如果系統(tǒng)并不繁忙,那么運(yùn)氣好的話,仍然可用,
* 與特定的鎖保持一定的親緣關(guān)系是很好的,這樣在沒有爭(zhēng)用的情況下,
* 就可用避免不必要地在進(jìn)程之間切換緩存line。
*
* If this is the first time through in this backend, pick a lock
* (semi-)randomly. This allows the locks to be used evenly if you have a
* lot of very short connections.
* 如果這是該進(jìn)程的第一次獲取,隨機(jī)獲取一個(gè)鎖.
* 如果有很多非常短的連接的情況下,這樣可以均勻地使用鎖。
*/
static int lockToTry = -1;
if (lockToTry == -1)
lockToTry = MyProc->pgprocno % NUM_XLOGINSERT_LOCKS;
MyLockNo = lockToTry;
/*
* The insertingAt value is initially set to 0, as we don't know our
* insert location yet.
* insertingAt值初始化為0,因?yàn)槲覀冞€不知道我們插入的位置.
*/
immed = LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE);
if (!immed)
{
/*
* If we couldn't get the lock immediately, try another lock next
* time. On a system with more insertion locks than concurrent
* inserters, this causes all the inserters to eventually migrate to a
* lock that no-one else is using. On a system with more inserters
* than locks, it still helps to distribute the inserters evenly
* across the locks.
* 如果不能馬上獲得鎖,下回嘗試另外一個(gè)鎖.
* 在一個(gè)insertion鎖比并發(fā)插入者更多的系統(tǒng)中,
* 這會(huì)導(dǎo)致所有的inserters周期性的遷移到?jīng)]有使用的鎖上面
* 相反,仍然可以有助于周期性的分發(fā)插入者到不同的鎖上.
*/
lockToTry = (lockToTry + 1) % NUM_XLOGINSERT_LOCKS;
}
}
//----------------------------------------------------------- WALInsertLockRelease
/*
* Release our insertion lock (or locks, if we're holding them all).
* 釋放insertion鎖
*
* NB: Reset all variables to 0, so they cause LWLockWaitForVar to block the
* next time the lock is acquired.
* 注意:重置所有的變量為0,這樣它們可以使LWLockWaitForVar在下一次獲取鎖時(shí)阻塞.
*/
static void
WALInsertLockRelease(void)
{
if (holdingAllLocks)//如持有所有鎖
{
int i;
for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
LWLockReleaseClearVar(&WALInsertLocks[i].l.lock,
&WALInsertLocks[i].l.insertingAt,
0);
holdingAllLocks = false;
}
else
{
LWLockReleaseClearVar(&WALInsertLocks[MyLockNo].l.lock,
&WALInsertLocks[MyLockNo].l.insertingAt,
0);
}
}
/*
* LWLockReleaseClearVar - release a previously acquired lock, reset variable
* LWLockReleaseClearVar - 釋放先前獲取的鎖并重置變量
*/
void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{
LWLockWaitListLock(lock);
/*
* Set the variable's value before releasing the lock, that prevents race
* a race condition wherein a new locker acquires the lock, but hasn't yet
* set the variables value.
* 在釋放鎖之前設(shè)置變量的值,這可以防止一個(gè)新的locker在沒有設(shè)置變量值的情況下獲取鎖時(shí)的爭(zhēng)用.
*/
*valptr = val;
LWLockWaitListUnlock(lock);
LWLockRelease(lock);
}
/*
* Lock the LWLock's wait list against concurrent activity.
* 鎖定針對(duì)并發(fā)活動(dòng)的LWLock等待鏈表
*
* NB: even though the wait list is locked, non-conflicting lock operations
* may still happen concurrently.
* 注意:雖然等待鏈表被鎖定,非沖突鎖操作仍然可能會(huì)并發(fā)出現(xiàn)
*
* Time spent holding mutex should be short!
* 耗費(fèi)在持有mutex的時(shí)間應(yīng)該盡可能的短
*/
static void
LWLockWaitListLock(LWLock *lock)
{
uint32 old_state;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
uint32 delays = 0;
lwstats = get_lwlock_stats_entry(lock);
#endif
while (true)
{
/* always try once to acquire lock directly */
//首次嘗試直接獲取鎖
old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
if (!(old_state & LW_FLAG_LOCKED))
break; /* 成功獲取;got lock */
/* and then spin without atomic operations until lock is released */
//然后在沒有原子操作的情況下spin,直到鎖釋放
{
SpinDelayStatus delayStatus;//SpinDelay狀態(tài)
init_local_spin_delay(&delayStatus);//初始化
while (old_state & LW_FLAG_LOCKED)//獲取Lock
{
perform_spin_delay(&delayStatus);
old_state = pg_atomic_read_u32(&lock->state);
}
#ifdef LWLOCK_STATS
delays += delayStatus.delays;
#endif
finish_spin_delay(&delayStatus);
}
/*
* Retry. The lock might obviously already be re-acquired by the time
* we're attempting to get it again.
* 重試,鎖有可能在嘗試在此獲取時(shí)已通過重新請(qǐng)求而獲得.
*/
}
#ifdef LWLOCK_STATS
lwstats->spin_delay_count += delays;//延遲計(jì)數(shù)
#endif
}
/*
* Unlock the LWLock's wait list.
* 解鎖LWLock的等待鏈表
*
* Note that it can be more efficient to manipulate flags and release the
* locks in a single atomic operation.
* 注意,在單個(gè)原子操作中操作標(biāo)志和釋放鎖可能更有效。
*/
static void
LWLockWaitListUnlock(LWLock *lock)
{
uint32 old_state PG_USED_FOR_ASSERTS_ONLY;
old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
Assert(old_state & LW_FLAG_LOCKED);
}
/*
* LWLockRelease - release a previously acquired lock
* LWLockRelease - 釋放先前獲取的鎖
*/
void
LWLockRelease(LWLock *lock)
{
LWLockMode mode;
uint32 oldstate;
bool check_waiters;
int i;
/*
* Remove lock from list of locks held. Usually, but not always, it will
* be the latest-acquired lock; so search array backwards.
* 從持有的鎖鏈表中清除鎖.
* 通常來(lái)說(但不是總是如此),清除的是最后請(qǐng)求的鎖,因此從后往前搜索數(shù)組.
*/
for (i = num_held_lwlocks; --i >= 0;)
if (lock == held_lwlocks[i].lock)
break;
if (i < 0)
elog(ERROR, "lock %s is not held", T_NAME(lock));
mode = held_lwlocks[i].mode;//模式
num_held_lwlocks--;//減一
for (; i < num_held_lwlocks; i++)
held_lwlocks[i] = held_lwlocks[i + 1];
PRINT_LWDEBUG("LWLockRelease", lock, mode);
/*
* Release my hold on lock, after that it can immediately be acquired by
* others, even if we still have to wakeup other waiters.
* 釋放"我"持有的鎖,
*/
if (mode == LW_EXCLUSIVE)
oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
else
oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
/* nobody else can have that kind of lock */
//舍我其誰(shuí)!
Assert(!(oldstate & LW_VAL_EXCLUSIVE));
/*
* We're still waiting for backends to get scheduled, don't wake them up
* again.
* 仍然在等待后臺(tái)進(jìn)程獲得調(diào)度,暫時(shí)不需要再次喚醒它們
*/
if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
(LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
(oldstate & LW_LOCK_MASK) == 0)
check_waiters = true;
else
check_waiters = false;
/*
* As waking up waiters requires the spinlock to be acquired, only do so
* if necessary.
* 因?yàn)閱拘训却餍枰@取spinlock,所以只有在必要時(shí)才這樣做。
*/
if (check_waiters)
{
/* XXX: remove before commit? */
//XXX: 在commit前清除?
LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
LWLockWakeup(lock);
}
TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
/*
* Now okay to allow cancel/die interrupts.
* 現(xiàn)在可以允許中斷操作了.
*/
RESUME_INTERRUPTS();
}到此,相信大家對(duì)“PostgreSQL中插入數(shù)據(jù)時(shí)與WAL相關(guān)的處理邏輯是什么”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!