作者簡介:
程磊,一線碼農,在某手機公司擔任系統開發工程師,日常喜歡研究內核基本原理。
目錄
一、自旋鎖的發展歷史
二、原始自旋鎖
2.1 定義與初始化 2.2 加鎖操作 2.3 解鎖操作三、票號自旋鎖
3.1 定義與初始化 3.2 加鎖操作 3.3 解鎖操作四、MCS自旋鎖
4.1 定義與初始化 4.2 加鎖操作 4.3 解鎖操作五、隊列自旋鎖
5.1 定義與初始化 5.2 加鎖操作 5.3 解鎖操作六、自旋鎖的使用
6.1 自旋鎖的適用場景 6.2 自旋鎖與禁用偽并發的配合使用 6.3 raw_spin_lock的使用問題
一、自旋鎖的發展歷史
自旋鎖是Linux內核里最常用的鎖之一,自旋鎖的概念很簡單,就是如果加鎖失敗在等鎖時是使用休眠等待還是忙等待,如果是忙等待的話,就是自旋鎖,這也是自旋鎖名字的由來。自旋鎖的邏輯是,用自旋鎖保護的臨界區要足夠小,而且臨界區內是不能休眠的。所以當自旋鎖加鎖失敗時,說明有其它的臨界區正在執行中。由于自旋鎖的臨界區足夠小且不會休眠,所以我們可以自旋忙等待其它臨界區的退出,沒必要去休眠,因為休眠要做一大堆操作。而忙等待的話,對方很快就會退出臨界區,我們就可以很快地獲得自旋鎖了。
自旋鎖與UP、SMP的關系:
根據自旋鎖的邏輯,自旋鎖的臨界區是不能休眠的。在UP下,只有一個CPU,如果我們執行到了臨界區,此時自旋鎖是不可能處于加鎖狀態的。因為我們正在占用CPU,又沒有其它的CPU,其它的臨界區要么沒有到來、要么已經執行過去了。所以我們是一定能獲得自旋鎖的,所以自旋鎖對UP來說是沒有意義的。但是為了在UP和SMP下代碼的一致性,UP下也有自旋鎖,但是自旋鎖的定義就變成了空結構體,自旋鎖的加鎖操作就退化成了禁用搶占,自旋鎖的解鎖操作也就退化成了開啟搶占。所以說自旋鎖只適用于SMP,但是在UP下也提供了兼容操作。
自旋鎖一開始的實現是很簡單的,后來隨著眾核時代的到來,自旋鎖的公平性成了很大的問題,于是內核實現了票號自旋鎖(ticket spinlock)來保證加鎖的公平性。后來又發現票號自旋鎖有很大的性能問題,于是又開始著力解決自旋鎖的性能問題。先是開發出了MCS自旋鎖,確實解決了性能問題,但是它改變了自旋鎖的接口,所以沒辦法替代自旋鎖。然后又有人對MCS自旋鎖進行改造從而開發出了隊列自旋鎖(queue spinlock)。隊列自旋鎖既解決了自旋鎖的性能問題,又保持了自旋鎖的原有接口,非常完美。現在內核使用的自旋鎖是隊列自旋鎖。下面我們用一張圖來總結一下自旋鎖的發展史(x86平臺)。
注:MCS自旋鎖進了內核,但是由于接口不兼容和體積問題,并沒有取代票號自旋鎖。
下面我們將按照自旋鎖的發展順序來逐步講解,本文的代碼都是按照x86平臺進行講解的,代碼都進行了刪減,把一些調試數據或者無關緊要的數據、代碼都刪除了。
二、原始自旋鎖
我們在《深入理解Linux線程同步》里面講了簡單自旋鎖,原始自旋鎖和它的原理是一樣的,但是實現細節更為復雜一些。本節以內核版本2.6.24來講解代碼。
2.1 定義與初始化
我們先來看原始自旋鎖的定義。
linux-src/include/linux/spinlock_types.h
typedef struct {
raw_spinlock_t raw_lock;
} spinlock_t;
做了刪減,把調試相關的配置數據都刪了。
linux-src/include/asm-x86/spinlock_types.h
typedef struct {
unsigned int slock;
} raw_spinlock_t;
我們可以看到原始自旋鎖的定義非常簡單,本質上就是一個無符號整數。
下面我們再來看一下自旋鎖變量的定義與初始化。自旋鎖的定義與初始化分為靜態和動態兩種。靜態是指自旋鎖在編譯時就分配好了空間、數據就初始化好了,這種情況一般是全局自旋鎖變量。動態是指自旋鎖是在運行時去創建然后用函數去初始化的,這種情況一般是自旋鎖內嵌在某個結構體里面,隨著結構體的創建而創建,需要用函數去初始化一下。
靜態定義與初始化如下:
linux-src/include/linux/spinlock_types.h
#define DEFINE_SPINLOCK(x) spinlock_t x = __SPIN_LOCK_UNLOCKED(x)
#define __SPIN_LOCK_UNLOCKED(lockname)
(spinlock_t) { .raw_lock = __RAW_SPIN_LOCK_UNLOCKED}
linux-src/include/asm-x86/spinlock_types.h
#define __RAW_SPIN_LOCK_UNLOCKED { 1 }
自旋鎖的動態初始化如下:
linux-src/include/linux/spinlock.h
# define spin_lock_init(lock)
do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)
此處的do while(0)是為了能把宏當做函數一樣來用。我們調用函數時最后面都要加個;分號,如果沒有do while(0),我們在最后加個;分號,語法就不對了,如果不加,宏看上去就不像是函數調用了。有了do while(0),這個問題就解決了。
靜態初始化是在編譯時就對變量賦值了,動態初始化是在運行時才對變量進行賦值。
2.2 加鎖操作
下面我們來看一下自旋鎖的加鎖操作:
linux-src/include/linux/spinlock.h
#define spin_lock(lock) _spin_lock(lock)
linux-src/kernel/spinlock.c
void __lockfunc _spin_lock(spinlock_t *lock)
{
preempt_disable();
LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock);
}
linux-src/include/linux/lockdep.h
#define LOCK_CONTENDED(_lock, try, lock)
lock(_lock)
linux-src/include/linux/spinlock.h
# define _raw_spin_lock(lock) __raw_spin_lock(&(lock)->raw_lock)
linux-src/include/asm-x86/spinlock_32.h
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
asm volatile(" 1: "
LOCK_PREFIX " ; decb %0 "
"jns 3f "
"2: "
"rep;nop "
"cmpb $0,%0 "
"jle 2b "
"jmp 1b "
"3: "
: "+m" (lock->slock) : : "memory");
}
linux-src/include/asm-x86/spinlock_64.h
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
asm volatile(
" 1: "
LOCK_PREFIX " ; decl %0 "
"jns 2f "
"3: "
"rep;nop "
"cmpl $0,%0 "
"jle 3b "
"jmp 1b "
"2: " : "=m" (lock->slock) : : "memory");
}
可以看到spin_lock的最終實現是__raw_spin_lock,是在架構代碼里面,在x86上分為32位和64位兩種實現,用的都是內嵌匯編代碼。關于內嵌匯編,可以查詢gcc的官方文檔GCC內嵌匯編語言。
這兩段內嵌匯編代碼的意思都是一樣的,但是比較晦澀難懂,我們把它轉換為C代碼。
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
while(1){
if(--lock->slock == 0) // 匯編代碼中有lock指令前綴,此操作是原子的
return;
while((int)lock->slock <= 0){}
}
}
轉換成C代碼之后就很好理解了。原始自旋鎖用1來表示沒有加鎖,在無限循環中,我們首先把鎖變量原子地減1并比較是否等于0,如果等于0,說明剛才鎖變量是1,現在變成了0,我們加鎖成功了,直接返回。如果鎖變量不等于0,就是說鎖變量剛才是0,現在變成負的了,那么我們就無限循環鎖變量小于等于0,直到鎖變量大于0,也就是等于1,結束此循環,重新回到大循環中去嘗試加鎖。為什么要把鎖變量強轉為int呢,因為鎖變量的定義是無符號數,而在匯編代碼中把它當做有符號數使用,所以加個int強轉。為什么內循環是小于等于0而不是小于0呢,首先剛到達內循環的時候,說明我們搶鎖失敗,鎖變量一定是小于0的,在內循環執行的過程中,如果有人釋放了鎖而又有人立馬搶到了鎖,此時鎖變量還是0,此時我們結束內循環去搶鎖是沒有意義的,搶鎖肯定會失敗還會回到內循環。所以只有當鎖變量大于0也就是等于1時,代表鎖是空閑的,此時結束內循環才是有意義的。
2.3 解鎖操作
下面我們看一下解鎖操作:
linux-src/include/linux/spinlock.h
#define spin_unlock(lock) _spin_unlock(lock)
linux-src/kernel/spinlock.c
void __lockfunc _spin_unlock(spinlock_t *lock)
{
_raw_spin_unlock(lock);
preempt_enable();
}
linux-src/include/linux/spinlock.h
# define _raw_spin_unlock(lock) __raw_spin_unlock(&(lock)->raw_lock)
linux-src/include/asm-x86/spinlock_32.h
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory");
}
linux-src/include/asm-x86/spinlock_64.h
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
asm volatile("movl $1,%0" :"=m" (lock->slock) :: "memory");
}
可以看到解鎖操作也是在架構代碼里面實現的,用的也是內嵌匯編代碼,這個代碼比較簡單,就是把鎖變量賦值為1,我們就不再轉換成C代碼了。
三、票號自旋鎖
看了上面的原始自旋鎖實現之后,我們發現自旋鎖并沒有排隊機制,如果有很多人在競爭鎖的情況下,誰能獲得鎖是不確定的。在CPU核數還比較少的時候,這個問題并不突出,內核也沒有去解決這個問題。后來隨著CPU核數越來越多,內核越來越復雜、鎖競爭越來越激烈,自旋鎖的不公平性問題就越來越突出了。有人做了個實驗,在一個8核CPU的電腦上,有的線程竟然連續100萬次都沒有獲得自旋鎖。這個不公平性就太嚴重了,解決自旋鎖的公平性問題就迫在眉睫了。
為了解決自旋鎖的公平性問題,內核開發了票號自旋鎖。它的原理就類似于我們去銀行辦業務,以前沒有叫號機,我們每個人都盯著業務窗口看,發現一個人走了立馬一窩蜂地擠過去,誰搶到了位置就輪到誰辦業務。在人特別多的時候,有的人可能早上十點來的,下午五點都沒搶到機會。這怎么能行呢,太不公平了,于是銀行買了叫號機,每個進來的人都先取一個號,然后坐著等。業務員處理完一個人的業務之后就播報下一個要處理的票號。每個人都要一直注意著廣播播報,發現廣播里叫的號和自己手里的號是一樣的,就輪到自己去辦業務了。
票號自旋鎖在實現時把原來的自旋鎖的整形變量拆分成了兩部分,一部分是owner,代表當前正在辦業務的票號,另一部分是next,代表下一個人取號的號碼。每次加鎖時先取號,定義一個局部變量int ticket = next++,自己取的號是next的值,再把next的值加1,然后不停地比較自己的ticket和owner的值,如果不相等就一直比較,直到相等為止,相等代表加鎖成功,該自己去辦業務了。辦業務就相當于臨界區,辦完業務離開臨界區,解鎖自旋鎖就是把owner加1。此時如果有其它人在自旋,他發現owner加1之后和自己的ticket相等了,就結束自旋,代表他加鎖成功了。我們來畫一個圖來演示一下:
票號自旋鎖的狀態和原始自旋鎖有很大不同,原始自旋鎖是1代表未加鎖,0代表已加鎖,看不出來排隊等鎖的線程有多少個。票號自旋鎖,owner和next相等代表未加鎖,兩者不一定等于0,next和owner的差值等于排隊等鎖的線程個數。
下面我們以內核版本4.1來講解。在x86的實現上,owner叫head,next叫tail,其實這么叫也很合理,從head到tail正好是所有加鎖的人構成的一個隊列,head是隊首,已經獲得了鎖,tail是隊尾,是下一個來的人的序號。
3.1 定義與初始化
我們先來看票號自旋鎖的定義:
linux-src/include/linux/spinlock_types.h
typedef struct spinlock {
struct raw_spinlock rlock;
} spinlock_t;
typedef struct raw_spinlock {
arch_spinlock_t raw_lock;
} raw_spinlock_t;
linux-src/arch/x86/include/asm/spinlock_types.h
typedef struct arch_spinlock {
union {
__ticketpair_t head_tail;
struct __raw_tickets {
__ticket_t head, tail;
} tickets;
};
} arch_spinlock_t;
#if (CONFIG_NR_CPUS < (256 / __TICKET_LOCK_INC))
typedef u8 __ticket_t;
typedef u16 __ticketpair_t;
#else
typedef u16 __ticket_t;
typedef u32 __ticketpair_t;
#endif
這里同樣把一些調試數據代碼進行了刪除。spinlock包含raw_spinlock,raw_spinlock包含arch_spinlock_t,之前只有兩層,spinlock是對外接口,raw_spinlock是各個架構的實現,現在為什么又多了個arch_spinlock_t呢,原因和RTLinux有關。為了配合RTLinux的實現,Linus決定把原來的raw_spinlock改為arch_spinlock,把原來的spinlock改為raw_spinlock,再實現一個新的spinlock來包含raw_spinlock。這樣的話,arch_spinlock就是各個架構的實現,spinlock和raw_spinlock在標準Linux下的含義沒有區別,在RTLinux下含義不同,具體請看6.3節的講解。
arch_spinlock中使用了共用體,既可以把head tail當成一個變量來處理,又可以把它們分開當成兩個變量來處理。
下面我們來看一下票號自旋鎖的初始化。
靜態初始化如下:
linux-src/include/linux/spinlock_types.h
#define DEFINE_SPINLOCK(x) spinlock_t x = __SPIN_LOCK_UNLOCKED(x)
#define __SPIN_LOCK_UNLOCKED(lockname)
(spinlock_t ) __SPIN_LOCK_INITIALIZER(lockname)
#define __SPIN_LOCK_INITIALIZER(lockname)
{ { .rlock = __RAW_SPIN_LOCK_INITIALIZER(lockname) } }
#define __RAW_SPIN_LOCK_INITIALIZER(lockname)
{ .raw_lock = __ARCH_SPIN_LOCK_UNLOCKED }
linux-src/arch/x86/include/asm/spinlock_types.h
#define __ARCH_SPIN_LOCK_UNLOCKED { { 0 } }
動態初始化如下:
linux-src/include/linux/spinlock.h
#define spin_lock_init(_lock)
do {
raw_spin_lock_init(&(_lock)->rlock);
} while (0)
# define raw_spin_lock_init(lock)
do { *(lock) = __RAW_SPIN_LOCK_UNLOCKED(lock); } while (0)
linux-src/include/linux/spinlock_types.h
#define __RAW_SPIN_LOCK_UNLOCKED(lockname)
(raw_spinlock_t) __RAW_SPIN_LOCK_INITIALIZER(lockname)
#define __RAW_SPIN_LOCK_INITIALIZER(lockname)
{ .raw_lock = __ARCH_SPIN_LOCK_UNLOCKED }
靜態初始化和動態初始化都把票號自旋鎖初始化為{head:0,tail:0}狀態,head和tail相同表明鎖當前是未加鎖狀態。
3.2 加鎖操作
下面我們來看一下票號自旋鎖的加鎖操作:
linux-src/include/linux/spinlock.h
static inline void spin_lock(spinlock_t *lock)
{
raw_spin_lock(&lock->rlock);
}
#define raw_spin_lock(lock) _raw_spin_lock(lock)
linux-src/kernel/locking/spinlock.c
void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
{
__raw_spin_lock(lock);
}
linux-src/include/linux/spinlock_api_smp.h
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
preempt_disable();
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
inux-src/include/linux/lockdep.h
#define LOCK_CONTENDED(_lock, try, lock)
lock(_lock)
linux-src/include/linux/spinlock.h
static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{
arch_spin_lock(&lock->raw_lock);
}
linux-src/arch/x86/include/asm/spinlock.h
static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
register struct __raw_tickets inc = { .tail = TICKET_LOCK_INC };
inc = xadd(&lock->tickets, inc);
if (likely(inc.head == inc.tail))
goto out;
for (;;) {
unsigned count = SPIN_THRESHOLD;
do {
inc.head = READ_ONCE(lock->tickets.head);
if (__tickets_equal(inc.head, inc.tail))
goto clear_slowpath;
cpu_relax();
} while (--count);
__ticket_lock_spinning(lock, inc.tail);
}
clear_slowpath:
__ticket_check_and_clear_slowpath(lock, inc.head);
out:
barrier(); /* make sure nothing creeps before the lock is taken */
}
通過層層調用,spin_lock最終調用架構下的函數arch_spin_lock。arch_spin_lock函數和我們前面講的邏輯是一樣的,只不過是代碼實現需要稍微解釋一下。首先定義了一個局部變量inc,inc的初始值是tail為1,然后通過xadd函數把自旋鎖的tail加1,并返回原自旋鎖的值,xadd函數是原子操作,此時得到的inc的tail值就是我們的票號ticket。先判斷一下我們的ticket(inc.tail)是否和owner(inc.head)相等,如果相等代表我們加鎖成功了,goto out。如果不相等,就進入一個無限for循環,不停地讀取lock->tickets.head的值,和自己的ticket比較,如果不相等就一直比較,如果相等則代表我們加鎖成功了,退出循環。為了避免其它代碼有問題而產生死鎖,上述操作是在for循環里面又加了個do while循環,只循環一定的次數。
3.3 解鎖操作
下面我們看一下票號自旋鎖的解鎖操作:
linux-src/include/linux/spinlock.h
static inline void spin_unlock(spinlock_t *lock)
{
raw_spin_unlock(&lock->rlock);
}
#define raw_spin_unlock(lock) _raw_spin_unlock(lock)
linux-src/kernel/locking/spinlock.c
void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock)
{
__raw_spin_unlock(lock);
}
linux-src/include/linux/spinlock_api_smp.h
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
do_raw_spin_unlock(lock);
preempt_enable();
}
linux-src/include/linux/spinlock.h
static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
{
arch_spin_unlock(&lock->raw_lock);
}
linux-src/arch/x86/include/asm/spinlock.h
static __always_inline void arch_spin_unlock(arch_spinlock_t *lock)
{
__add(&lock->tickets.head, TICKET_LOCK_INC, UNLOCK_LOCK_PREFIX);
}
解鎖操作還是挺簡單的,最終只是把head也就是owner加1了而已。
四、MCS自旋鎖
上面的票號自旋鎖完美地解決了公平問題,邏輯簡單,代碼簡潔。但是還存在著一個嚴重的問題,就是當鎖競爭比較激烈的時候,大家都在自旋head變量,會導致緩存顛簸,嚴重降低了CPU的性能。為了解決這個問題,我們應當設計出一種鎖,把所有排隊等鎖的線程放到一個隊列上,每個線程都自旋自己的節點,這樣就不會有緩存顛簸問題了,而且還是公平鎖。MCS鎖就是這么設計的,鎖本身是個指針,指向排隊隊列的末尾,每個申請加鎖的線程都要自己創建一個鎖節點,然后把自己放到這個隊列的末尾并讓鎖指針指向自己,最后在自己的節點上自旋。當線程解鎖時,要看看自己的next指針是否為空,如果不為空說明有人在等鎖,要把next節點設置為加鎖狀態,這樣下一個線程就獲得了自旋鎖。下面我們畫個圖看一下:
圖片演示了MCS自旋鎖基本的加鎖解鎖操作,但是有一個細節情況沒有考慮,這點在代碼里會進行分析。
下面我們用內核版本4.1來講解。
4.1 定義與初始化
我們先來看一下MCS自旋鎖的定義:
linux-src/kernel/locking/mcs_spinlock.h
struct mcs_spinlock {
struct mcs_spinlock *next;
int locked; /* 1 if lock acquired */
};
這個定義非常簡單,沒有復雜的嵌套定義。要注意的是MCS鎖本身是 struct mcs_spinlock *,是個指針,而結構體struct mcs_spinlock本身并不是鎖,而是加鎖時的排隊節點,我們把它叫做鎖節點,這是MCS鎖與大部分鎖不同的地方,大部分鎖都只有一個鎖變量,不需要加鎖線程再去分配鎖節點,而MCS鎖需要加鎖線程去分配一個鎖節點。
MCS自旋鎖沒有特定的初始化,就是定義一個空指針而已。
struct mcs_spinlock * lock = NULL;
Lock為空指針代表鎖處于空閑狀態。
4.2 加鎖操作
下面我們來看一下MCS自旋鎖的加鎖操作:
linux-src/kernel/locking/mcs_spinlock.h
static inline
void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
struct mcs_spinlock *prev;
/* Init node */
node->locked = 0;
node->next = NULL;
prev = xchg(lock, node);
if (likely(prev == NULL)) {
return;
}
WRITE_ONCE(prev->next, node);
/* Wait until the lock holder passes the lock down. */
arch_mcs_spin_lock_contended(&node->locked);
}
原子地交換鎖變量的原值和本線程鎖節點的地址值并返回鎖變量的原值保存到prev變量。如果prev的值是空指針,代表鎖變量之前是空閑狀態,我們是第一個加鎖的,直接獲得了鎖,直接return。如果prev不為NULL,說明有人已經獲得了鎖,我們只能等待,讓prev的next指針指向自己,然后在自己的locked上自旋。
4.3 解鎖操作
下面我們看一下解鎖操作:
linux-src/kernel/locking/mcs_spinlock.h
static inline
void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
struct mcs_spinlock *next = READ_ONCE(node->next);
if (likely(!next)) {
if (likely(cmpxchg(lock, node, NULL) == node))
return;
/* Wait until the next pointer is set */
while (!(next = READ_ONCE(node->next)))
cpu_relax_lowlatency();
}
/* Pass lock to next waiter. */
arch_mcs_spin_unlock_contended(&next->locked);
}
先讀出自己的next指針,如果為空指針,說明我們是最后一個線程,可以直接返回了。但是在返回前要把鎖變量設為空指針,代表鎖現在是空閑狀態。但是這里并不是直接設置,而是使用原子交換CAS,只有當鎖變量指向自己的時候才把鎖變量置為空,這么做是為了避免和加鎖操作發生沖突。如果操作成功,代表釋放鎖成功,直接return。如果操作失敗,說明有線程在同時執行加鎖操作,它會把我們的next指針設置為指向它,然后在它的locked上自旋,所以我們要等我們的next被設置之后也就是不為空的時候,再把next->locked設置為1。如果一開始我們的next指針就不為空,那么直接把next->locked設置為1就行了。下一個線程發現自己的locked為1就會結束自旋,從而獲得了鎖。
五、隊列自旋鎖
MCS鎖有一個很大的問題就是它改變了自旋鎖的接口,這是一個很嚴重的問題,內核里使用自旋鎖的地方很多,如果把自旋鎖都改為MCS自旋鎖,那將是非常麻煩的。同時MCS還有一個問題就是它的體積增大了,這也是一個很嚴重的問題。為了解決MCS自旋鎖的問題,內核又開發了隊列自旋鎖。它結合了MCS鎖的優點,但是又做了很大的改進,同時又優化了鎖競爭比較少的場景。隊列自旋鎖對MCS自旋鎖的優化原理是,一個系統最多同時有NR_CPU個自旋鎖在運行,所以沒必要每個加鎖線程都自己分配一個鎖節點,我們在系統全局預分配NR_CPU個鎖節點就可以了,哪個CPU上要執行自旋鎖,就去用對應的鎖節點就可以了。這是對于只有線程的情況,實際上還有軟中斷、硬中斷、NMI,它們后者都可以搶占前者,都能搶占線程,所以整個系統實際上總共需要NR_CPU * 4 個鎖節點就足夠了。隊列自旋鎖還對只有兩個線程去搶鎖的情況作了優化,這種情況下不會使用MCS的排隊邏輯。
下面我們用一個比喻來說一下隊列自旋鎖的總體邏輯。我們把鎖的位置比作皇位,搶到皇位就是加鎖成功就可以進入臨界區了。第一個來搶鎖的人就是直接搶鎖成功,搶到皇位。第二個來搶鎖的人發現皇位已經被搶了,退而求其次,搶占太子位,然后一直自旋皇位,一旦皇帝駕崩讓出皇位,自己就立馬搶占皇位。第三個來搶鎖的人發現皇位和太子位都被搶了,沒有辦法只能去搶太孫的位置,然后同時自旋太子位和皇位。當皇位空缺的時候,太子會替補到皇位,此時太子位空缺,但是太孫并不會去搶占太子位,他還待在太孫位上,直到太子位和皇位同時空缺了,他才會一步到位,直接從太孫位上登基為皇帝。第四個人來了發現皇位、太子位、太孫位都被搶了,就只能占個皇孫位了,從第四個人開始包括后面來的每個人都是皇孫,所有皇孫依次排好隊等待進位成太孫。太孫其實也算是皇孫,太孫是第一皇孫,其它的都是普通皇孫。皇孫也在自旋,只不過皇孫是在自己家門口自旋,他一直在等待上一任太孫到自己家門口叫自己。太孫發現皇位和太子位同時空缺了之后就會去繼承皇帝位,同時去通知第二皇孫,太孫位空出來了,你可以來當太孫了。然后第二皇孫就變成太孫了,變成太孫之后他也是去同時自旋太子位和皇位。當他也登基稱帝之后他也會去通知后面的第二皇孫來進位太孫位。然后就一直繼續這樣的邏輯,后面來的每個人只要發現有太孫存在就只能去占個皇孫位來排隊,然后在自己家門口自旋。在這個過程中太子位是一直空缺的。除非最后一個太孫登基稱帝之后發現沒有皇孫了,此時就沒有人進位成太孫了,如果此時再來了人搶位子,而皇位還被占著,他才會去搶太子位。
前面說的邏輯比較復雜,我們再來總結一下,當只有兩個人搶鎖時,一個占據皇帝位也就是搶鎖成功,一個人占據太子位,同時自旋皇位。也就是說同時搶鎖的人小于等于兩人時不會使用排隊機制。第三人來搶鎖的話就會啟動排隊機制,他排在隊列的第一位,是第一皇孫,也叫太孫,之后來的人都是普通皇孫,要依次排隊。皇孫都是在自己家門口自旋自己,等待前太孫來通知自己進位為太孫。太孫的邏輯是最為復雜的,他要同時自旋太子位和皇位,只有當太子位和皇位都空缺時,太孫才會一步到位登基稱帝,然后通知第二皇孫進位為太孫。解鎖的邏輯很簡單,只需要把皇位設為0就可以了,什么都不用管,因為太子、太孫他們自己會自旋皇位。
隊列自旋鎖的實現是把原先的鎖變量int分為三部分,一個locked字節,對應我們所說的皇位,一個pending字節,對應我們所說的太子位,一個tail雙字節,它指向皇孫隊列的末尾,皇孫隊列的隊首是太孫。tail不是個指針,而是邏輯指針,它是通過編碼的方式指向隊尾皇孫的。每個皇孫都對應一個鎖節點,系統預先分配了NR_CPU * 4個鎖節點,NR_CPU代表1個CPU 1個,為什么乘以4呢,因為1個CPU上最多可以同時嵌套4個執行流,分別是線程、軟中斷、硬中斷、非屏蔽中斷。tail有16位,分兩部分編碼,其中2位用來編碼是哪個執行流,14位用來編碼CPU index。CPU編碼時要加1,因為CPU index從0開始,而tail等于0有特殊含義,代表的是空指針,也就是沒有皇孫來競爭,所以要加上1做區分。當一個線程(執行流)來爭鎖時,如果太子位被搶了或者已經有太孫了,自己就需要加入皇孫隊列,加入皇孫隊列的方法就是根據自己所在的CPU index 和自己的執行流等級去拿一個鎖節點,把這個鎖節點加入到隊列中去,并自旋這個鎖節點。
下面我們畫圖來看一下。
現在大家應該對隊列自旋鎖的邏輯很熟悉了,下面我們以內核版本5.15.28為例,來看一下隊列自旋的代碼實現。
5.1 定義與初始化
我們先來看一下隊列自旋鎖的定義:
linux-src/include/linux/spinlock_types.h
typedef struct spinlock {
struct raw_spinlock rlock;
} spinlock_t;
linux-src/include/linux/spinlock_types_raw.h
typedef struct raw_spinlock {
arch_spinlock_t raw_lock;
} raw_spinlock_t;
linux-src/include/asm-generic/qspinlock_types.h
typedef struct qspinlock {
union {
atomic_t val;
struct {
u8locked;
u8pending;
};
struct {
u16locked_pending;
u16tail;
};
};
} arch_spinlock_t;
可以看出隊列自旋鎖的定義最終與原始自旋鎖和票號自旋鎖的大小是一樣的。隊列自旋鎖也使用了共用體的技巧,把一個4字節的int拆分成了1個字節的locked,一個字節的pending,兩個字節的tail。
下面我們來看一下初始化,先看靜態初始化:
linux-src/include/linux/spinlock_types.h
#define DEFINE_SPINLOCK(x)spinlock_t x = __SPIN_LOCK_UNLOCKED(x)
#define __SPIN_LOCK_UNLOCKED(lockname)
(spinlock_t) __SPIN_LOCK_INITIALIZER(lockname)
#define __SPIN_LOCK_INITIALIZER(lockname)
{ { .rlock = ___SPIN_LOCK_INITIALIZER(lockname) } }
#define ___SPIN_LOCK_INITIALIZER(lockname)
{
.raw_lock = __ARCH_SPIN_LOCK_UNLOCKED,
}
linux-src/include/asm-generic/qspinlock_types.h
#define__ARCH_SPIN_LOCK_UNLOCKED{ { .val = ATOMIC_INIT(0) } }
再看動態初始化
linux-src/include/linux/spinlock.h
# define spin_lock_init(_lock)
do {
*(_lock) = __SPIN_LOCK_UNLOCKED(_lock);
} while (0)
無論靜態初始化還是動態初始化都是把鎖變量的整個int初始化為0。
5.2 加鎖操作
我們先來看一下鎖節點的定義和相關操作:
linux-src/kernel/locking/qspinlock.c
struct qnode {
struct mcs_spinlock mcs;
};
static DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[MAX_NODES]);
#define MAX_NODES4
可以看到鎖節點用的是MCS自旋鎖的鎖節點類型,然后定義了一個per CPU變量,每個CPU上有4個節點,代表4層執行流,線程、軟中斷、硬中斷、屏蔽中斷。和MCS自旋鎖不同的是,MCS自旋鎖需要每個線程在申請鎖時自己提供鎖節點,而隊列自旋鎖是提前定義好的全局靜態變量,每個執行流在申請鎖時根據自己所在的CPU index 和執行流層級去使用對應的鎖節點,加鎖成功后鎖節點就默認放回了。使用鎖節點時執行個查詢操作就可以了,放回鎖節點什么也不用做,這是自旋鎖的特點所決定的。因為自旋鎖是不能休眠的,所以自旋鎖的臨界區是一口氣執行完,不會切走讓其它線程也來申請自旋鎖,一個CPU上最左嵌套4層執行流,所以整個系統最多能同時有NR_CPU * 4個自旋鎖申請。所以系統預定義NR_CPU * 4個鎖節點就足夠了,用的時候就直接用,用完啥也不用管。
下面我們來看一下鎖節點的編碼與查找:
linux-src/kernel/locking/qspinlock.c
/*
* We must be able to distinguish between no-tail and the tail at 0:0,
* therefore increment the cpu number by one.
*/
static inline __pure u32 encode_tail(int cpu, int idx)
{
u32tail;
tail = (cpu + 1) << _Q_TAIL_CPU_OFFSET;
tail |= idx << _Q_TAIL_IDX_OFFSET; /* assume < 4 */
return tail;
}
static inline __pure struct mcs_spinlock *decode_tail(u32 tail)
{
int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1;
int idx = (tail & _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;
return per_cpu_ptr(&qnodes[idx].mcs, cpu);
}
static inline __pure
struct mcs_spinlock *grab_mcs_node(struct mcs_spinlock *base, int idx)
{
return &((struct qnode *)base + idx)->mcs;
}
可以看到知道了CPU index和執行流層級就可以編碼出tail,知道了tail就可以解碼出CPU index和執行流層級,就可以去全局變量qnodes中找到對應的鎖節點。如果已經知道了CPU index對應的鎖節點base,再根據執行流層級也可以找到對應的鎖節點。
下面我們來看一下隊列自旋鎖的加鎖操作:
linux-src/include/linux/spinlock.h
static __always_inline void spin_lock(spinlock_t *lock)
{
raw_spin_lock(&lock->rlock);
}
#define raw_spin_lock(lock)_raw_spin_lock(lock)
linux-src/kernel/locking/spinlock.c
void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
{
__raw_spin_lock(lock);
}
linux-src/include/linux/spinlock_api_smp.h
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
preempt_disable();
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
linux-src/include/linux/lockdep.h
#define LOCK_CONTENDED(_lock, try, lock)
lock(_lock)
linux-src/include/linux/spinlock.h
static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{
arch_spin_lock(&lock->raw_lock);
}
linux-src/include/asm-generic/qspinlock.h
#define arch_spin_lock(l)queued_spin_lock(l)
static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
int val = 0;
if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))
return;
queued_spin_lock_slowpath(lock, val);
}
linux-src/include/asm-generic/qspinlock_types.h
#define _Q_LOCKED_VAL(1U << _Q_LOCKED_OFFSET)
#define _Q_LOCKED_OFFSET0
linux-src/kernel/locking/qspinlock.c
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
struct mcs_spinlock *prev, *next, *node;
u32 old, tail;
int idx;
BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
if (pv_enabled())
goto pv_queue;
if (virt_spin_lock(lock))
return;
/*
* Wait for in-progress pending->locked hand-overs with a bounded
* number of spins so that we guarantee forward progress.
*
* 0,1,0 -> 0,0,1
*/
if (val == _Q_PENDING_VAL) {
int cnt = _Q_PENDING_LOOPS;
val = atomic_cond_read_relaxed(&lock->val,
(VAL != _Q_PENDING_VAL) || !cnt--);
}
/*
* If we observe any contention; queue.
*/
if (val & ~_Q_LOCKED_MASK)
goto queue;
/*
* trylock || pending
*
* 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
*/
val = queued_fetch_set_pending_acquire(lock);
/*
* If we observe contention, there is a concurrent locker.
*
* Undo and queue; our setting of PENDING might have made the
* n,0,0 -> 0,0,0 transition fail and it will now be waiting
* on @next to become !NULL.
*/
if (unlikely(val & ~_Q_LOCKED_MASK)) {
/* Undo PENDING if we set it. */
if (!(val & _Q_PENDING_MASK))
clear_pending(lock);
goto queue;
}
/*
* We're pending, wait for the owner to go away.
*
* 0,1,1 -> 0,1,0
*
* this wait loop must be a load-acquire such that we match the
* store-release that clears the locked bit and create lock
* sequentiality; this is because not all
* clear_pending_set_locked() implementations imply full
* barriers.
*/
if (val & _Q_LOCKED_MASK)
atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));
/*
* take ownership and clear the pending bit.
*
* 0,1,0 -> 0,0,1
*/
clear_pending_set_locked(lock);
lockevent_inc(lock_pending);
return;
/*
* End of pending bit optimistic spinning and beginning of MCS
* queuing.
*/
queue:
lockevent_inc(lock_slowpath);
pv_queue:
node = this_cpu_ptr(&qnodes[0].mcs);
idx = node->count++;
tail = encode_tail(smp_processor_id(), idx);
/*
* 4 nodes are allocated based on the assumption that there will
* not be nested NMIs taking spinlocks. That may not be true in
* some architectures even though the chance of needing more than
* 4 nodes will still be extremely unlikely. When that happens,
* we fall back to spinning on the lock directly without using
* any MCS node. This is not the most elegant solution, but is
* simple enough.
*/
if (unlikely(idx >= MAX_NODES)) {
lockevent_inc(lock_no_node);
while (!queued_spin_trylock(lock))
cpu_relax();
goto release;
}
node = grab_mcs_node(node, idx);
/*
* Keep counts of non-zero index values:
*/
lockevent_cond_inc(lock_use_node2 + idx - 1, idx);
/*
* Ensure that we increment the head node->count before initialising
* the actual node. If the compiler is kind enough to reorder these
* stores, then an IRQ could overwrite our assignments.
*/
barrier();
node->locked = 0;
node->next = NULL;
pv_init_node(node);
/*
* We touched a (possibly) cold cacheline in the per-cpu queue node;
* attempt the trylock once more in the hope someone let go while we
* weren't watching.
*/
if (queued_spin_trylock(lock))
goto release;
/*
* Ensure that the initialisation of @node is complete before we
* publish the updated tail via xchg_tail() and potentially link
* @node into the waitqueue via WRITE_ONCE(prev->next, node) below.
*/
smp_wmb();
/*
* Publish the updated tail.
* We have already touched the queueing cacheline; don't bother with
* pending stuff.
*
* p,*,* -> n,*,*
*/
old = xchg_tail(lock, tail);
next = NULL;
/*
* if there was a previous node; link it and wait until reaching the
* head of the waitqueue.
*/
if (old & _Q_TAIL_MASK) {
prev = decode_tail(old);
/* Link @node into the waitqueue. */
WRITE_ONCE(prev->next, node);
pv_wait_node(node, prev);
arch_mcs_spin_lock_contended(&node->locked);
/*
* While waiting for the MCS lock, the next pointer may have
* been set by another lock waiter. We optimistically load
* the next pointer & prefetch the cacheline for writing
* to reduce latency in the upcoming MCS unlock operation.
*/
next = READ_ONCE(node->next);
if (next)
prefetchw(next);
}
/*
* we're at the head of the waitqueue, wait for the owner & pending to
* go away.
*
* *,x,y -> *,0,0
*
* this wait loop must use a load-acquire such that we match the
* store-release that clears the locked bit and create lock
* sequentiality; this is because the set_locked() function below
* does not imply a full barrier.
*
* The PV pv_wait_head_or_lock function, if active, will acquire
* the lock and return a non-zero value. So we have to skip the
* atomic_cond_read_acquire() call. As the next PV queue head hasn't
* been designated yet, there is no way for the locked value to become
* _Q_SLOW_VAL. So both the set_locked() and the
* atomic_cmpxchg_relaxed() calls will be safe.
*
* If PV isn't active, 0 will be returned instead.
*
*/
if ((val = pv_wait_head_or_lock(lock, node)))
goto locked;
val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));
locked:
/*
* claim the lock:
*
* n,0,0 -> 0,0,1 : lock, uncontended
* *,*,0 -> *,*,1 : lock, contended
*
* If the queue head is the only one in the queue (lock value == tail)
* and nobody is pending, clear the tail code and grab the lock.
* Otherwise, we only need to grab the lock.
*/
/*
* In the PV case we might already have _Q_LOCKED_VAL set, because
* of lock stealing; therefore we must also allow:
*
* n,0,1 -> 0,0,1
*
* Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the
* above wait condition, therefore any concurrent setting of
* PENDING will make the uncontended transition fail.
*/
if ((val & _Q_TAIL_MASK) == tail) {
if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
goto release; /* No contention */
}
/*
* Either somebody is queued behind us or _Q_PENDING_VAL got set
* which will then detect the remaining tail and queue behind us
* ensuring we'll see a @next.
*/
set_locked(lock);
/*
* contended path; wait for next if not observed yet, release.
*/
if (!next)
next = smp_cond_load_relaxed(&node->next, (VAL));
arch_mcs_spin_unlock_contended(&next->locked);
pv_kick_node(lock, next);
release:
/*
* release the node
*/
__this_cpu_dec(qnodes[0].mcs.count);
}
加鎖的時候要首先看一下是不是鎖變量的整個int都是0,如果是的話,說明皇位、太子位、太孫位都是空的,鎖現在是空閑的,沒有任何人競爭,我們直接把鎖變量設為1(用的是原子操作),代表我們搶鎖成功,直接返回。如果整個鎖變量不為0,說明存在鎖競爭,我們要走慢速路徑。
在慢速路徑里,首先處理的是如果遇到太子正在登基,則自旋等待太子登基成功。然后查看太子位是否被占,如果被占,則goto queue,也就是進入皇孫排隊流程(這個后面再講)。如果太子位沒被占,則嘗試占領太子位。如果搶占太子失敗,說明有其它線程也在搶太子位,我們搶失敗了,則我們則goto queue,也就是進入皇孫排隊流程(這個后面再講)。如果搶占太子位成功,則自旋皇帝位,一直自旋到皇帝駕崩把鎖置為0,則我們結束自旋,原子地占領皇位釋放太子位,然后return。
接下來是皇孫排隊邏輯,每一個新來的皇孫都要排到隊尾。隊尾是用鎖變量中的tail來記錄的。我們要先生成自己的隊尾編碼tail,找到自己對應的鎖節點。此時再嘗試一下加鎖操作,因為有可能現在太子太孫皇位都是空的,如果嘗試成功就結束流程,如果失敗則繼續往下走。然后原子地交換鎖變量的tail和自己的tail,這樣我們就成為新的隊尾了。然后我們再看old tail,分兩種情況,如果old tail是空,則說明我們是第一個皇孫,也就是太孫,走太孫邏輯,如果old tail不空,則說明我們是普通皇孫,走皇孫排隊邏輯。我們先說皇孫排隊邏輯。皇孫排隊先解碼old tail,找到其對應的鎖節點prev,然后讓prev的next指向自己,這樣我們就加入了排隊隊列。然后我們就在自己家里自旋,也就是自旋自己的node->locked。我們的自旋是在等待prev先成為太孫,然后當他登基稱帝之后,他就會來解除我們的自旋,然后我們就成為了太孫。
下面我們講太孫的邏輯,太孫的來源有兩種,一種是上面說的old tail為空,則我們直接就是太孫,是第一位太孫。第二種來源是普通皇孫進位為太孫。不管哪種來源的太孫,操作都是一樣的。太孫首先自旋太子位和皇位,當太子位和皇位同時空缺的時候才會結束自旋。結束自旋之后,先看看自己是不是唯一的皇孫,如果是的話則原子地加鎖。如果加鎖成功則結束流程,如果加鎖失敗則說明剛才發生了沖突,又有了新的皇孫加入。如果自己不是唯一的皇孫或者又有新的皇孫加入,則自己先搶占皇位,然后通知next皇孫結束自旋,next皇孫就會成為新的太孫,繼續執行太孫的流程。
5.3 解鎖操作
下面我們看一下隊列自旋鎖的解鎖操作:
linux-src/include/linux/spinlock.h
static __always_inline void spin_unlock(spinlock_t *lock)
{
raw_spin_unlock(&lock->rlock);
}
#defineraw_spin_unlock(lock)_raw_spin_unlock(lock)
linux-src/kernel/locking/spinlock.c
void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock)
{
__raw_spin_unlock(lock);
}
linux-src/include/linux/spinlock_api_smp.h
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
do_raw_spin_unlock(lock);
preempt_enable();
}
linux-src/include/linux/spinlock.h
static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
{
arch_spin_unlock(&lock->raw_lock);
}
linux-src/include/asm-generic/qspinlock.h
#define arch_spin_unlock(l)queued_spin_unlock(l)
linux-src/include/asm-generic/qspinlock.h
static __always_inline void queued_spin_unlock(struct qspinlock *lock)
{
smp_store_release(&lock->locked, 0);
}
可以看到隊列自旋鎖的解鎖確實很簡單,只需要讓出皇位也就是把locked字節設為0就可以了。
六、自旋鎖的使用
前面幾節我們講了自旋鎖的發展歷史,以及每一代自旋鎖的實現原理。現在我們來講一講自旋鎖的使用問題,包括自旋鎖的適用場景、自旋鎖與禁用偽并發的配合使用問題,還有spinlock_t、raw_spin_lock該如何選擇的問題。
6.1 自旋鎖的適用場景
內核里有很多同步措施,我們什么時候應該使用自旋鎖呢,使用自旋鎖應該注意什么呢?首先自旋鎖適用那些臨界區比較小的地方,具體要多小呢,并沒有絕對的標準,我記的有的書上說要小于1000個指令或者100行代碼。其次臨界區內不能休眠,也就是不能有阻塞操作,如果臨界區內某些函數調用可能會阻塞,那就不能使用自旋鎖。使用自旋鎖要注意的點也是臨界區不能調用阻塞函數。但是很多時候并不太好判斷,有些函數明顯就是阻塞函數,肯定不能調用。但是有些函數自己不是阻塞的,而它層層調用的函數中有阻塞的,這就不太好發現了。
線程是可調度的,所以線程可以用互斥鎖、信號量,也能用自旋鎖。但是中斷(包括硬中斷和軟中斷)是不可調度的,也就是說,是不能休眠的,所以只能使用自旋鎖。
6.2 自旋鎖與禁用偽并發的配合使用
內核里有四種不同類型的執行流,線程、軟中斷、硬中斷、NMI中斷,前者不能搶占后者,但是后者能搶占前者。自旋鎖能防止兩個CPU同時進入臨界區,但是并不能防止本CPU的臨界區被高級的執行流所搶占。所以當兩個關聯臨界區在不同類型的執行流的時候,只使用自旋鎖是不夠的,低級執行流還得臨時禁止高級執行流的搶占才行。由于NMI中斷是不可禁止的,而且NMI中斷發生的概率非常低,一般我們的代碼也不會與NMI中斷發生關聯,所以NMI中斷就不考慮了。現在只剩下線程、軟中斷、硬中斷三種情況了。組合下來有6種情況,我們依依說明。線程對線程,自旋鎖內部已經禁用了線程搶占,所以兩個線程之間的臨界區直接使用自旋鎖就可以了。線程對軟中斷,線程會被軟中斷搶占,所以線程中要自旋鎖加禁用軟中斷,而軟中斷不會被線程搶占,所以軟中斷中只使用自旋鎖就可以了。線程對硬中斷,線程會被硬中斷搶占,所以線程中要自旋鎖加禁用硬中斷,而硬中斷不會被線程搶占,所以硬中斷中只使用自旋鎖就可以了。軟中斷對軟中斷,軟中斷中發生硬中斷,硬中斷返回時發現正在軟中斷中,不會再去執行軟中斷,只會排隊軟中斷,所以軟中斷對軟中斷只使用自旋鎖就可以了。軟中斷對硬中斷,由于硬中斷會搶占軟中斷,所以軟中斷中要禁用硬中斷,硬中斷中直接使用自旋鎖就可以了。硬中斷對硬中斷,現在內核里已經禁止中斷嵌套了,所以只使用自旋鎖就可以了。
我們下面來看一下它們的接口與實現。
自旋鎖并禁用軟中斷,軟中斷在這里就是下半部。
void spin_lock_bh(spinlock_t *lock)
void spin_unlock_bh(spinlock_t *lock)
實現如下,只分析加鎖部分,解鎖部分就不再分析了。
linux-src/include/linux/spinlock.h
static __always_inline void spin_lock_bh(spinlock_t *lock)
{
raw_spin_lock_bh(&lock->rlock);
}
inux-src/kernel/locking/spinlock.c
void __lockfunc _raw_spin_lock_bh(raw_spinlock_t *lock)
{
__raw_spin_lock_bh(lock);
}
linux-src/include/linux/spinlock_api_smp.h
static inline void __raw_spin_lock_bh(raw_spinlock_t *lock)
{
__local_bh_disable_ip(_RET_IP_, SOFTIRQ_LOCK_OFFSET);
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
可以看到自旋鎖的實現部分是一樣的,只是加了一個禁用軟中斷的調用,禁用軟中斷本身也會禁用線程搶占,所以這里沒有再去禁用搶占。
自旋鎖并禁用硬中斷,禁用軟中斷本身是帶計數功能的,可以嵌套調用,但是禁用硬中斷本身是沒有計數的,不能嵌套調用,所以內核提供了兩個版本,irq版lock會禁用中斷,unlock會開啟中斷,irqsave版lock會禁用中斷并保存現在的中斷狀態,unlock會恢復之前保存的中斷狀態。
void spin_lock_irq(spinlock_t *lock)
void spin_unlock_irq(spinlock_t *lock)
void spin_lock_irqsave(lock, flags)
void spin_unlock_irqsave(lock, flags)
實現如下,只分析加鎖部分,解鎖部分就不再分析了。
spin_lock_irq
linux-src/include/linux/spinlock.h
static __always_inline void spin_lock_irq(spinlock_t *lock)
{
raw_spin_lock_irq(&lock->rlock);
}
#define raw_spin_lock_irq(lock)_raw_spin_lock_irq(lock)
linux-src/kernel/locking/spinlock.c
void __lockfunc _raw_spin_lock_irq(raw_spinlock_t *lock)
{
__raw_spin_lock_irq(lock);
}
linux-src/include/linux/spinlock_api_smp.h
static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{
local_irq_disable();
preempt_disable();
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
spin_lock_irqsave
linux-src/include/linux/spinlock.h
do {
raw_spin_lock_irqsave(spinlock_check(lock), flags);
} while (0)
do {
typecheck(unsigned long, flags);
flags = _raw_spin_lock_irqsave(lock);
} while (0)
linux-src/kernel/locking/spinlock.c
unsigned long __lockfunc _raw_spin_lock_irqsave(raw_spinlock_t *lock)
{
return __raw_spin_lock_irqsave(lock);
}
linux-src/include/linux/spinlock_api_smp.h
static inline unsigned long __raw_spin_lock_irqsave(raw_spinlock_t *lock)
{
unsigned long flags;
local_irq_save(flags);
preempt_disable();
do_raw_spin_lock_flags(lock, &flags);
return flags;
}
linux-src/include/linux/spinlock.h
static inline void
do_raw_spin_lock_flags(raw_spinlock_t *lock, unsigned long *flags) __acquires(lock)
{
arch_spin_lock_flags(&lock->raw_lock, *flags);
}
可以看到自旋鎖的實現部分是一樣的,只是加了一個禁用硬中斷和禁止搶占的調用。
6.3 raw_spin_lock的使用問題
可能很多人在看到內核代碼時會感到有些奇怪,為啥有些地方用的是spinlock_t,有些地方用的卻是raw_spinlock_t?raw_spinlock_t不是spinlock_t的實現細節嗎,我們不是應該只使用接口性的東西,而不要使用實現性的東西嗎?再仔細看spinlock_t和raw_spinlock_t的實質邏輯,好像也沒啥區別啊?要回答這個問題,我們就要先從一件事情談起,RTLinux。什么是RTLinux,什么是實時性?實時性是指一個系統對外部事件響應的及時性。很多嵌入式系統的OS都是實時OS,它們可以快速地對外部事件進行響應。這倒不是因為它們有多厲害,而是因為嵌入式系統都比較簡單,它們面臨的環境比較簡單,要做的事情也比較簡單,所以能做到及時性。而Linux是一個通用操作系統內核,通用這個詞就代表Linux要面臨很多情況,處理很多問題,所以就很難做到及時性。做到及時性最根本的一點就是要及時處理中斷,因為中斷代表的就是外部事件。但是在Linux內核里,有很多需要同步的地方都會禁用中斷,這就導致中斷不能及時響應。Linux在處理中斷的時候也會禁用中斷,Linux在這方面已經想了很多辦法來解決,比如盡可能地縮小中斷處理程序,把事情盡量都放到軟中斷或者線程里面去做。當很多中斷處理的事情都被放到線程中去執行了,我們又面臨著另外一個問題,如何盡快地讓這些線程去搶占CPU立馬獲得執行。當一個非常不緊急的線程正好執行到自旋鎖的臨界區時,我們的非常著急的中斷處理線程想獲得CPU卻沒有辦法,因為自旋鎖的臨界區不能休眠也就是說不可搶占,我們只能干等著。因此把自旋鎖變得可休眠就成為了提高Linux的實時性的重要方法。為此Ingo Molnar等人開發了一個項目RTLinux,專門來提高Linux的實時性。其中一個很重要的方法就是把自旋鎖替換為可休眠鎖。但是有些臨界區是確實不能休眠的,那怎么辦呢?這些臨界區就用raw_spinlock_t,raw_spinlock_t還保持原來的自旋語義,不會休眠。到目前為止(內核版本5.15.28),RTLinux還沒有合入標準內核,所以目前的標準內核里raw_spinlock_t和spinlock_t效果是一樣的。但是大家在內核編程的時候還是要盡量使用spinlock_t,除非你的臨界區真的不能休眠,才去使用raw_spinlock_t。
審核編輯:湯梓紅
-
內核
+關注
關注
3文章
1372瀏覽量
40292 -
Linux
+關注
關注
87文章
11304瀏覽量
209524 -
自旋鎖
+關注
關注
0文章
11瀏覽量
1584
原文標題:深入理解Linux自旋鎖
文章出處:【微信號:LinuxDev,微信公眾號:Linux閱碼場】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論