1、程序簡介
該程序是基于OpenHarmony的C++公共基礎類庫的讀寫鎖:SafeBlockQueue。
線程安全阻塞隊列SafeBlockQueue類,提供阻塞和非阻塞版的入隊入隊和出隊接口,并提供可最追蹤任務完成狀態的的SafeBlockQueueTracking類。
本案例主要完成如下工作:
(1)使用SafeBlockQueue接口的案例
判斷命令行是否使用阻塞,還是非阻塞;
創建子線程生產者,使用阻塞/非阻塞方式,入隊操作;
創建子線程消費者,使用阻塞/非阻塞方式,出隊操作;
主線程等待所有子線程結束
(2)使用SafeBlockQueueTracking接口的案例
判斷命令行是否使用阻塞,還是非阻塞;
創建子線程生產者,使用阻塞/非阻塞方式,入隊操作;
創建子線程消費者,使用阻塞/非阻塞方式,出隊操作;
主線程等待所有子線程結束
該案例已在凌蒙派-RK3568開發板驗證過,如需要完整源代碼,請參考:
https://gitee.com/Lockzhiner-Electronics/lockzhiner-rk3568-openharmony/tree/master/samples/a29_utils_safeblockqueue
2、基礎知識
C++公共基礎類庫為標準系統提供了一些常用的C++開發工具類,包括:
文件、路徑、字符串相關操作的能力增強接口
安全數據容器、數據序列化等接口
各子系統的錯誤碼相關定義
2.1、添加C++公共基礎類庫依賴
修改需調用模塊的BUILD.gn,在external_deps或deps中添加如下:
ohos_shared_library("xxxxx") { ... external_deps = [ ... # 動態庫依賴(可選) "c_utils:utils", # 靜態庫依賴(可選) "c_utils:utilsbase", # Rust動態庫依賴(可選) "c_utils:utils_rust", ] ...}
一般而言,我們只需要填寫"c_utils:utils"即可。
2.2、SafeBlockQueue頭文件
C++公共基礎類庫的SafeBlockQueue頭文件在://commonlibrary/c_utils/base/include/safe_block_queue.h
可在源代碼中添加如下:
#include
2.3、OHOS::SafeBlockQueue接口說明
2.3.1、SafeBlockQueue
構造函數。
SafeBlockQueue(int capacity)
參數說明:
參數名稱 | 類型 | 參數說明 |
---|---|---|
capacity | int | SafeBlockQueue的容量,即能存儲多少個單元 |
2.3.2、~SafeBlockQueue
析構函數。
~SafeBlockQueue();
2.3.3、Push
入隊操作(阻塞版)。
void virtual Push(T const& elem);
2.3.4、PushNoWait
入隊操作(非阻塞版)。
bool virtual PushNoWait(T const& elem);
返回值說明:
類型 | 返回值說明 |
---|---|
bool | true表示成功,false表示失敗 |
2.3.5、Pop
出隊操作(阻塞版)。
T Pop();
返回值說明:
類型 | 返回值說明 |
---|---|
T | 出隊的單元 |
2.3.6、PopNotWait
出隊操作(非阻塞版)。
bool PopNotWait(T& outtask);
參數說明:
參數名稱 | 類型 | 參數說明 |
---|---|---|
outtask | T | 出隊的單元 |
返回值說明:
類型 | 返回值說明 |
---|---|
bool | true表示成功,false表示失敗 |
2.3.7、Size
獲取隊列容量。
unsigned int Size();
返回值說明:
類型 | 返回值說明 |
---|---|
unsigned int | 返回隊列的容量 |
2.3.8、IsEmpty
隊列判空。
bool IsEmpty;
返回值說明:
類型 | 返回值說明 |
---|---|
bool | true表示成功,false表示失敗 |
2.3.9、IsFull
判斷map是否為滿。
bool IsFull();
返回值說明:
類型 | 返回值說明 |
---|---|
bool | true表示空,false表示非空 |
2.4、OHOS::SafeBlockQueueTracking接口說明
2.4.1、SafeBlockQueue
構造函數。
SafeBlockQueue(int capacity)
參數說明:
參數名稱 | 類型 | 參數說明 |
---|---|---|
capacity | int | SafeBlockQueue的容量,即能存儲多少個單元 |
2.4.2、~SafeBlockQueue
析構函數。
~SafeBlockQueue();
2.4.3、Push
入隊操作(阻塞版)。
void virtual Push(T const& elem);
2.4.4、PushNoWait
入隊操作(非阻塞版)。
bool virtual PushNoWait(T const& elem);
返回值說明:
類型 | 返回值說明 |
---|---|
bool | true表示成功,false表示失敗 |
3、程序解析
3.1、創建編譯引導
在上一級目錄BUILD.gn文件添加一行編譯引導語句。
import("http://build/ohos.gni")
group("samples") { deps = [ "a29_utils_safeblockqueue:utils_safeblockqueue", # 添加該行 ]}
"a29_utils_safeblockqueue:utils_safeblockqueue",該行語句表示引入 參與編譯。
3.2、創建編譯項目
創建a29_utils_safeblockqueue目錄,并添加如下文件:
a29_utils_safeblockqueue├── utils_safeblockqueue_sample.cpp # .cpp源代碼├── utils_safeblockqueuetracking_sample.cpp # .cpp源代碼├── BUILD.gn # GN文件
3.3、創建BUILD.gn
編輯BUILD.gn文件。
import("http://build/ohos.gni")
ohos_executable("utils_safeblockqueue_test") { sources = [ "utils_safeblockqueue_sample.cpp" ] include_dirs = [ "http://commonlibrary/c_utils/base/include", "http://commonlibrary/c_utils/base:utils", "http://third_party/googletest:gtest_main", "http://third_party/googletest/googletest/include" ] external_deps = [ "c_utils:utils" ] part_name = "product_rk3568" install_enable = true}
ohos_executable("utils_safeblockqueue_tracking") { sources = [ "utils_safeblockqueuetracking_sample.cpp" ] include_dirs = [ "http://commonlibrary/c_utils/base/include", "http://commonlibrary/c_utils/base:utils", "http://third_party/googletest:gtest_main", "http://third_party/googletest/googletest/include" ] external_deps = [ "c_utils:utils" ] part_name = "product_rk3568" install_enable = true}
group("utils_safeblockqueue") { deps = [ ":utils_safeblockqueue_test", # 構建SafeBlockQueue案例 ":utils_safeblockqueue_tracking", # 構建SafeBlockQueueTracking案例 ]}
注意:
(1)BUILD.gn中所有的TAB鍵必須轉化為空格,否則會報錯。如果自己不知道如何規范化,可以:
# 安裝gn工具sudo apt-get install ninja-buildsudo apt install generate-ninja# 規范化BUILD.gngn format BUILD.gn
3.4、創建SafeBlockQueue案例源代碼
3.4.1、創建SafeBlockQueue變量
#include // SafeBlockQueue的頭文件
// 定義常量const int SIZE = 5;// 定義SafeBlockQueue變量OHOS::SafeBlockQueue
3.4.2、獲知運行阻塞/非阻塞方式
命令有1個參數,分別是:
wait:使用阻塞方式的SafeBlockQueue;
nowait:使用非阻塞方式的SafeBlockQueue;
int main(int argc, char **argv){ bool enable_wait = true; ...... // 獲取命令行參數 if (argc != 2) { cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl; return -1; } if (strncmp(argv[1], STRING_WAIT, sizeof(STRING_WAIT)) == 0) { enable_wait = true; } else if (strncmp(argv[1], STRING_NOWAIT, sizeof(STRING_NOWAIT)) == 0) { enable_wait = false; } else { cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl; return -1; }}
3.4.3、創建線程池并設置子線程數量
int main(int argc, char **argv){ OHOS::ThreadPool threads("threads"); string str_name; ...... threads.SetMaxTaskNum(4); threads.Start(2); ......}
3.4.4、啟動2個子線程,并等待結束
調用AddTask()添加子線程,并調用Stop()等待所有子進程結束。
// 創建生產者線程cout << get_curtime() << ", " << __func__ << ": task_product start" << endl;auto task_product = (enable_wait) ? (std::bind(product_wait, str_name)) : (std::bind(product_nowait, str_name));threads.AddTask(task_product);
// 等待SIZE秒,將SafeBlockQueue容器填滿cout << get_curtime() << ", " << __func__ << ": sleep " << SIZE << " sec" << endl;std::sleep_for(std::milliseconds(1000 * SIZE));
// 創建消費者線程cout << get_curtime() << ", " << __func__ << ": consume start" << endl; auto task_consumer = (enable_wait) ? (std::bind(consume_wait, str_name)) : (std::bind(consume_nowait, str_name));threads.AddTask(task_consumer);
threads.Stop();cout << get_curtime() << ", " << __func__ << ": Queue Wait End" << endl;
3.4.5、子線程生產者,使用阻塞方式
static void product_wait(const string &name){ for (int i = 0; i < (2 * SIZE); i++) { cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl; // 使用阻塞方式的SafeBlockQueue m_safeBlockQueue.Push(i); cout << get_curtime() << ", " << __func__ << ": Push Success, i = " << i << endl; // 等待1秒 cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl; std::sleep_for(std::milliseconds(1000)); }}
3.4.6、子線程消費者,使用阻塞方式
static void consume_wait(const string &name){ for (int i = 0; i < (2 * SIZE); i++) { cout << get_curtime() << ", " << __func__ << ": Pop Start, i = " << i << endl; // 使用阻塞方式的SafeBlockQueue int value = m_safeBlockQueue.Pop(); cout << get_curtime() << ", " << __func__ << ": Pop Success, i = " << i << ", value = " << value << endl; // 等待0.5秒 cout << get_curtime() << ", " << __func__ << ": Sleep 0.5 sec " << endl; std::sleep_for(std::milliseconds(500)); }}
3.4.7、子線程生產者,使用非阻塞方式
static void product_nowait(const string &name){ bool ret; for (int i = 0; i < (2 * SIZE); i++) { cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl; // 使用非阻塞方式的SafeBlockQueue ret = m_safeBlockQueue.PushNoWait(i); cout << get_curtime() << ", " << __func__ << ": Push ret = " << ret << ", i = " << i << endl; // 等待1秒 cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl; std::sleep_for(std::milliseconds(1000)); }}
3.4.8、子線程消費者,使用非阻塞方式
static void consume_nowait(const string &name){ for (int i = 0; i < (SIZE * 2); i++) { // 等待有新數據 int value = 0; // 使用非阻塞方式的SafeBlockQueue bool ret = m_safeBlockQueue.PopNotWait(value); cout << get_curtime() << ", " << __func__ << ": PopNotWait ret = " << ret << ", value = " << value << endl; // 等待500毫秒 std::sleep_for(std::milliseconds(500)); }}
3.5、創建SafeBlockQueueTracking案例源代碼
3.5.1、創建SafeBlockQueueTracking變量
#include // SafeBlockQueue的頭文件
// 定義常量const int SIZE = 5;// 定義SafeBlockQueue變量OHOS::SafeBlockQueueTracking
3.5.2、獲知運行阻塞/非阻塞方式
命令有1個參數,分別是:
wait:使用阻塞方式的SafeBlockQueue;
nowait:使用非阻塞方式的SafeBlockQueue;
int main(int argc, char **argv){ bool enable_wait = true; ...... // 獲取命令行參數 if (argc != 2) { cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl; return -1; } if (strncmp(argv[1], STRING_WAIT, sizeof(STRING_WAIT)) == 0) { enable_wait = true; } else if (strncmp(argv[1], STRING_NOWAIT, sizeof(STRING_NOWAIT)) == 0) { enable_wait = false; } else { cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl; return -1; }}
3.5.3、創建線程池并設置子線程數量
int main(int argc, char **argv){ OHOS::ThreadPool threads("threads"); string str_name; ...... threads.SetMaxTaskNum(4); threads.Start(2); ......}
3.5.4、啟動2個子線程,并等待結束
調用AddTask()添加子線程,并調用Stop()等待所有子進程結束。
// 創建生產者線程cout << get_curtime() << ", " << __func__ << ": task_product start" << endl;auto task_product = (enable_wait) ? (std::bind(product_wait, str_name)) : (std::bind(product_nowait, str_name));threads.AddTask(task_product);
// 等待SIZE秒,將SafeBlockQueue容器填滿cout << get_curtime() << ", " << __func__ << ": sleep " << SIZE << " sec" << endl;std::sleep_for(std::milliseconds(1000 * SIZE));
// 創建消費者線程cout << get_curtime() << ", " << __func__ << ": consume start" << endl; auto task_consumer = (enable_wait) ? (std::bind(consume_wait, str_name)) : (std::bind(consume_nowait, str_name));threads.AddTask(task_consumer);
threads.Stop();cout<
3.5.5、子線程生產者,使用阻塞方式
static void product_wait(const string &name){ for (int i = 0; i < (2 * SIZE); i++) { cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl; // 使用阻塞方式的SafeBlockQueueTracking m_safeBlockQueueTracking.Push(i); cout << get_curtime() << ", " << __func__ << ": Push Success, i = " << i << endl; // 等待1秒 cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl; std::sleep_for(std::milliseconds(1000)); }}
3.5.6、子線程消費者,使用阻塞方式
static void consume_wait(const string &name){ for (int i = 0; i < (2 * SIZE); i++) { cout << get_curtime() << ", " << __func__ << ": Pop Start, i = " << i << endl; // 使用阻塞方式的SafeBlockQueueTracking int value = m_safeBlockQueueTracking.Pop(); cout << get_curtime() << ", " << __func__ << ": Pop Success, i = " << i << ", value = " << value << endl; m_safeBlockQueueTracking.OneTaskDone(); cout << get_curtime() << ", " << __func__ << ": Push OneTaskDone successful" << endl; // 等待0.5秒 cout << get_curtime() << ", " << __func__ << ": Sleep 0.5 sec " << endl; std::sleep_for(std::milliseconds(500)); }
}
3.5.7、子線程生產者,使用非阻塞方式
static void product_nowait(const string &name){ bool ret; for (int i = 0; i < (2 * SIZE); i++) { cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl; // 使用非阻塞方式的SafeBlockQueueTracking ret = m_safeBlockQueueTracking.PushNoWait(i); cout << get_curtime() << ", " << __func__ << ": Push ret = " << ret << ", i = " << i << endl; if (ret == true) { m_safeBlockQueueTracking.OneTaskDone(); cout << get_curtime() << ", " << __func__ << ": Push OneTaskDone successful" << endl; } // 等待1秒 cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl; std::sleep_for(std::milliseconds(1000)); }}
3.5.8、子線程消費者,使用非阻塞方式
static void consume_nowait(const string &name){ for (int i = 0; i < (SIZE * 2); i++) { // 等待有新數據 int value = 0; // 使用非阻塞方式的SafeBlockQueueTracking bool ret = m_safeBlockQueueTracking.PopNotWait(value); cout << get_curtime() << ", " << __func__ << ": PopNotWait ret = " << ret << ", value = " << value << endl; // 等待500毫秒 std::sleep_for(std::milliseconds(500)); }
}
4、編譯步驟
進入OpenHarmony編譯環境,運行命令:
hb build -f
5、運行結果
#
-
Queue
+關注
關注
0文章
16瀏覽量
7261 -
系統
+關注
關注
1文章
1017瀏覽量
21339 -
OpenHarmony
+關注
關注
25文章
3722瀏覽量
16313
發布評論請先 登錄
相關推薦
評論