總結(jié):Java屆很難得有讀百十行代碼就能增加修煉的機會,這里有一個。
通常,我在看書的時候一般不寫代碼,因為我的腦袋被設(shè)定成單線程的,一旦同時喂給它不同的信息,它就無法處理。
但多線程對電腦來說就是小菜一碟,它可以同時做很多事,看起來匪夷所思。好希望把自己的大腦皮層移植到這些牛x的設(shè)備上。
用人腦思考電腦正在思考的問題,這本身就是一種折磨。但平常的工作和面試中,又不得不面對這樣的場景,所以多線程就成了編程路上一塊難啃的骨頭。
HikariCP是SpringBoot默認的數(shù)據(jù)庫連接池,它毫不謙虛的的起了一個叫做光
的名字,這讓國產(chǎn)Druid很沒面子。
還是言歸正傳,看一下Hikari中的ConcurrentBag吧。
核心數(shù)據(jù)結(jié)構(gòu)
多線程代碼一個讓人比較頭疼的問題,就是每個API我都懂,但就是不會用。很多對concurrent包倒背如流的同學(xué),在面對現(xiàn)實的問題時,到最后依然不得不被迫加上Lock或者synchronized。
ConcurrentBag是一個Lock free的數(shù)據(jù)結(jié)構(gòu),主要用作數(shù)據(jù)庫連接的存儲,可以說整個HikariCP的核心就是它。刪掉亂七八糟的注釋和異常處理,可以說關(guān)鍵的代碼也就百十來行,但里面的道道卻非常的多。
ConcurrentBag速度很快,要達到這個目標(biāo),就需要一定的核心數(shù)據(jù)結(jié)構(gòu)支持。
privatefinalCopyOnWriteArrayListsharedList;
privatefinalThreadLocal
-
sharedList
用來緩存所有的連接,是一個CopyOnWriteArrayList結(jié)構(gòu)。 -
threadList
用來緩存某個線程所使用的所有連接,相當(dāng)于快速引用,是一個ThreadLocal類型的ArrayList。 -
waiters
當(dāng)前正在獲取連接的等待者數(shù)量。AtomicInteger,就是一個自增對象。當(dāng)waiters的數(shù)量大于0時候,意味著有線程正在獲取資源。 -
handoffQueue
0容量的快速傳遞隊列,SynchronousQueue類型的隊列,非常有用。
ConcurrentBag里面的元素,為了能夠無鎖化操作,需要使用一些變量來標(biāo)識現(xiàn)在處于的狀態(tài)。抽象的接口如下:
publicinterfaceIConcurrentBagEntry{
intSTATE_NOT_IN_USE=0;
intSTATE_IN_USE=1;
intSTATE_REMOVED=-1;
intSTATE_RESERVED=-2;
booleancompareAndSet(intexpectState,intnewState);
voidsetState(intnewState);
intgetState();
}
有了這些數(shù)據(jù)結(jié)構(gòu)的支持,我們的ConcurrentBag就可以實現(xiàn)它光的宣稱了。
基于 Spring Boot + MyBatis Plus + Vue & Element 實現(xiàn)的后臺管理系統(tǒng) + 用戶小程序,支持 RBAC 動態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
- 項目地址:https://github.com/YunaiV/ruoyi-vue-pro
- 視頻教程:https://doc.iocoder.cn/video/
獲取連接
連接的獲取是borrow
方法,還可以傳入一個timeout作為超時控制。
publicTborrow(longtimeout,finalTimeUnittimeUnit)throwsInterruptedException
首先,如果某個線程執(zhí)行非常快,使用了比較多的連接,就可以使用ThreadLocal的方式快速獲取連接對象,而不用跑到大池子里面去獲取。代碼如下。
//Trythethread-locallistfirst
finalvarlist=threadList.get();
for(inti=list.size()-1;i>=0;i--){
finalvarentry=list.remove(i);
finalTbagEntry=weakThreadLocals?((WeakReference)entry).get():(T)entry;
if(bagEntry!=null&&bagEntry.compareAndSet(STATE_NOT_IN_USE,STATE_IN_USE)){
returnbagEntry;
}
}
我們都知道,包括ArrayList和HashMap一些基礎(chǔ)的結(jié)構(gòu),都是Fail Fast的,如果你在遍歷的時候,刪掉一些數(shù)據(jù),有可能會引起問題。幸運的是,由于我們的List是從ThreadLocal獲取的,它首先就避免了線程安全的問題。
接下來就是遍歷。這段代碼采用的是尾遍歷(頭遍歷會出現(xiàn)錯誤),用于快速的從列表中找到一個可以復(fù)用的對象,然后使用CAS來把狀態(tài)置為使用中。但如果對象正在被使用,則直接刪除它。
在ConcurrentBag里,每個ThreadLocal最多緩存50個連接對象引用。
當(dāng)ThreadLocal里找不到可復(fù)用的對象,它就會到大池子里去拿。也就是下面這段代碼。
//Otherwise,scanthesharedlist...thenpollthehandoffqueue
finalintwaiting=waiters.incrementAndGet();
try{
for(TbagEntry:sharedList){
if(bagEntry.compareAndSet(STATE_NOT_IN_USE,STATE_IN_USE)){
//Ifwemayhavestolenanotherwaiter'sconnection,requestanotherbagadd.
if(waiting>1){
listener.addBagItem(waiting-1);
}
returnbagEntry;
}
}
listener.addBagItem(waiting);
//還拿不到,就需要等待別人釋放了
timeout=timeUnit.toNanos(timeout);
do{
finalvarstart=currentTime();
finalTbagEntry=handoffQueue.poll(timeout,NANOSECONDS);
if(bagEntry==null||bagEntry.compareAndSet(STATE_NOT_IN_USE,STATE_IN_USE)){
returnbagEntry;
}
timeout-=elapsedNanos(start);
}while(timeout>10_000);
returnnull;
}
finally{
waiters.decrementAndGet();
}
首先要注意,這段代碼可能是由不同的線程執(zhí)行的,所以必須要考慮線程安全問題。由于shardList是線程安全的CopyOnWriteArrayList,適合讀多寫少的場景,我們可以直接進行遍歷。
這段代碼的目的是一樣的,需要從sharedList找到一個空閑的連接對象。這里把自增的waiting變量傳遞到外面的代碼進行處理,主要是由于想要根據(jù)waiting的大小來確定是否創(chuàng)建新的對象。
如果無法從池子里獲取連接,則需要等待別的線程釋放一些資源。
創(chuàng)建對象的過程是異步的,要想獲取它,還需要依賴一段循環(huán)代碼。while循環(huán)代碼是納秒精度,會嘗試從handoffQueue里獲取。最終會調(diào)用SynchronousQueue的transfer方法。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現(xiàn)的后臺管理系統(tǒng) + 用戶小程序,支持 RBAC 動態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
歸還連接
有借就有還,當(dāng)某個連接使用完畢,它將被歸還到池子中。
publicvoidrequite(finalTbagEntry)
{
bagEntry.setState(STATE_NOT_IN_USE);
for(vari=0;waiters.get()>0;i++){
if(bagEntry.getState()!=STATE_NOT_IN_USE||handoffQueue.offer(bagEntry)){
return;
}
elseif((i&0xff)==0xff){
parkNanos(MICROSECONDS.toNanos(10));
}
else{
Thread.yield();
}
}
finalvarthreadLocalList=threadList.get();
if(threadLocalList.size()50){
threadLocalList.add(weakThreadLocals?newWeakReference<>(bagEntry):bagEntry);
}
}
首先,把這個對象置為可用狀態(tài)。然后,代碼會進入一個循環(huán),等待使用方把這個連接接手過去。當(dāng)連接處于STATE_NOT_IN_USE
狀態(tài),或者隊列中的數(shù)據(jù)被取走了,那么就可以直接返回了。
由于waiters.get()
是實時獲取的,有可能長時間一直大于0,這樣代碼就會變成死循環(huán),浪費CPU。代碼會嘗試不同層次的睡眠,一個是每隔255個waiter睡10ns,一個是使用yield讓出cpu時間片。
如果歸還連接的時候并沒有被其他線程獲取到,那么最后我們會把歸還的連接放入到相對應(yīng)的ThreadLocal里,因為對一個連接來說,借和還,通常是一個線程。
知識點
看起來平平無奇的幾行代碼,為什么搞懂了就能Hold住大部分的并發(fā)編程場景呢?主要還是這里面的知識點太多。下面我簡單羅列一下,你可以逐個攻破。
- 使用ThreadLocal來緩存本地資源引用,使用線程封閉的資源來減少鎖的沖突
- 采用讀多寫少的線程安全的CopyOnWriteArrayList來緩存所有對象,幾乎不影響讀取效率
- 使用基于CAS的AtomicInteger來計算等待者的數(shù)量,無鎖操作使得計算更加快速
- 0容量的交換隊列SynchronousQueue,使得對象傳遞更加迅速
- 采用compareAndSet的CAS原語來控制狀態(tài)的變更,安全且效率高。很多核心代碼都是這么設(shè)計的
- 在循環(huán)中使用park、yield等方法,避免死循環(huán)占用大量CPU
- 需要了解并發(fā)數(shù)據(jù)結(jié)構(gòu)中的offer、poll、peek、put、take、add、remove方法的區(qū)別,并靈活應(yīng)用
- CAS在設(shè)置狀態(tài)時,采用了volatile關(guān)鍵字修飾,對于volatile的使用也是一個常見的優(yōu)化點
- 需要了解WeakReference弱引用在垃圾回收時候的表現(xiàn)
麻雀雖小,五臟俱全。如果你想要你的多線程編程能力更上一層樓,讀一讀這個短小精悍的ConcurrentBag吧。當(dāng)你掌握了它,多線程的那些東西,不過是小菜一碟。
審核編輯 :李倩
-
cpu
+關(guān)注
關(guān)注
68文章
10878瀏覽量
212166 -
數(shù)據(jù)庫
+關(guān)注
關(guān)注
7文章
3822瀏覽量
64506 -
多線程
+關(guān)注
關(guān)注
0文章
278瀏覽量
20016
原文標(biāo)題:讀懂HikariCP一百行代碼,多線程就是個孫子!
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論