Rust是一種系統(tǒng)級編程語言,它帶有嚴(yán)格的內(nèi)存管理、并發(fā)和安全性規(guī)則,因此很受廣大程序員的青睞。RwLock(讀寫鎖)是 Rust 中常用的線程同步機(jī)制之一,本文將詳細(xì)介紹 Rust 語言中的 RwLock 的內(nèi)部實現(xiàn)原理、常用接口的使用技巧和最佳實踐。
RwLock 的內(nèi)部實現(xiàn)原理
基本概念
RwLock 是一種讀寫分離的鎖,允許多個線程同時讀取共享數(shù)據(jù),但只允許一個線程寫入數(shù)據(jù)。通過這種方式,可以避免讀寫操作之間的競爭,從而提高并發(fā)性能。
在 Rust 中,RwLock 的實現(xiàn)基于 std::sync::RwLock 結(jié)構(gòu)體。其中,T 表示被保護(hù)的數(shù)據(jù)類型,需要滿足 Send 特質(zhì)以便可以在線程之間傳遞,并且需要滿足 Sync 特質(zhì)以便可以在線程之間共享。
RwLock 是在 std::sync::RwLock 結(jié)構(gòu)體上實現(xiàn)的,為了方便說明,下文中假設(shè) T 為 u32 類型。
RwLock 的基本結(jié)構(gòu)
RwLock 的基本結(jié)構(gòu)如下:
use std::sync::RwLock;
let lock = RwLock::new(0u32);
該代碼將創(chuàng)建一個 RwLock 對象,其中 T 類型為 u32,初始化值為 0,即該鎖保護(hù)的是一個名為 data 的 u32 類型變量。
RwLock 的鎖定機(jī)制
我們可以通過鎖定 RwLock 來對數(shù)據(jù)進(jìn)行保護(hù)。RwLock 提供了四個方法來完成鎖定操作:
read()
方法:獲取讀鎖,并返回一個 RAII(資源獲取即初始化)的讀取守衛(wèi)。多個線程可以同時獲取讀鎖,但是不能同時持有寫鎖。
try_read()
方法:非阻塞地獲取讀鎖。如果讀鎖已經(jīng)被占用,則返回 None。
write()
方法:獲取寫鎖,并返回一個 RAII 的寫入守衛(wèi)。如果有任何線程正在持有讀鎖或?qū)戞i,則阻塞等待直到它們釋放鎖。
try_write()
方法:非阻塞地獲取寫鎖。如果寫鎖已經(jīng)被占用,則返回None
。
對于讀寫鎖,我們需要保證寫操作在讀操作之前,因此,在調(diào)用 write 方法時,會等待所有的讀取守衛(wèi)被釋放,并阻止新的讀取守衛(wèi)的創(chuàng)建。為了避免死鎖和優(yōu)先級反轉(zhuǎn),寫入守衛(wèi)還可以降低優(yōu)先級。
讀寫鎖的實現(xiàn)主要是通過兩個 Mutex 來實現(xiàn)的。一個 Mutex 用于保護(hù)讀取計數(shù)器,另一個 Mutex 用于保護(hù)寫入狀態(tài)。讀取計數(shù)器統(tǒng)計當(dāng)前存在多少個讀取鎖,每當(dāng)一個新的讀取鎖被請求時,讀取計數(shù)器就會自增。當(dāng)讀取計數(shù)器為 0 時,寫入鎖可以被請求。
RwLock 的 Poisoning
類似于 Mutex,RwLock 也支持 poisoning 機(jī)制。如果 RwLock 發(fā)生 panic,那么鎖就成了 poison 狀態(tài),也就是無法再被使用。任何試圖獲取這個鎖的線程都會 panic,而不是被阻塞。
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let lock = Arc::new(RwLock::new(0u32));
let readers = (0..6)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let guard = lock.read().unwrap();
println!("read: {}", *guard);
})
})
.collect::< Vec< _ >>();
let writers = (0..2)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let mut guard = lock.write().unwrap();
*guard += 1;
println!("write: {}", *guard);
})
})
.collect::< Vec< _ >>();
for reader in readers {
reader.join().unwrap();
}
for writer in writers {
writer.join().unwrap();
}
}
運(yùn)行后,可能會出現(xiàn)以下異常信息:
thread 'main' panicked at 'PoisonError { inner: ...
這里的 inner
表示調(diào)用 RwLock 的線程 panic 時產(chǎn)生的錯誤信息。
常用接口的使用技巧
read()
方法
read()
方法用于獲取讀鎖,并返回一個 RAII 的讀取守衛(wèi):
let lock = RwLock::new(0u32);
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
在上面的例子中,r1 和 r2 都是 RwLockWriteGuard 類型的對象,它們引用的數(shù)據(jù)類型是 u32。這意味著它們只允許讀取 u32 類型的數(shù)據(jù),并且無法改變它們的值。
讀取守衛(wèi)被析構(gòu)時,RwLock 的讀取計數(shù)器會減少,如果讀取計數(shù)器變?yōu)?0,則寫入鎖可以被請求。
write()
方法
write()
方法用于獲取寫鎖,并返回一個 RAII 的寫入守衛(wèi):
let lock = RwLock::new(0u32);
let mut w1 = lock.write().unwrap();
let mut w2 = lock.write().unwrap();
在上面的例子中,w1 和 w2 都是 RwLockWriteGuard 類型的對象,它們引用的數(shù)據(jù)類型是 u32。這意味著它們允許讀寫 u32 類型的數(shù)據(jù),并且可以改變它們的值。
寫入守衛(wèi)被析構(gòu)時,寫入鎖立即被釋放,并且所有等待讀取鎖和寫入鎖的線程都可以開始運(yùn)行。
try_read()
方法
try_read()
方法用于非阻塞地獲取讀鎖。如果讀鎖已經(jīng)被占用,則返回 None。
let lock = RwLock::new(0u32);
if let Some(r) = lock.try_read() {
println!("read: {}", *r);
} else {
println!("read lock is already taken");
}
try_write()
方法
try_write()
方法用于非阻塞地獲取寫鎖。如果寫鎖已經(jīng)被占用,則返回 None。
let lock = RwLock::new(0u32);
if let Some(mut w) = lock.try_write() {
*w += 1;
println!("write: {}", *w);
} else {
println!("write lock is already taken");
}
共享所有權(quán)
如果你想在多個線程之間共享一個 RwLock 對象,就需要使用 Arc(atomic reference counting,原子引用計數(shù))來包裝它:
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let lock = Arc::new(RwLock::new(0u32));
let readers = (0..6)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let guard = lock.read().unwrap();
println!("read: {}", *guard);
})
})
.collect::< Vec< _ >>();
let writers = (0..2)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let mut guard = lock.write().unwrap();
*guard += 1;
println!("write: {}", *guard);
})
})
.collect::< Vec< _ >>();
for reader in readers {
reader.join().unwrap();
}
for writer in writers {
writer.join().unwrap();
}
}
// 輸出結(jié)果:
// read: 0
// read: 0
// read: 0
// read: 0
// read: 0
// read: 0
// write: 1
// write: 2
實現(xiàn)鎖超時功能
Rust標(biāo)準(zhǔn)庫中的RwLock目前是不支持讀/寫超時功能的。我們可以利用RwLock中非阻塞方法try_read和try_write實現(xiàn)超時的特征。
下面進(jìn)一步講解使用std::sync::RwLock和std::time::Duration來實現(xiàn)讀超時,具體步驟如下:
- 創(chuàng)建一個名為TimeoutRwLock的trait,其中包含read_timeout方法。
- 在TimeoutRwLock中添加默認(rèn)實現(xiàn)(default impl)。
- 在read_timeout方法中,通過RwLock的try_read_with_timeout方法來嘗試獲取讀取器(Reader),并且指定一個等待時間。
- 如果在等待時間內(nèi)成功獲取到讀取器,那么將讀取器返回;否則,返回一個錯誤。 下面是代碼實現(xiàn):
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::Duration;
use std::thread;
use std::thread::sleep;
trait TimeoutRwLock< T > {
fn read_timeout(&self, timeout: Duration) - > Result< RwLockReadGuard< '_, T >, String > {
match self.try_read_with_timeout(timeout) {
Ok(guard) = > Ok(guard),
Err(_) = > Err(String::from("timeout")),
}
}
fn try_read_with_timeout(&self, timeout: Duration) - > Result< RwLockReadGuard< '_, T >, () >;
}
impl< T > TimeoutRwLock< T > for RwLock< T > {
fn try_read_with_timeout(&self, timeout: Duration) - > Result< RwLockReadGuard< '_, T >, () > {
let now = std::time::Instant::now();
loop {
match self.try_read() {
Ok(guard) = > return Ok(guard),
Err(_) = > {
if now.elapsed() >= timeout {
return Err(());
}
std::thread::sleep(Duration::from_millis(10));
}
}
}
}
}
fn main() {
let lock = Arc::new(RwLock::new(0u32));
let reader = {
let lock = lock.clone();
thread::spawn(
move || match lock.read_timeout(Duration::from_millis(100)) {
Ok(guard) = > {
println!("read: {}", *guard);
}
Err(e) = > {
println!("error: {:?}", e);
}
},
)
};
let writer = {
let lock = lock.clone();
thread::spawn(move || {
sleep(Duration::from_secs(1));
let mut guard = lock.write().unwrap();
*guard += 1;
println!("write: {}", *guard);
})
};
reader.join().unwrap();
writer.join().unwrap();
}
// 輸出結(jié)果:
// read: 0
// write: 1
在這個實現(xiàn)中,trait TimeoutRwLock中定義了一個read_timeout方法,它與try_read方法具有相同的輸入參數(shù)類型和輸出類型。default impl方法是一個嘗試在給定的等待時間內(nèi)獲取讀取器(Reader)的循環(huán),并在等待過程中使用線程(thread)的park_timeout方法來避免 CPU 占用過高。如果在等待時間內(nèi)成功獲取到讀取器(Reader),則返回讀取器;否則返回一個錯誤。
當(dāng)然,除了自己實現(xiàn)Trait外,還可以使用成熟的第三方庫,例如:parking_lot
RwLock最佳實踐
- ? 避免使用鎖
鎖是一種解決并發(fā)問題的基本機(jī)制,但由于鎖會引入競爭條件、死鎖和其他問題,因此應(yīng)盡量避免使用鎖。如果可能,應(yīng)使用更高級別的機(jī)制,例如 Rust 的通道(channel)。
- ? 避免過度使用讀寫鎖
在某些情況下,讀寫鎖可能會比互斥鎖更慢。例如,如果有太多的讀取器,并且它們在擁有讀取鎖時花費(fèi)了大量時間,那么寫入器的等待時間可能會很長。因此,使用讀寫鎖時,應(yīng)仔細(xì)考慮讀寫比例,以避免過度使用讀寫鎖。
- ? 鎖的可重入性
RwLock 是可重入的;一個線程占有寫鎖時可以再次占有讀鎖,并且同樣可以占有寫鎖。但這種情況要非常小心,因為可能會導(dǎo)致死鎖。
- ? 盡量縮小鎖的范圍
鎖的范圍越小,競爭就越少,性能就越好。因此,應(yīng)盡量在需要的地方使用鎖,而在不需要的地方釋放鎖。例如,在讀寫數(shù)據(jù)之前,可以先將數(shù)據(jù)復(fù)制到本地變量中,然后釋放鎖,以便其它線程可以訪問該數(shù)據(jù),而不必爭奪鎖。在本地變量上執(zhí)行讀寫操作時,不需要鎖定。
- ? 鎖的超時設(shè)置
在使用鎖時,應(yīng)該避免出現(xiàn)無限等待的情況。可以使用帶超時的鎖,當(dāng)?shù)却龝r間超過指定的時間時,會返回一個錯誤。這將防止出現(xiàn)死鎖或其他問題。
// 引入第三方庫處理超時
// parking_lot = "0.12.1"
use parking_lot::RwLock;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
fn main() {
let rwlock = Arc::new(RwLock::new(0));
let start = Instant::now();
// 嘗試在 1 秒內(nèi)獲取讀鎖
let reader = loop {
if let Some(r) = rwlock.try_read_for(Duration::from_secs(1)) {
break r;
}
if start.elapsed() >= Duration::from_secs(5) {
panic!("Failed to acquire read lock within 5 seconds.");
}
};
// 嘗試在 1 秒內(nèi)獲取寫鎖
let mut writer = loop {
if let Some(w) = rwlock.try_write_for(Duration::from_secs(1)) {
break w;
}
if start.elapsed() >= Duration::from_secs(5) {
panic!("Failed to acquire write lock within 5 seconds.");
}
};
// 進(jìn)行讀寫操作
println!("Reader: {}", *reader);
*writer += 1;
println!("Writer: {}", *writer);
}
在上面的例子中,讀取器等待 100 毫秒后超時,寫入器等待 1 秒鐘才能成功完成寫入。
總結(jié)
RwLock 是 Rust 中一種常用的線程同步機(jī)制,可以提高程序的并發(fā)性能。它只允許一個線程寫入數(shù)據(jù),但可以讓多個線程同時讀取同一個數(shù)據(jù)。具體來說,RwLock 在實現(xiàn)上使用了兩個 Mutex,一個用于保護(hù)讀取計數(shù)器,另一個用于保護(hù)寫入狀態(tài)。在使用 RwLock 時,應(yīng)該注意縮小鎖的范圍、避免使用過多讀寫鎖以及防止死鎖等問題。
-
編程語言
+關(guān)注
關(guān)注
10文章
1945瀏覽量
34736 -
程序
+關(guān)注
關(guān)注
117文章
3787瀏覽量
81043 -
內(nèi)存管理
+關(guān)注
關(guān)注
0文章
168瀏覽量
14139 -
rust語言
+關(guān)注
關(guān)注
0文章
57瀏覽量
3009
發(fā)布評論請先 登錄
相關(guān)推薦
評論