以下是「向現有 DataFrame 添加雜湊」的任務如何從花費幾天時間到消耗幾乎整個衝刺的過程。
2022 年第二季度,我開始開發一個資料管道,該管道從 REST 服務獲取市場資料並將其儲存在 BigQuery 表中。這是管道的高級解釋。有趣的部分是如何查詢數據,將其轉換為 DataFrame,然後使用 AirFlow 的GCSToBigQueryOperator上傳 BigQuery 表。
最初,看起來寫起來很簡單,但 Airflow 的「冪等」原理給它增加了一些挑戰。從該 REST 服務獲取的內容由另一個表決定,即使 JOB 是冪等的,它用作參考的表也可能在兩次運行之間發生變化。經過額外的時間,與資料工程師的溝通管道已於 2022 年第三季末準備就緒。
快轉到 2024 年第一季。此時,我們有更多用戶存取數據,我們意識到我們的查詢模式沒有正確使用分區。或者更確切地說,我們想要基於字串列存取數據,但無法在 BigQuery 中對字串列進行分區。這導致掃描大量數據並經常達到每日配額。
這促使我們考慮如何根據字串列對資料進行分區。我們的資料工程師建議使用 FarmHash 並附加類比運算將該字串列轉換為整數。在概念驗證中,這減少了近 90% 的掃描,查詢效能提高了 3-5 倍。我們決定繼續將此作為最終解決方案。我們所需要的只是:
為了在 Python 中計算 FarmHash 指紋,有一個 pyfarmhash 模組。我安裝了該模組並使用下面的程式碼來計算哈希值,並且在本地一切都按預期工作。
def get_hash(val: str) -> int: return additonal_logic(pyfarmhash.fingerprint64(...)) df[‘hash’] = df[‘Col’].apply(get_hash)
隨著所有測試的通過,現在是時候將程式碼推送到 Airflow 並運行它了。我沒想到在這個階段會出現任何問題。事實上,我很高興一切都按計劃並在預計的時間內進行。
懷著愉快的心情和充滿信心,我推動了我的改變,開始了工作,然後等待了 10-15 分鐘讓它完成。同時,我切換到另一項任務。很快,我收到了一封來自 Airflow 的意外失敗電子郵件。我查看了日誌,並驚訝地發現安裝 pyfarmhash 模組時失敗了!
為了幫助您理解問題,我需要解釋一下作業的結構。作業有以下步驟:
在這個過程中,下載資料的task-1是一個單獨的Python模組。為了運行它,我使用了 Airflow 中的PythonVirtualenvOperator。此操作符可讓您根據需要指定套件,然後將它們安裝在新建立的虛擬環境中。安裝套件後,它的所有依賴項也會安裝,您就可以開始使用了。
我加入了pyfarmhash作為下載資料的模組的依賴項,其他一切保持不變。但它失敗了!為什麼?
pyfarmhash是一個用 C/C++ 實作的雜湊函式庫。安裝後,它需要 GCC 來編譯軟體包,而 Airflow 主機上不存在該軟體包。在 Airflow 主機上不安裝 GCC 是有道理的,但不幸的是,這對我來說是一個障礙。
我尋找了 pyfarmhash 套件的純 Python 實現,但沒有。然後,我尋找車輪包裝,但同樣沒有。我考慮過建造輪子包並推動它們,但這將導致在內部提供輪子包的長期責任。我想避免額外的、類似解決方法的步驟。我探索了所有選項,並與維護 Airflow 的團隊進行了討論。他們建議建立一個 Docker 映像並在KubernetesPodOperator中運行它。這是一個不錯的選擇,因為我可以控制環境並包含所需的任何內容,而無需依賴外部環境。此外,該解決方案沒有解決方法。唯一的短期缺點是需要更多時間來實施。
在開始使用基於 Docker 的解決方案之前,我已經在這項任務上花費了大約 16-20 個小時。對於基於 Docker 的解決方案,我還需要:
由於我不再在 Airflow 中使用 PythonVirtualEnvOperator,我決定完全刪除它並改進工作流程。我必須更改 python 套件以具有開始下載和清除邏輯的入口點
我又花了 30-36 個小時才準備好了 Docker 映像的最終解決方案,這需要 6-7 個工作日,加上最初的 2 天,它變成了一項漫長的衝刺任務。
我回顧這一點並想知道,我不得不放棄工作解決方案,更改模組結構,創建 docker 映像,更改 10 多個 AirFlow 作業以使用 Docker 映像執行任務,處理這個現實並克服最初的挫敗感。所有這一切只是因為,「單一 python 模組需要「gcc」來編譯!」
以上是估計編碼任務:可能會出現什麼問題?的詳細內容。更多資訊請關注PHP中文網其他相關文章!