組織將其大部分高速事務數據保存在快速的 NoSQL 數據存儲中,如 Apache Cassandra ?。最終,需要從這些數據中獲得分析見解 。從歷史上看,用戶利用外部大規模并行處理分析系統(如 Apache Spark )來實現這一目的。然而,今天的分析生態系統正在迅速采用 AI 和 ML 技術,這些技術的計算在很大程度上依賴于 GPU s 。
在這篇文章中,我們探索了一種處理 Cassandra SSTables 的尖端方法,方法是使用 RAPIDS 生態系統中的工具將它們直接解析到 GPU 設備內存中。這將使用戶能夠以更少的初始設置更快地到達見解 ,并且還可以方便地 MIG 評估用 Python 編寫的現有分析代碼。
在這篇分為兩部分的系列文章的第一篇文章中,我們將快速深入 RAPIDS 項目,并探索一系列選項,使來自卡桑德拉的數據可用于 RAPIDS 分析。最后,我們將描述我們當前的方法:解析 C ++中的 sRebug 文件并將它們轉換成 GPU 友好的格式,使數據更容易加載到 GPU 設備內存中。
如果您想跳過循序漸進的過程并立即嘗試 sstable to arrow ,請查看第二職位。
什么是 RAPIDS
RAPIDS是一套開源庫,用于在 GPU 上進行端到端的分析和數據科學。它源于CUDA,這是一個由 NVIDIA 開發的開發人員工具包,旨在使開發人員能夠利用其 GPU 的優勢。
RAPIDS 采用了常見的AI/ML API,如pandas和scikit-learn,并使它們可用于 GPU 加速。數據科學,特別是機器學習,使用了大量并行計算,這使得它更適合在 GPU 上運行,該 GPU 可以比當前的 CPU s(來自rapids.ai的圖像)高幾個數量級的“多任務”:
圖 1 :
一旦我們以 cuDF 的形式獲得 GPU 上的數據(本質上是 pandas 數據幀的 RAPIDS 等價物),我們就可以使用與 MIG 熟悉的 Python 庫幾乎相同的 API 與之交互,如 pandas 、 scikit learn 等,如下圖從 RAPIDS 所示:
圖 2 :
圖 3 :
注意使用Apache Arrow作為底層內存格式。箭頭是基于列而不是行的,這會導致更快的分析查詢。它還帶有進程間通信( IPC )機制,用于在進程之間傳輸箭頭記錄批(即表)。 IPC 格式與內存中的格式相同,它消除了任何額外的復制或反序列化成本,并為我們提供了一些非常快速的數據訪問。
在 RAPIDS 上運行分析的好處是顯而易見的。您所需要的只是合適的硬件,只需查找 Python 數據科學庫的名稱并將其替換為 GPU 等價物,即可 MIG 對 GPU 上運行的現有數據科學代碼進行評級。
我們如何將 Cassandra 數據放到 GPU 上?
在過去的幾周里,我一直在研究五種不同的方法,按復雜性增加的順序列出如下:
使用 Cassandra 驅動程序獲取數據,將其轉換為 pandas 數據幀,然后將其轉換為 cuDF 。
與前面相同,但跳過 pandas 步驟,將驅動程序中的數據直接轉換為箭頭表。
使用 Cassandra 服務器代碼從磁盤讀取 SSTables ,使用 Arrow IPC 流格式對其進行序列化,然后將其發送到客戶端。
與方法 3 相同,但是在 C ++中使用我們自己的解析實現,而不是使用 CasANDRA 代碼。
與方法 4 相同,但在解析 SSK 表時使用 GPU 矢量化和CUDA。
首先,我將簡要概述這些方法中的每一種,然后在最后進行比較,并解釋我們接下來的步驟。
使用 Cassandra 驅動程序獲取數據
這種方法非常簡單,因為您可以使用現有庫而不必進行太多的黑客攻擊。我們從驅動程序獲取數據,將session.row_factory設置為 pandas _factory函數,告訴驅動程序如何將傳入數據轉換為 pandas .DataFrame。然后,調用 cuDF .DataFrame.from_ZBK5]函數將數據加載到 GPU 上是一件簡單的事情,然后我們可以使用 RAPIDS 庫運行 GPU -加速分析。
以下代碼要求您能夠訪問正在運行的 Cassandra 群集。有關更多信息,請參閱DataStax Python 驅動程序文檔。您還需要使用Conda安裝所需的 Python 庫:
BashCopy
conda install -c blazingsql -c rapidsai -c nvidia -c conda-forge -c defaults blazingsql cudf pyarrow pandas numpy cassandra-driver
PythonCopy
from cassandra.cluster import Cluster from cassandra.auth import PlainTextAuthProvider import pandas as pd import pyarrow as pa import cudf from blazingsql import BlazingContext import config # connect to the Cassandra server in the cloud and configure the session settings cloud_config= { 'secure_connect_bundle': '/path/to/secure/connect/bundle.zip' } auth_provider = PlainTextAuthProvider(user=’your_username_here’, password=’your_password_here’) cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider) session = cluster.connect() def pandas_factory(colnames, rows): """Read the data returned by the driver into a pandas DataFrame""" return pd.DataFrame(rows, columns=colnames) session.row_factory = pandas_factory # run the CQL query and get the data result_set = session.execute("select * from your_keyspace.your_table_name limit 100;") df = result_set._current_rows # a pandas dataframe with the information gpu_df = cudf.DataFrame.from_pandas(df) # transform it into memory on the GPU # do GPU-accelerated operations, such as SQL queries with blazingsql bc = BlazingContext() bc.create_table("gpu_table", gpu_df) bc.describe_table("gpu_table") result = bc.sql("SELECT * FROM gpu_table") print(result)
使用 Cassandra 驅動程序直接將數據提取到 Arrow 中
此步驟與上一步相同,只是我們可以使用以下箭頭關閉 pandas \ u 工廠:
PythonCopy
def get_col(col): rtn = pa.array(col) # automatically detects the type of the array # for a full implementation, we would want to fully check which arrow types want # to be manually casted for compatibility with cudf if pa.types.is_decimal(rtn.type): return rtn.cast('float32') return rtn def arrow_factory(colnames, rows): # convert from the row format passed by # CQL into the column format of arrow cols = [get_col(col) for col in zip(*rows)] table = pa.table({ colnames[i]: cols[i] for i in range(len(colnames)) }) return table session.row_factory = arrow_factory
然后我們可以用同樣的方法獲取數據并創建 cuDF 。
然而,這兩種方法都有一個主要缺點:它們依賴于查詢現有的 Cassandra 集群,這是我們don’t所需要的,因為讀取量大的分析工作負載 MIG ht 會影響事務性生產工作負載,而實時性能是關鍵。
相反,我們想看看是否有一種方法可以直接從磁盤上的 SSTable 文件中獲取數據,而無需通過數據庫。這就引出了接下來的三種方法。
使用 Cassandra 服務器代碼從磁盤讀取 SSTables
在磁盤上讀取 SSTables 的最簡單方法可能是使用現有的 Cassandra 服務器技術,即SSTableLoader。一旦我們從 SSTable 中獲得了分區列表,我們就可以手動將 Java 對象中的數據轉換為對應于表列的箭頭向量。然后,我們可以將向量集合序列化為 Arrow IPC 流格式,然后通過套接字以這種格式進行流處理。
這里的代碼比前兩種方法更復雜,比下一種方法開發得更少,所以我沒有在本文中包含它。另一個缺點是,盡管這種方法可以在 Cassandra 集群以外的單獨進程或機器中運行,但要使用SSTableLoader,我們首先要在客戶端進程中初始化嵌入式 Cassandra ,這在冷啟動時需要相當長的時間。
使用自定義 SSTable 解析器
為了避免初始化 CasANDRA ,我們開發了自己的 C ++實現,用于解析二進制數據穩定文件。關于這種方法的更多信息可以在下一篇博文中找到。下面是 Cassandra 存儲引擎的指南上一篇 Pickle 的文章,這對破譯數據格式有很大幫助。我們決定使用 C ++作為解析器的語言,以最終引入 CUDA ,也可以用于處理二進制數據的低級控制。
集成 CUDA 以加快表讀取速度
一旦自定義解析實現變得更加全面,我們計劃開始研究這種方法。利用 GPU 矢量化可以大大加快讀取和轉換過程。
Comparison
在當前階段,我們主要關注讀取 SSTable 文件所需的時間。對于方法 1 和 2 ,我們實際上無法公平地衡量這一次,因為 1 )該方法依賴于額外的硬件( Cassandra 集群)和 2 )。在 Cassandra 自身中存在著復雜的緩存效應。然而,對于方法 3 和 4 ,我們可以執行簡單的自省來跟蹤程序從開始到結束讀取 SSTable 文件所需的時間。
以下是針對NoSQLBench生成的 1k 、 5K 、 10k 、 50k 、 100k 、 500k 和 1m 行數據集的結果:
圖 4 :
如圖所示,定制實現比現有的 Cassandra 實現稍快,即使沒有任何額外的優化,如多線程。
Conclusion
考慮到分析用例的數據訪問模式通常包括大型掃描并經常讀取整個表,獲取此數據的最有效方法不是通過 CQL ,而是直接獲取 SSL 表。我們能夠在 C ++中實現一個 StAnalyd 解析器,它可以做到這一點,并將數據轉換成 Apache 箭頭,以便它可以被分析庫所利用,包括 NVIDIA GPU 供電 RAPIDS 生態系統。由此產生的開源( Apache 2 許可)項目稱為 sstable to arrow ,可在GitHub上獲得,并可通過Docker Hub作為 alpha 版本訪問。
關于作者
Alex Cai 于 2021 年在 DataStax 實習,是哈佛大學 2025 級的學生。他熱衷于計算機、軟件和認知科學,在業余時間,他喜歡閱讀、研究語言學和玩他的貓。
審核編輯:郭婷
-
NVIDIA
+關注
關注
14文章
4986瀏覽量
103067 -
gpu
+關注
關注
28文章
4740瀏覽量
128951 -
C++
+關注
關注
22文章
2108瀏覽量
73651
發布評論請先 登錄
相關推薦
評論