1. 背景
一個(gè)主庫和N個(gè)應(yīng)用庫的數(shù)據(jù)源,并且會(huì)同時(shí)操作主庫和應(yīng)用庫的數(shù)據(jù),需要解決以下兩個(gè)問題:
- 如何動(dòng)態(tài)管理多個(gè)數(shù)據(jù)源以及切換?
- 如何保證多數(shù)據(jù)源場(chǎng)景下的數(shù)據(jù)一致性(事務(wù))?
本文主要探討這兩個(gè)問題的解決方案,希望能對(duì)讀者有一定的啟發(fā)。
基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
- 項(xiàng)目地址:https://github.com/YunaiV/ruoyi-vue-pro
- 視頻教程:https://doc.iocoder.cn/video/
2. 數(shù)據(jù)源切換原理
通過擴(kuò)展Spring提供的抽象類AbstractRoutingDataSource
,可以實(shí)現(xiàn)切換數(shù)據(jù)源。其類結(jié)構(gòu)如下圖所示:
- targetDataSources&defaultTargetDataSource
項(xiàng)目上需要使用的所有數(shù)據(jù)源和默認(rèn)數(shù)據(jù)源。
- resolvedDataSources&resolvedDefaultDataSource
當(dāng)Spring容器創(chuàng)建AbstractRoutingDataSource
對(duì)象時(shí),通過調(diào)用afterPropertiesSet
復(fù)制上述目標(biāo)數(shù)據(jù)源。由此可見,一旦數(shù)據(jù)源實(shí)例對(duì)象創(chuàng)建完畢,業(yè)務(wù)無法再添加新的數(shù)據(jù)源。
- determineCurrentLookupKey
此方法為抽象方法,通過擴(kuò)展這個(gè)方法來實(shí)現(xiàn)數(shù)據(jù)源的切換。目標(biāo)數(shù)據(jù)源的結(jié)構(gòu)為:Map
其key為lookup key
。
我們來看官方對(duì)這個(gè)方法的注釋:
lookup key通常是綁定在線程上下文中,根據(jù)這個(gè)key去resolvedDataSources
中取出DataSource。
根據(jù)目標(biāo)數(shù)據(jù)源的管理方式不同,可以使用基于配置文件和數(shù)據(jù)庫表兩種方式。基于配置文件管理方案無法后續(xù)添加新的數(shù)據(jù)源,而基于數(shù)據(jù)庫表方案管理,則更加靈活。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
3. 配置文件解決方案
根據(jù)上面的分析,我們可以按照下面的步驟去實(shí)現(xiàn):
-
定義
DynamicDataSource
類繼承AbstractRoutingDataSource
,重寫determineCurrentLookupKey()
方法。 -
配置多個(gè)數(shù)據(jù)源注入
targetDataSources
和defaultTargetDataSource
,通過afterPropertiesSet()
方法將數(shù)據(jù)源寫入resolvedDataSources
和resolvedDefaultDataSource
。 -
調(diào)用
AbstractRoutingDataSource
的getConnection()
方法時(shí),determineTargetDataSource()
方法返回DataSource
執(zhí)行底層的getConnection()
。
其流程如下圖所示:
3.1 創(chuàng)建數(shù)據(jù)源
DynamicDataSource
數(shù)據(jù)源的注入,目前業(yè)界主流實(shí)現(xiàn)步驟如下:
在配置文件中定義數(shù)據(jù)源
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driverClassName=com.mysql.jdbc.Driver
#主數(shù)據(jù)源
spring.datasource.druid.master.url=jdbcUrl
spring.datasource.druid.master.username=***
spring.datasource.druid.master.password=***
#其他數(shù)據(jù)源
spring.datasource.druid.second.url=jdbcUrl
spring.datasource.druid.second.username=***
spring.datasource.druid.second.password=***
在代碼中配置Bean
@Configuration
publicclassDynamicDataSourceConfig{
@Bean
@ConfigurationProperties("spring.datasource.druid.master")
publicDataSourcefirstDataSource(){
returnDruidDataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties("spring.datasource.druid.second")
publicDataSourcesecondDataSource(){
returnDruidDataSourceBuilder.create().build();
}
@Bean
@Primary
publicDynamicDataSourcedataSource(DataSourcefirstDataSource,DataSourcesecondDataSource){
Map
3.2 AOP處理
通過DataSourceAspect
切面技術(shù)來簡(jiǎn)化業(yè)務(wù)上的使用,只需要在業(yè)務(wù)方法添加@SwitchDataSource
注解即可完成動(dòng)態(tài)切換:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public@interfaceSwitchDataSource{
Stringvalue();
}
DataSourceAspect
攔截業(yè)務(wù)方法,更新當(dāng)前線程上下文DataSourceContextHolder
中存儲(chǔ)的key,即可實(shí)現(xiàn)數(shù)據(jù)源切換。
3.3 方案不足
基于AbstractRoutingDataSource
的多數(shù)據(jù)源動(dòng)態(tài)切換,有個(gè)明顯的缺點(diǎn),無法動(dòng)態(tài)添加和刪除數(shù)據(jù)源。在我們的產(chǎn)品中,不能把應(yīng)用數(shù)據(jù)源寫死在配置文件。接下來分享一下基于數(shù)據(jù)庫表的實(shí)現(xiàn)方案。
4. 數(shù)據(jù)庫表解決方案
我們需要實(shí)現(xiàn)可視化的數(shù)據(jù)源管理,并實(shí)時(shí)查看數(shù)據(jù)源的運(yùn)行狀態(tài)。所以我們不能把數(shù)據(jù)源全部配置在文件中,應(yīng)該將數(shù)據(jù)源定義保存到數(shù)據(jù)庫表。參考AbstractRoutingDataSource
的設(shè)計(jì)思路,實(shí)現(xiàn)自定義數(shù)據(jù)源管理。
4.1 設(shè)計(jì)數(shù)據(jù)源表
主庫的數(shù)據(jù)源信息仍然配置在項(xiàng)目配置文件中,應(yīng)用庫數(shù)據(jù)源配置參數(shù),則設(shè)計(jì)對(duì)應(yīng)的數(shù)據(jù)表。表結(jié)構(gòu)如下所示:
這個(gè)表主要就是DataSource
的相關(guān)配置參數(shù),其相應(yīng)的ORM操作代碼在此不再贅述,主要是實(shí)現(xiàn)數(shù)據(jù)源的增刪改查操作。
4.2 自定義數(shù)據(jù)源管理
4.2.1 定義管理接口
通過繼承AbstractDataSource
即可實(shí)現(xiàn)DynamicDataSource
。為了方便對(duì)數(shù)據(jù)源進(jìn)行操作,我們定義一個(gè)接口DataSourceManager
,為業(yè)務(wù)提供操作數(shù)據(jù)源的統(tǒng)一接口。
publicinterfaceDataSourceManager{
voidput(Stringvar1,DataSourcevar2);
DataSourceget(Stringvar1);
BooleanhasDataSource(Stringvar1);
voidremove(Stringvar1);
voidcloseDataSource(Stringvar1);
Collectionall() ;
}
該接口主要是對(duì)數(shù)據(jù)表中定義的數(shù)據(jù)源,提供基礎(chǔ)管理功能。
4.2.2 自定義數(shù)據(jù)源
DynamicDataSource
的實(shí)現(xiàn)如下圖所示:
根據(jù)前面的分析,AbstractRoutingDataSource
是在容器啟動(dòng)的時(shí)候,執(zhí)行afterPropertiesSet
注入數(shù)據(jù)源對(duì)象,完成之后無法對(duì)數(shù)據(jù)源進(jìn)行修改。DynamicDataSource
則實(shí)現(xiàn)DataSourceManager
接口,可以將數(shù)據(jù)表中的數(shù)據(jù)源加載到dataSources。
4.2.3 切面處理
這一塊的處理跟配置文件數(shù)據(jù)源方案處理方式相同,都是通過AOP技術(shù)切換lookup key。
publicDataSourcedetermineTargetDataSource(){
StringlookupKey=DataSourceContextHolder.getKey();
DataSourcedataSource=Optional.ofNullable(lookupKey)
.map(dataSources::get)
.orElse(defaultDataSource);
if(dataSource==null){
thrownewIllegalStateException("CannotdetermineDataSourceforlookupkey["+lookupKey+"]");
}
returndataSource;
}
4.2.4 管理數(shù)據(jù)源狀態(tài)
在項(xiàng)目啟動(dòng)的時(shí)候,加載數(shù)據(jù)表中的所有數(shù)據(jù)源,并執(zhí)行初始化。初始化操作主要是使用SpringBoot提供的DataSourceBuilder
類,根據(jù)數(shù)據(jù)源表的定義創(chuàng)建DataSource。在項(xiàng)目運(yùn)行過程中,可以使用定時(shí)任務(wù)對(duì)數(shù)據(jù)源進(jìn)行保活,為了提升性能再添加一層緩存。
AbstractRoutingDataSource
只支持單庫事務(wù),切換數(shù)據(jù)源是在開啟事務(wù)之前執(zhí)行。 Spring使用 DataSourceTransactionManager
進(jìn)行事務(wù)管理。開啟事務(wù),會(huì)將數(shù)據(jù)源緩存到DataSourceTransactionObject
對(duì)象中,后續(xù)的commit和 rollback事務(wù)操作實(shí)際上是使用的同一個(gè)數(shù)據(jù)源。
如何解決切庫事務(wù)問題?借助Spring的聲明式事務(wù)處理,我們可以在多次切庫操作時(shí)強(qiáng)制開啟新的事務(wù):
@SwitchDataSource
@Transactional(rollbackFor=Exception.class,propagation=Propagation.REQUIRES_NEW)
這樣的話,執(zhí)行切庫操作的時(shí)候強(qiáng)制啟動(dòng)新事務(wù),便可實(shí)現(xiàn)多次切庫而且事務(wù)能夠生效。但是這種事務(wù)方式,存在數(shù)據(jù)一致性問題:
假若ServiceB正常執(zhí)行提交事務(wù),接著返回ServiceA執(zhí)行并且發(fā)生異常。因?yàn)閮纱翁幚硎遣煌氖聞?wù),ServiceA這個(gè)事務(wù)執(zhí)行回滾,而ServiceA事務(wù)已經(jīng)提交。這樣的話,數(shù)據(jù)就不一致了。接下來,我們主要討論如何解決多庫的事務(wù)問題。
6. 多庫事務(wù)處理
6.1 關(guān)于事務(wù)的理解
首先有必要理解事務(wù)的本質(zhì)。
1.提到Spring事務(wù),就離不開事務(wù)的四大特性和隔離級(jí)別、七大傳播特性。
事務(wù)特性和離級(jí)別是屬于數(shù)據(jù)庫范疇。Spring事務(wù)的七大傳播特性是什么呢?它是Spring在當(dāng)前線程內(nèi),處理多個(gè)事務(wù)操作時(shí)的事務(wù)應(yīng)用策略,數(shù)據(jù)庫事務(wù)本身并不存在傳播特性。
2.Spring事務(wù)的定義包括:begin、commit、rollback、close、suspend、resume等動(dòng)作。
-
begin(事務(wù)開始): 可以認(rèn)為存在于數(shù)據(jù)庫的命令中,比如Mysql的
start transaction
命令,但是在JDBC編程方式中不存在。 -
close(事務(wù)關(guān)閉): Spring事務(wù)的close()方法,是把
Connection
對(duì)象歸還給數(shù)據(jù)庫連接池,與事務(wù)無關(guān)。 -
suspend(事務(wù)掛起): Spring中事務(wù)掛起的語義是:需要新事務(wù)時(shí),將現(xiàn)有的
Connection
保存起來(還有尚未提交的事務(wù)),然后創(chuàng)建新的Connection2
,Connection2
提交、回滾、關(guān)閉完畢后,再把Connection1
取出來繼續(xù)執(zhí)行。 - resume(事務(wù)恢復(fù)): 嵌套事務(wù)執(zhí)行完畢,返回上層事務(wù)重新綁定連接對(duì)象到事務(wù)管理器的過程。
實(shí)際上,只有commit、rollback、close是在JDBC真實(shí)存在的,而其他動(dòng)作都是應(yīng)用的語意,而非JDBC事務(wù)的真實(shí)命令。因此,事務(wù)真實(shí)存在的方法是:setAutoCommit()
、commit()
、rollback()
。
close()語義為:
- 關(guān)閉一個(gè)數(shù)據(jù)庫連接,這已經(jīng)不再是事務(wù)的方法了。
使用DataSource并不會(huì)執(zhí)行物理關(guān)閉,只是歸還給連接池。
6.2 自定義管理事務(wù)
為了保證在多個(gè)數(shù)據(jù)源中事務(wù)的一致性,我們可以手動(dòng)管理Connetion
的事務(wù)提交和回滾。考慮到不同ORM框架的事務(wù)管理實(shí)現(xiàn)差異,要求實(shí)現(xiàn)自定義事務(wù)管理不影響框架層的事務(wù)。
這可以通過使用裝飾器設(shè)計(jì)模式,對(duì)Connection
進(jìn)行包裝重寫commit和rolllback屏蔽其默認(rèn)行為,這樣就不會(huì)影響到原生Connection
和ORM框架的默認(rèn)事務(wù)行為。其整體思路如下圖所示:
這里并沒有使用前面提到的@SwitchDataSource
,這是因?yàn)槲覀冊(cè)?code style="font-size:14px;padding:2px 4px;margin-right:2px;margin-left:2px;color:rgb(30,107,184);background-color:rgba(27,31,35,.05);font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;">TransactionAop中已經(jīng)執(zhí)行了lookupKey的切換。
6.2.1 定義多事務(wù)注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public@interfaceMultiTransaction{
StringtransactionManager()default"multiTransactionManager";
//默認(rèn)數(shù)據(jù)隔離級(jí)別,隨數(shù)據(jù)庫本身默認(rèn)值
IsolationLevelisolationLevel()defaultIsolationLevel.DEFAULT;
//默認(rèn)為主庫數(shù)據(jù)源
StringdatasourceId()default"default";
//只讀事務(wù),若有更新操作會(huì)拋出異常
booleanreadOnly()defaultfalse;
業(yè)務(wù)方法只需使用該注解即可開啟事務(wù),datasourceId
指定事務(wù)用到的數(shù)據(jù)源,不指定默認(rèn)為主庫。
6.2.3 包裝Connection
自定義事務(wù)我們使用包裝過的Connection
,屏蔽其中的commit&rollback
方法。這樣我們就可以在主事務(wù)里進(jìn)行統(tǒng)一的事務(wù)提交和回滾操作。
publicclassConnectionProxyimplementsConnection{
privatefinalConnectionconnection;
publicConnectionProxy(Connectionconnection){
this.connection=connection;
}
@Override
publicvoidcommit()throwsSQLException{
//connection.commit();
}
publicvoidrealCommit()throwsSQLException{
connection.commit();
}
@Override
publicvoidclose()throwsSQLException{
//connection.close();
}
publicvoidrealClose()throwsSQLException{
if(!connection.getAutoCommit()){
connection.setAutoCommit(true);
}
connection.close();
}
@Override
publicvoidrollback()throwsSQLException{
if(!connection.isClosed())
connection.rollback();
}
...
}
這里commit&close
方法不執(zhí)行操作,rollback執(zhí)行的前提是連接執(zhí)行close才生效。這樣不管是使用哪個(gè)ORM框架,其自身事務(wù)管理都將失效。事務(wù)的控制就交由MultiTransaction
控制了。
6.2.4 事務(wù)上下文管理
publicclassTransactionHolder{
//是否開啟了一個(gè)MultiTransaction
privatebooleanisOpen;
//是否只讀事務(wù)
privatebooleanreadOnly;
//事務(wù)隔離級(jí)別
privateIsolationLevelisolationLevel;
//維護(hù)當(dāng)前線程事務(wù)ID和連接關(guān)系
privateConcurrentHashMapconnectionMap;
//事務(wù)執(zhí)行棧
privateStackexecuteStack;
//數(shù)據(jù)源切換棧
privateStackdatasourceKeyStack;
//主事務(wù)ID
privateStringmainTransactionId;
//執(zhí)行次數(shù)
privateAtomicIntegertransCount;
//事務(wù)和數(shù)據(jù)源key關(guān)系
privateConcurrentHashMapexecuteIdDatasourceKeyMap;
}
每開啟一個(gè)事物,生成一個(gè)事務(wù)ID并綁定一個(gè)ConnectionProxy
。事務(wù)嵌套調(diào)用,保存事務(wù)ID和lookupKey至棧中,當(dāng)內(nèi)層事務(wù)執(zhí)行完畢執(zhí)行pop。這樣的話,外層事務(wù)只需在棧中執(zhí)行peek即可獲取事務(wù)ID和lookupKey。
6.2.5 數(shù)據(jù)源兼容處理
為了不影響原生事務(wù)的使用,需要重寫getConnection
方法。當(dāng)前線程沒有啟動(dòng)自定義事務(wù),則直接從數(shù)據(jù)源中返回連接。
@Override
publicConnectiongetConnection()throwsSQLException{
TransactionHoldertransactionHolder=MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();
if(Objects.isNull(transactionHolder)){
returndetermineTargetDataSource().getConnection();
}
ConnectionProxyConnectionProxy=transactionHolder.getConnectionMap()
.get(transactionHolder.getExecuteStack().peek());
if(ConnectionProxy==null){
//沒開跨庫事務(wù),直接返回
returndetermineTargetDataSource().getConnection();
}else{
transactionHolder.addCount();
//開了跨庫事務(wù),從當(dāng)前線程中拿包裝過的Connection
returnConnectionProxy;
}
}
6.2.6 切面處理
切面處理的核心邏輯是:維護(hù)一個(gè)嵌套事務(wù)棧,當(dāng)業(yè)務(wù)方法執(zhí)行結(jié)束,或者發(fā)生異常時(shí),判斷當(dāng)前棧頂事務(wù)ID是否為主事務(wù)ID。如果是的話這時(shí)候已經(jīng)到了最外層事務(wù),這時(shí)才執(zhí)行提交和回滾。詳細(xì)流程如下圖所示:
packagecom.github.mtxn.transaction.aop;
importcom.github.mtxn.application.Application;
importcom.github.mtxn.transaction.MultiTransactionManager;
importcom.github.mtxn.transaction.annotation.MultiTransaction;
importcom.github.mtxn.transaction.context.DataSourceContextHolder;
importcom.github.mtxn.transaction.support.IsolationLevel;
importcom.github.mtxn.transaction.support.TransactionHolder;
importcom.github.mtxn.utils.ExceptionUtils;
importlombok.extern.slf4j.Slf4j;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.aspectj.lang.reflect.MethodSignature;
importorg.springframework.core.annotation.Order;
importorg.springframework.stereotype.Component;
importjava.lang.reflect.Method;
@Aspect
@Component
@Slf4j
@Order(99999)
publicclassMultiTransactionAop{
@Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)")
publicvoidpointcut(){
if(log.isDebugEnabled()){
log.debug("startintransactionpointcut...");
}
}
@Around("pointcut()")
publicObjectaroundTransaction(ProceedingJoinPointpoint)throwsThrowable{
MethodSignaturesignature=(MethodSignature)point.getSignature();
//從切面中獲取當(dāng)前方法
Methodmethod=signature.getMethod();
MultiTransactionmultiTransaction=method.getAnnotation(MultiTransaction.class);
if(multiTransaction==null){
returnpoint.proceed();
}
IsolationLevelisolationLevel=multiTransaction.isolationLevel();
booleanreadOnly=multiTransaction.readOnly();
StringprevKey=DataSourceContextHolder.getKey();
MultiTransactionManagermultiTransactionManager=Application.resolve(multiTransaction.transactionManager());
//切數(shù)據(jù)源,如果失敗使用默認(rèn)庫
if(multiTransactionManager.switchDataSource(point,signature,multiTransaction))returnpoint.proceed();
//開啟事務(wù)棧
TransactionHoldertransactionHolder=multiTransactionManager.startTransaction(prevKey,isolationLevel,readOnly,multiTransactionManager);
Objectproceed;
try{
proceed=point.proceed();
multiTransactionManager.commit();
}catch(Throwableex){
log.error("executemethod:{}#{},err:",method.getDeclaringClass(),method.getName(),ex);
multiTransactionManager.rollback();
throwExceptionUtils.api(ex,"系統(tǒng)異常:%s",ex.getMessage());
}finally{
//當(dāng)前事務(wù)結(jié)束出棧
StringtransId=multiTransactionManager.getTrans().getExecuteStack().pop();
transactionHolder.getDatasourceKeyStack().pop();
//恢復(fù)上一層事務(wù)
DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());
//最后回到主事務(wù),關(guān)閉此次事務(wù)
multiTransactionManager.close(transId);
}
returnproceed;
}
}
7.總結(jié)
本文主要介紹了多數(shù)據(jù)源管理的解決方案(應(yīng)用層事務(wù),而非XA二段提交保證),以及對(duì)多個(gè)庫同時(shí)操作的事務(wù)管理。
需要注意的是,這種方式只適用于單體架構(gòu)的應(yīng)用。因?yàn)槎鄠€(gè)庫的事務(wù)參與者都是運(yùn)行在同一個(gè)JVM進(jìn)行。如果是在微服務(wù)架構(gòu)的應(yīng)用中,則需要使用分布式事務(wù)管理(譬如:Seata)。
審核編輯 :李倩
-
spring
+關(guān)注
關(guān)注
0文章
340瀏覽量
14361 -
數(shù)據(jù)源
+關(guān)注
關(guān)注
1文章
63瀏覽量
9701 -
微服務(wù)
+關(guān)注
關(guān)注
0文章
139瀏覽量
7370 -
SpringBoot
+關(guān)注
關(guān)注
0文章
174瀏覽量
187
原文標(biāo)題:SpringBoot 多數(shù)據(jù)源及事務(wù)解決方案
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論