2023 年 2 月:我想查看電影電視節目的所有分數以及在一個頁面上流式傳輸它們的位置,但找不到包含與我相關的所有來源的聚合器。
2023 年 3 月:因此,我建立了一個 MVP,可以即時取得分數並將網站上線。它有效,但速度很慢(10 秒顯示分數)。
2023 年 10 月:意識到在我這邊儲存資料是必要的,我發現了 Windmill.dev。它很容易超越類似的編排引擎 - 至少滿足我的需求。
快轉到今天,經過 12 個月的連續數據咀嚼,我想詳細分享管道的工作原理。您將學習如何建立一個複雜的系統,從許多不同的來源獲取數據,規範化數據並將其組合成最佳化的查詢格式。
這是運行視圖。每個點代表一次流程運作。流程可以是任何內容,例如簡單的一步腳本:
中心的區塊包含這樣的腳本(簡化):
def main(): return tmdb_extract_daily_dump_data() def tmdb_extract_daily_dump_data(): print("Checking TMDB for latest daily dumps") init_mongodb() daily_dump_infos = get_daily_dump_infos() for daily_dump_info in daily_dump_infos: download_zip_and_store_in_db(daily_dump_info) close_mongodb() return [info.to_mongo() for info in daily_dump_infos] [...]
下面的野獸也是一個流(記住,這只是其中一個綠點):
(更高解析度的圖片:https://i.imgur.com/LGhTUGG.png)
讓我們來分解一下:
每個步驟都或多或少複雜,並且涉及使用非同步進程。
為了決定接下來要選擇哪些標題,有兩個並行處理的通道。這是風車的另一個亮點。並行化和編排與他們的架構完美配合。
選擇下一個項目的兩個通道是:
首先,將為每個資料來源選擇沒有附加任何資料的標題。這意味著如果 Metacritic 管道中有一部尚未被抓取的電影,那麼接下來就會選擇它。這可確保每個標題至少被處理一次,包括新標題。
一旦每個標題都附加了數據,管道就會選擇具有最新數據的標題。
這是此類流程運行的範例,由於達到了速率限制,因此出現錯誤:
Windmill 讓您輕鬆定義流程中每個步驟的重試次數。在這種情況下,邏輯是在出現錯誤時重試 3 次。除非達到速率限制(通常是不同的狀態代碼或錯誤訊息),否則我們會立即停止。
Lane 2:每部電影/節目單獨的優先流
最近的版本更新不夠及時。成功獲取每個數據方面可能需要幾週甚至幾個月的時間。例如,可能會出現一部電影有最近的 IMDb 評分,但其他評分已過時且串流媒體連結完全丟失的情況。特別是對於分數和串流媒體可用性,我希望獲得更好的準確性。
為了解決這個問題,第二個通道專注於不同的優先策略:選擇最受歡迎和最熱門的電影/節目來跨所有資料來源進行完整的資料刷新。 我之前展示過這個流程,這就是我之前所說的野獸。
應用程式上更頻繁顯示的標題也會獲得優先級提升。這意味著每次電影或節目出現在頂部搜尋結果中或打開其詳細資訊視圖時,它們可能很快就會刷新。每個標題只能
每週刷新一次使用優先通道以確保我們不會獲取同時可能未更改的資料。
你可以這樣做嗎?抓取注意事項一旦您從使用抓取資料的服務中獲利,您就可能違反了他們的條款和條件。 (請參閱網絡抓取的法律格局和“抓取”只是自動訪問,每個人都這樣做)
抓取和相關法律是新的,通常未經測試,並且存在許多法律灰色地帶。我決心相應地引用每個來源,尊重速率限制並避免不必要的請求,以盡量減少對他們服務的影響。事實是,這些數據不會被用來獲利。 GoodWatch 將永遠免費供所有人使用。
Windmill 使用工作執行緒在多個行程之間指派程式碼執行。 流程中的每個步驟都會傳送給工作人員,這使得它們獨立於實際的業務邏輯。 只有主應用程式編排作業,而工作人員只接收輸入資料、執行程式碼並傳回結果。
這是一個高效的架構,可以很好地擴展。目前,有12名工人正在分工。它們都託管在 Hetzner 上。
每個worker的最大資源消耗為1個vCPU和2GB RAM。概述如下:
Windmill 提供類似瀏覽器內IDE 的編輯器體驗,包括linting、自動格式化、AI 助理,甚至編輯(最後一項是付費功能)。不過最好的是這個按鈕:
它允許我在部署腳本之前快速迭代和測試腳本。我通常在瀏覽器中編輯和測試文件,完成後將它們推送到 git。
最佳編碼環境唯一缺少的是偵錯工具(斷點和變數上下文)。目前,我正在本地 IDE 中調試腳本來克服這個弱點。
我也是!
目前 GoodWatch 需要大約 100 GB 持久性資料儲存:
每天 6.500 個串流 透過 Windmill 的編排引擎運作。這導致每日交易量為:
由於不同的速率限制政策,這些數字根本不同。
每天一次,資料被清理並組合成最終的資料格式。目前為 GoodWatch 網路應用程式商店提供支援的資料庫:
想像一下您只能透過電影類型來區分電影,這是非常有限的嗎?
這就是我開始 DNA 計畫的原因。它允許以其他屬性對電影和節目進行分類,例如心情、情節元素、角色類型、對話框或或關鍵道具
.以下是所有項目中 DNA 值的前 10 位:
它允許兩件事:
範例:
未來將會有關於 DNA 的專門部落格文章提供更多詳細資訊。
為了充分了解資料管道的工作原理,以下是每個資料來源發生的情況的詳細說明:
對於每個資料來源,都有一個 ìnit 流程,用於準備包含所有必需資料的 MongoDB 集合。對於 IMDb,這只是 imdb_id。對於爛番茄,標題和發布年份是必要的。這是因為 ID 未知,我們需要根據名稱猜測正確的 URL。
根據上面解釋的優先順序選擇,準備好的集合中的項目將使用所獲得的資料進行更新。每個資料來源都有自己的集合,隨著時間的推移,這些集合會變得越來越完整。
有一個電影串流,一個電視節目串流,還有一個串流連結串流。它們從各種集合中收集所有必要的數據,並將它們儲存在各自的 Postgres 表中,然後由 Web 應用程式查詢。
以下是複製電影流程和劇本的摘錄:
其中一些流程需要很長時間才能執行,有時甚至超過 6 小時。這可以透過標記所有已更新的項目並僅複製這些項目而不是批量處理整個資料集來優化。我的清單上眾多 TODO 項目之一?
調度就像為需要自動執行的每個流程或腳本定義 cron 表達式一樣簡單:
以下是為 GoodWatch 定義的所有時間表的摘錄:
總共定義了約 50 個時間表。
偉大的數據伴隨著巨大的責任。很多事情都可能出錯。確實如此。
我的腳本的早期版本需要很長時間才能更新集合或表中的所有條目。那是因為我單獨更新了每個項目。這會導致大量開銷並顯著減慢進程。
更好的方法是收集要更新插入的資料並對資料庫查詢進行批次處理。這是 MongoDB 的範例:
def main(): return tmdb_extract_daily_dump_data() def tmdb_extract_daily_dump_data(): print("Checking TMDB for latest daily dumps") init_mongodb() daily_dump_infos = get_daily_dump_infos() for daily_dump_info in daily_dump_infos: download_zip_and_store_in_db(daily_dump_info) close_mongodb() return [info.to_mongo() for info in daily_dump_infos] [...]
即使使用批次處理,某些腳本也會消耗大量內存,導致工作線程崩潰。解決方案是針對每個用例仔細調整批量大小。
某些批次可以以 5000 為步長運行,其他批次可以在內存中存儲更多數據,並且以 500 為步長運行效果更好。
Windmill 有一個很棒的功能,可以在腳本運行時觀察記憶體:
Windmill 是任何開發人員用於自動化任務的工具包中的重要資產。它對我來說是一個無價的生產力提升器,讓我能夠專注於流程結構和業務邏輯,同時外包任務編排、錯誤處理、重試和快取的繁重工作。
處理大量數據仍然具有挑戰性,優化管道是一個持續的過程 - 但我對迄今為止一切的結果感到非常滿意。
也是這麼想的。只要讓我連結一些資源,我們就完成了:
您知道GoodWatch 是開源的嗎?您可以查看此儲存庫中的所有腳本和串流定義:https://github.com/alp82/goodwatch-monorepo/tree/main/goodwatch-flows/windmill/f
如果您有任何疑問,請告訴我。
以上是數百萬部電影和數百萬串流媒體連結的數據管道的詳細內容。更多資訊請關注PHP中文網其他相關文章!