通信底層介紹
xxl-job 使用 netty http 的方式進行通信,雖然也支持 Mina,jetty,netty tcp 等方式,但是代碼里面固定寫死的是 netty http。
通信整體流程
我以調度器通知執行器執行任務為例,繪制的活動圖:
活動圖
驚艷的設計
看完了整個處理流程代碼,設計上可以說獨具匠心,將 netty,多線程的知識運用得行云流水。
我現在就將這些設計上出彩的點總結如下:
使用動態代理模式,隱藏通信細節
xxl-job 定義了兩個接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封裝了向心跳,暫停,觸發執行等操作,AdminBiz 封裝了回調,注冊,取消注冊操作,接口的實現類中,并沒有通信相關的處理。
XxlRpcReferenceBean 類的 getObject() 方法會生成一個代理類,這個代理類會進行遠程通信。
全異步處理
執行器收到消息進行反序列化,并沒有同步執行任務代碼,而是將任務信息存儲在 LinkedBlockingQueue 中,異步線程從這個隊列中獲取任務信息,然后執行。
而任務的處理結果,也不是說處理完之后,同步返回的,也是放到回調線程的阻塞隊列中,異步的將處理結果返回回去。
這樣處理的好處就是減少了 netty 工作線程的處理時間,提升了吞吐量。
對異步處理的包裝
對異步處理進行了包裝,代碼看起來是同步調用的。
我們看下調度器,XxlJobTrigger 類觸發任務執行的代碼:
publicstaticReturnTrunExecutor(TriggerParamtriggerParam,Stringaddress){ ReturnT runResult=null; try{ ExecutorBizexecutorBiz=XxlJobScheduler.getExecutorBiz(address); //這里面做了很多異步處理,最終同步得到處理結果 runResult=executorBiz.run(triggerParam); }catch(Exceptione){ logger.error(">>>>>>>>>>>xxl-jobtriggererror,pleasecheckiftheexecutor[{}]isrunning.",address,e); runResult=newReturnT (ReturnT.FAIL_CODE,ThrowableUtil.toString(e)); } StringBufferrunResultSB=newStringBuffer(I18nUtil.getString("jobconf_trigger_run")+":"); runResultSB.append("
address:").append(address); runResultSB.append("
code:").append(runResult.getCode()); runResultSB.append("
msg:").append(runResult.getMsg()); runResult.setMsg(runResultSB.toString()); returnrunResult; }
ExecutorBiz.run 方法我們說過了,是走的動態代理,和執行器進行通信,執行器執行結果也是異步處理完,才返回的,而這里看到的 run 方法是同步等待處理結果返回。
我們看下xxl-job是如何同步獲取處理結果的:調度器向執行器發出消息后,該線程阻塞。等到執行器處理完畢后,將處理結果返回,喚醒被阻塞的線程,調用處拿到返回值。
動態代理代碼如下:
//代理類中的觸發調用 if(CallType.SYNC==callType){ //future-responseset XxlRpcFutureResponsefutureResponse=newXxlRpcFutureResponse(invokerFactory,xxlRpcRequest,null); try{ //doinvoke client.asyncSend(finalAddress,xxlRpcRequest); //futureget XxlRpcResponsexxlRpcResponse=futureResponse.get(timeout,TimeUnit.MILLISECONDS); if(xxlRpcResponse.getErrorMsg()!=null){ thrownewXxlRpcException(xxlRpcResponse.getErrorMsg()); } returnxxlRpcResponse.getResult(); }catch(Exceptione){ logger.info(">>>>>>>>>>>xxl-rpc,invokeerror,address:{},XxlRpcRequest{}",finalAddress,xxlRpcRequest); throw(einstanceofXxlRpcException)?e:newXxlRpcException(e); }finally{ //future-responseremove futureResponse.removeInvokerFuture(); } }
XxlRpcFutureResponse 類中實現了線程的等待,和線程喚醒的處理:
//返回結果,喚醒線程 publicvoidsetResponse(XxlRpcResponseresponse){ this.response=response; synchronized(lock){ done=true; lock.notifyAll(); } } @Override publicXxlRpcResponseget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{ if(!done){ synchronized(lock){ try{ if(timeout0)?{ ????????????//線程阻塞 ????????????????????????lock.wait(); ????????????????????}?else?{ ????????????????????????long?timeoutMillis?=?(TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout?,?unit); ????????????????????????lock.wait(timeoutMillis); ????????????????????} ????????????????}?catch?(InterruptedException?e)?{ ????????????????????throw?e; ????????????????} ????????????} ????????} ????????if?(!done)?{ ????????????throw?new?XxlRpcException("xxl-rpc,?request?timeout?at:"+?System.currentTimeMillis()?+",?request:"?+?request.toString()); ????????} ????????return?response; ????}
有的同學可能會問了,調度器接收到返回結果,怎么確定喚醒哪個線程呢?
每一次遠程調用,都會生成 uuid 的請求 id,這個 id 是在整個調用過程中一直傳遞的,就像一把鑰匙,在你回家的的時候,拿著它就帶開門。
這里拿著請求 id 這把鑰匙,就能找到對應的 XxlRpcFutureResponse,然后調用 setResponse 方法,設置返回值,喚醒線程。
publicvoidnotifyInvokerFuture(StringrequestId,finalXxlRpcResponsexxlRpcResponse){
//通過requestId找到XxlRpcFutureResponse, finalXxlRpcFutureResponsefutureResponse=futureResponsePool.get(requestId); if(futureResponse==null){ return; } if(futureResponse.getInvokeCallback()!=null){ //callbacktype try{ executeResponseCallback(newRunnable(){ @Override publicvoidrun(){ if(xxlRpcResponse.getErrorMsg()!=null){ futureResponse.getInvokeCallback().onFailure(newXxlRpcException(xxlRpcResponse.getErrorMsg())); }else{ futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult()); } } }); }catch(Exceptione){ logger.error(e.getMessage(),e); } }else{ //里面調用lock的notify方法 futureResponse.setResponse(xxlRpcResponse); } //doremove futureResponsePool.remove(requestId); } 審核編輯:黃飛
-
通信技術
+關注
關注
20文章
1135瀏覽量
92288 -
線程
+關注
關注
0文章
505瀏覽量
19712 -
調度器
+關注
關注
0文章
98瀏覽量
5261
原文標題:xxl-job驚艷的設計,怎能叫人不愛
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論