在同步的 Rust 方法中調用異步代碼經常會導致一些問題,特別是對于不熟悉異步 Rust runtime 底層原理的初學者。在本文中,我們將討論我們遇到的一個特殊問題,并分享我們采取的解決方法的經驗。
背景和問題
在做GreptimeDB項目的時候,我們遇到一個關于在同步 Rust 方法中調用異步代碼的問題。經過一系列故障排查后,我們弄清了問題的原委,這大大加深了對異步 Rust 的理解,因此在這篇文章中分享給大家,希望能給被相似問題困擾的 Rust 開發者一些啟發。
我們的整個項目是基于Tokio這個異步 Rust runtime 的,它將協作式的任務運行和調度方便地封裝在.await調用中,非常簡潔優雅。但是這樣也讓不熟悉 Tokio 底層原理的用戶一不小心就掉入到坑里。
我們遇到的問題是,需要在一個第三方庫的 trait 實現中執行一些異步代碼,而這個 trait 是同步的,我們無法修改這個 trait 的定義。
traitSequencer{ fngenerate(&self)->Vec; }
我們用一個PlainSequencer來實現這個 trait ,而在實現generate方法的時候依賴一些異步的調用(比如這里的PlainSequencer::generate_async):
implPlainSequencer{ asyncfngenerate_async(&self)->Vec{ letmutres=vec![]; foriin0..self.bound{ res.push(i); tokio::sleep(Duration::from_millis(100)).await; } res } } implSequencerforPlainSequencer{ fngenerate(&self)->Vec { self.generate_async().await } }
這樣就會出現問題,因為generate是一個同步方法,里面是不能直接 await 的。
error[E0728]:`await`isonlyallowedinside`async`functionsandblocks -->src/common/tt.rs30 | 31|/fngenerate(&self)->Vec{ 32||self.generate_async().await ||^^^^^^onlyallowedinside`async`functionsandblocks 33||} ||_____-thisisnot`async`
我們首先想到的是,Tokio 的 runtime 有一個Runtime::block_on方法,可以同步地等待一個 future 完成。
implSequencerforPlainSequencer{ fngenerate(&self)->Vec{ RUNTIME.block_on(async{ self.generate_async().await }) } } #[cfg(test)] modtests{ #[tokio::test] asyncfntest_sync_method(){ letsequencer=PlainSequencer{ bound:3 }; letvec=sequencer.generate(); println!("vec:{:?}",vec); } }
編譯可以通過,但是運行時直接報錯:
Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks. thread'tests::test_sync_method'panickedat'Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.',/Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs9
提示不能從一個執行中的 runtime 直接啟動另一個異步 runtime??磥?Tokio 為了避免這種情況特地在Runtime::block_on入口做了檢查。既然不行那我們就再看看其他的異步庫是否有類似的異步轉同步的方法。
果然找到一個futures::block_on。
implSequencerforPlainSequencer{ fngenerate(&self)->Vec{ futures::block_on(async{ self.generate_async().await }) } }
編譯同樣沒問題,但是運行時代碼直接直接 hang 住不返回了。
cargotest--color=always--packagetokio-demo
--bintttests::test_sync_method
--no-fail-fast----format=json
--exact-Zunstable-options--show-output
Compilingtokio-demov0.1.0(/Users/lei/Workspace/Rust/learning/tokio-demo) Finishedtest[unoptimized+debuginfo]target(s)in0.39s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) {"type":"suite","event":"started","test_count":1} {"type":"test","event":"started","name":"tests::test_sync_method"} #theexecutionjusthangshere:(
明明generate_async方法里面只有一個簡單的sleep()調用,但是為什么 future 一直沒完成呢?
并且吊詭的是,同樣的代碼,在tokio::test里面會 hang 住,但是在tokio::main中則可以正常執行完畢:
#[tokio::main] pubasyncfnmain(){ letsequencer=PlainSequencer{ bound:3 }; letvec=sequencer.generate(); println!("vec:{:?}",vec); }
執行結果:
cargorun--color=always--packagetokio-demo--bintt Finisheddev[unoptimized+debuginfo]target(s)in0.05s Running`target/debug/tt` vec:[0,1,2]
其實當初真正遇到這個問題的時候定位到具體在哪里 hang 住并沒有那么容易。真實代碼中 async 執行的是一個遠程的 gRPC 調用,當初懷疑過是否是 gRPC server 的問題,動用了網絡抓包等等手段最終發現是 client 側的問題。
這也提醒了我們在出現 bug 的時候,抽象出問題代碼的執行模式并且做出一個最小可復現的樣例(Minimal Reproducible Example)是非常重要的。
Catchup
在 Rust 中,一個異步的代碼塊會被make_async_expr編譯為一個實現了std::Future的 generator。
#[tokio::test] asyncfntest_future(){ letfuture=async{ println!("hello"); }; //theaboveasyncblockwon'tgetexecuteduntilweawaitit. future.await; }
而.await本質上是一個語法糖,則會被lower_expr_await編譯成類似于下面的一個語法結構:
//pseudo-rustcode match::into_future(){ mut__awaitee=>loop{ matchunsafe{::poll( <::Pin>::new_unchecked(&mut__awaitee), ::get_context(task_context), )}{ ::Ready(result)=>breakresult, ::Pending=>{} } task_context=yield(); } }
在上面這個去掉了語法糖的偽代碼中,可以看到有一個循環不停地檢查 generator 的狀態是否為已完成(std::poll)。
自然地,必然存在一個組件來做這件事,這里就是 Tokio 和async-std這類異步運行時發揮作用的地方了。Rust 在設計之初就特意將異步的語法(async/await)和異步運行時的實現分開,在上述的示例代碼中,poll 的操作是由 Tokio 的 executor 執行的。
問題分析
回顧完背景知識,我們再看一眼方法的實現:
fngenerate(&self)->Vec{ futures::block_on(async{ self.generate_async().await }) }
調用generate方法的肯定是 Tokio 的 executor,那么 block_on 里面的self.generate_async().await這個 future 又是誰在 poll 呢?
一開始我以為,futures::block_on會有一個內部的 runtime 去負責generate_async的 poll。于是查看了代碼(主要是futures_executor::run_executor這個方法):
fnrun_executor)->Poll >(mutf:F)->T{ let_enter=enter().expect( "cannotexecute`LocalPool`executorfromwithin anotherexecutor", ); CURRENT_THREAD_NOTIFY.with(|thread_notify|{ letwaker=waker_ref(thread_notify); letmutcx=Context::from_waker(&waker); loop{ ifletPoll::Ready(t)=f(&mutcx){ returnt; } letunparked=thread_notify.unparked.swap(false,Ordering::Acquire); if!unparked{ thread::park(); thread_notify.unparked.store(false,Ordering::Release); } } }) }
立刻嗅到了一絲不對的味道,雖然這個方法名為run_executor,但是整個方法里面貌似沒有任何 spawn 的操作,只是在當前線程不停的循環判斷用戶提交的 future 的狀態是否為 ready 啊!
這意味著,當 Tokio 的 runtime 線程執行到這里的時候,會立刻進入一個循環,在循環中不停地判斷用戶的的 future 是否 ready。如果還是 pending 狀態,則將當前線程 park 住。
假設,用戶 future 的異步任務也是交給了當前線程去執行,futures::block_on等待用戶的 future ready,而用戶 future 等待futures::block_on釋放當前的線程資源,那么不就死鎖了?
這個推論聽起來很有道理,讓我們來驗證一下。既然不能在當前 runtime 線程 block,那就重新開一個 runtime block:
implSequencerforPlainSequencer{ fngenerate(&self)->Vec{ letbound=self.bound; futures::block_on(asyncmove{ RUNTIME.spawn(asyncmove{ letmutres=vec![]; foriin0..bound{ res.push(i); tokio::sleep(Duration::from_millis(100)).await; } res }).await.unwrap() }) } }
果然可以了。
cargotest--color=always--packagetokio-demo
--bintttests::test_sync_method
--no-fail-fast----format=json
--exact-Zunstable-options--show-output
Finishedtest[unoptimized+debuginfo]target(s)in0.04s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) vec:[0,1,2]
值得注意的是,在futures::block_on里面,額外使用了一個RUNTIME來 spawn 我們的異步代碼。其原因還是剛剛所說的,這個異步任務需要一個 runtime 來驅動狀態的變化。
如果我們刪除 RUNTIME,而為 futures::block_on 生成一個新的線程,雖然死鎖問題得到了解決,但tokio::sleep 方法的調用會報錯"no reactor is running",這是因為 Tokio 的功能運作需要一個 runtime:
called`Result::unwrap()`onan`Err`value:Any{..} thread''panickedat'thereisnoreactorrunning,mustbecalledfromthecontextofaTokio1.xruntime', ...
tokio::main和tokio::test
在分析完上面的原因之后,“為什么tokio::main中不會 hang 住而tokio::test會 hang ?。俊?這個問題也很清楚了,他們兩者所使用的的 runtime 并不一樣。tokio::main使用的是多線程的 runtime,而tokio::test使用的是單線程的 runtime,而在單線程的 runtime 下,當前線程被futures::block_on卡死,那么用戶提交的異步代碼是一定沒機會執行的,從而必然形成上面所說的死鎖。
Best practice
經過上面的分析,結合 Rust 基于 generator 的協作式異步特性,我們可以總結出 Rust 下橋接異步代碼和同步代碼的一些注意事項:
?將異步代碼與同步代碼結合使用可能會導致阻塞,因此不是一個明智的選擇。
?在同步的上下文中調用異步代碼時,請使用 futures::block_on 并將異步代碼 spawn 到另一個專用的 runtime 中執行 ,因為前者會阻塞當前線程。
?如果必須從異步的上下文中調用有可能阻塞的同步代碼(比如文件 IO 等),則建議使用 tokio::spawn_blocking 在專門處理阻塞操作的 executor 上執行相應的代碼。
審核編輯:劉清
-
RPC
+關注
關注
0文章
111瀏覽量
11540 -
Asynchrono
+關注
關注
0文章
4瀏覽量
6528 -
Rust
+關注
關注
1文章
229瀏覽量
6621
原文標題:如何在同步的 Rust 方法中調用異步代碼 | Tokio 使用中的幾點教訓
文章出處:【微信號:Rust語言中文社區,微信公眾號:Rust語言中文社區】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論