原文:https://bit.ly/3nGQu4J
並行程式設計在歷史上一直是軟體開發中比較小眾和複雜的環節,往往不值得頭疼。但編寫並行化應用只會越來越簡單,一個應用同時利用裝置 CPU 上的多個核心,來實現效率最大化也是很常見的。
如今,隨著資料工程作為一個專業領域的興起,並行程式設計比以往任何時候都更受歡迎。Apache Spark 是一個用於Extract(提取), Transform(轉換) 和 Load(載入)——ETL 大型資料集的軟體庫,可能是當今最流行的並行程式設計的方式。雖然 Apache 的 Spark、Hadoop 和 AirFlow 是對資料工程師來說最常見的技術,但它們的使用要求精通的不是 C#,而是 Python、Scala 或 Java(儘管不是很理想)。
對於 ETL 工作來說,“正確”的工具應該是 Spark 或 Hadoop 這樣的工具。它們是專門為 ETL 設計的,與 C# 或其他語言相比,需要你編寫的程式碼更少。如果你想在資料工程這個領域深耕,你最好的選擇學習一下 Python 和 Spark(以及其他技術)。儘管如此,但有時正確的工具是你已經知道如何使用的工具。事實上,我已經發現 C# 和 .NET 可以勝任並行化 ETL 操作的任務。
多年來,C# 的發展使並行程式設計變得越來越簡單。因為 C# 與以前的版本保持 100% 的向後相容,所以很難知道在眾多並行執行程式碼的方法中哪種是最好的方法。事實上,在 .NET 中,有好幾種方法可以啟動多個執行緒,而真正的問題在於,你希望 .NET 在“背後”為你處理多少和你希望自己手動處理多少。
一般來說,我們確實希望 .NET 儘可能多的為我們處理,特別是線上程管理方面,因為並行執行程式碼是(程式設計時)非常複雜的,而且非常容易出現意外的執行時錯誤。事實上,微軟在這裡[2]有一篇專門的文件涵蓋了並行程式設計的潛在陷阱。
我建議大家讀一讀,但就 ETL 和其他“資料處理”任務而言,我們真正需要擔心的只有兩件事:並行化是否真的會更快,如果是,確保我們的程式碼是執行緒安全的。
CPU密集型 vs IO密集型在確定“並行化”程式碼是否值得時,重要的是要了解應用程式的哪些部分是 CPU 密集型(CPU-bound)而不是 IO 密集型(IO-bound)的。正如你可能已經猜到的那樣,並行化增強了 CPU 密集型程式碼的效能,它不僅不會對任何 IO 瓶頸(bottlenecks)產生改善,而且可能會加劇應用程式的 IO 瓶頸(相比之下,非同步程式設計的目的是減少 IO 密集)。
CPU 密集型的程式碼通常是對程式中的物件進行的運算或其他操作。解析 CSV 檔案、對映物件和計算平均值都依賴於 CPU。在處理資料時,效率來自於根據可用的 CPU 數量拆分資料集,基本上在每個 CPU 上同時執行程式--只是處理的是整個資料集不同的分組。
當代碼的執行依賴於透過“網線”傳送或接收資料時,即透過網際網路或內部網路連線到另一個伺服器時,程式碼就是 IO 密集型的。這種情況在呼叫 API 或資料庫的儲存庫方法中最為常見。如果程式碼向永續性儲存(如硬碟或固態硬碟)寫入或讀取,那麼它也可以是 IO 密集型的。如果我們使用的第三方 API 需要 10 秒才能將資料返回給我們的應用程式,我們也沒有什麼辦法,然而透過非同步程式設計,我們至少可以讓我們的程式在等待 API 呼叫的時候繼續使用 CPU 執行其他部分,而不是閒置。
在處理每個資料集都依賴於 IO 密集型呼叫的情況下,比如呼叫資料庫對處理後的資料進行 INSERT,那麼我們執行程式的 CPU 執行緒數與呼叫外部源(如 API 或資料庫)的次數之間的平衡就顯得非常重要。如果 API 有使用限制(例如每秒 10 個請求),或者資料庫伺服器沒有足夠的執行緒來處理我們的程式比如說 20 個執行緒都試圖同時向同一個資料庫 insert,那麼這一點就尤其重要。
在不使程式的 IO 密集型過載的情況下最大化效率和最小化程式執行時間,可能需要對程式在 ETL 過程的各個步驟中使用的執行緒數量進行一些試錯式的調整。
Parallel.ForEach vs PLINQ當使用 .NET 時,現在可以透過一個簡單的 Parallel.ForEach 迴圈或 Parallel LINQ[3](PLINQ)來實現並行化應用程式所需的一切。對於 .NET 來說,這些並不是特別新的東西,但與它們的前身相比,它們更容易使用,因為它們需要建立和管理執行緒,並手動分割集合。Parallel.ForEach 和 PLINQ 兩者都為我們處理了這一切,根據我的經驗,兩者之間的效能沒有明顯的差異。我猜測它們在底層呼叫的程式碼大致相同。
下面是分別使用 Parallel.ForEach 和 PLINQ 讀取 CSV 檔案的示例:
// PLINQ using all CPU corespublic static void PLINQAll(string filePath){ var sw = new Stopwatch(); sw.Start(); var results = System.IO.File.ReadAllLines(filePath) .AsParallel() .Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")) .ToList(); sw.Stop(); Console.WriteLine($"PLINQ using all cores: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");}// PLINQ using all CPU threads (2x cores)public static void PLINQAllThreads(string filePath){ var threads = Environment.ProcessorCount * 2; var sw = new Stopwatch(); sw.Start(); var results = System.IO.File.ReadAllLines(filePath) .AsParallel() .WithDegreeOfParallelism(threads) .Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")) .ToList(); sw.Stop(); Console.WriteLine($"PLINQ using all THREADS {(threads)}: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");}// PLINQ using 2 CPU corespublic static void PLINQ2(string filePath){ var sw = new Stopwatch(); sw.Start(); var results = System.IO.File.ReadAllLines(filePath) .AsParallel() .WithDegreeOfParallelism(2) .Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")) .ToList(); sw.Stop(); Console.WriteLine($"PLINQ using 2 cores: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");}// PLINQ using user-defined thread countpublic static void PLINQUser(string filePath, int numThreads){ var sw = new Stopwatch(); sw.Start(); var results = System.IO.File.ReadAllLines(filePath) .AsParallel() .WithDegreeOfParallelism(numThreads) .Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")) .ToList(); sw.Stop(); Console.WriteLine($"PLINQ using {numThreads} threads: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");}// PLINQ using all CPU cores without Regex parsing - May yield bad data due to commas within fieldspublic static void PLINQNoRegex(string filePath){ var sw = new Stopwatch(); sw.Start(); var results = System.IO.File.ReadAllLines(filePath) .AsParallel() .Select(x => x.Split(',')) .ToList(); sw.Stop(); Console.WriteLine($"PLINQ using all cores (no Regex): completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");}// Parallel.ForEach using all CPU cores - May yield bad datapublic static void ParallelForEach(string filePath){ var sw = new Stopwatch(); sw.Start(); var rows = new List<string[]>(); Parallel.ForEach(File.ReadLines(filePath), line => { rows.Add(line.Split(',')); }); sw.Stop(); Console.WriteLine($"Parallel.ForEach using all cores (no Regex): completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");}// Parallel.ForEach using all CPU cores with Regex parsing - takes roughly the same amount of time as PLINQpublic static void ParallelForEachRegex(string filePath){ var sw = new Stopwatch(); sw.Start(); var rows = new List<string[]>(); Parallel.ForEach(File.ReadLines(filePath), line => { rows.Add(Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")); }); sw.Stop(); Console.WriteLine($"Parallel.ForEach w/Regex using all cores: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");}
那麼應該選擇哪一種方式呢?我建議根據你在非並行情況下使用哪種方法來決定--你會使用 foreach 迴圈還是 LINQ 查詢?也就是說,這兩者之間最大的、不明顯的區別是 Parallel.ForEach 允許你指定執行緒數,最多不超過計算機或伺服器上可用的執行緒數(你可以指定更多的執行緒數,但它最多隻能啟動 CPU 所擁有的執行緒數)。然而,根據你的指定,PLINQ 可以使用超過計算機上 CPU 執行緒數的執行緒(譯註:不是所有的執行緒都是工作中的,所以理論上可以建立無數個執行緒)。一般情況下,你不會希望啟動超出比 CPU 執行緒更多的執行緒,但在某些情況下,這樣做會更有效。例如,如果你要寫一個網路爬蟲,那麼啟動雙倍的執行緒數量可能是有意義的,因為每個執行緒大概都要等待網站的載入。其他這樣的網路密集型(IO 密集型的一個子集)任務可能會在更多執行緒的情況下執行得更快。
示例程式我們的 ELT 示例程式(倉庫地址[5])將做以下四件事件:
讀取 CSV 檔案;將這些 CSV 檔案中的欄位對映到 C# 物件;對資料(物件列表)執行一些轉換操作;將這些資料插入到資料庫中。我將在接下來即將釋出的第二篇文章中繼續介紹該示例。
文中連結:
[1]. https://dotnet.microsoft.com/apps/data/spark
[2]. https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/potential-pitfalls-in-data-and-task-parallelism
[3]. https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/introduction-to-plinq
[4]. https://devblogs.microsoft.com/pfxteam/when-to-use-parallel-foreach-and-when-to-use-plinq/
[5]. https://gitlab.com/jspinella/parallel-etl-examples
-
帶你洞悉程式設計與架構
點頭像關注,不錯過網海相遇的緣分[比心]