加快 `shutil.copytree` 速度!
關於加速shutil.copytree的討論
寫在這裡
這是關於 的討論,請參閱:https://discuss.python.org/t/speed-up-shutil-copytree/62078。如果您有任何想法,請發送給我!
背景
shutil 是 Python 中一個非常有用的模組。你可以在github中找到它:https://github.com/python/cpython/blob/master/Lib/shutil.py
shutil.copytree 是將一個資料夾複製到另一個資料夾的函式。
在該函數中,呼叫_copytree函數進行複製。
_copytree 有什麼作用?
- 忽略指定的檔案/目錄。
- 建立目標目錄。
- 在處理符號連結時複製檔案或目錄。
- 收集並最終提出遇到的錯誤(例如權限問題)。
- 將來源目錄的元資料複製到目標目錄。
問題
_當檔案數量較多或檔案大小較大時,copytree速度不是很快。
在這裡測試:
import os import shutil os.mkdir('test') os.mkdir('test/source') def bench_mark(func, *args): import time start = time.time() func(*args) end = time.time() print(f'{func.__name__} takes {end - start} seconds') return end - start # write in 3000 files def write_in_5000_files(): for i in range(5000): with open(f'test/source/{i}.txt', 'w') as f: f.write('Hello World' + os.urandom(24).hex()) f.close() bench_mark(write_in_5000_files) def copy(): shutil.copytree('test/source', 'test/destination') bench_mark(copy)
結果是:
write_in_5000_files 需要 4.084963083267212 秒
複製需要 27.12768316268921 秒
我做了什麼
多執行緒
我使用多執行緒來加速複製過程。我將函式重新命名為_copytree_single_threaded,新增一個新函式_copytree_multithreaded。這是copytree_multithreaded:
def _copytree_multithreaded(src, dst, symlinks=False, ignore=None, copy_function=shutil.copy2, ignore_dangling_symlinks=False, dirs_exist_ok=False, max_workers=4): """Recursively copy a directory tree using multiple threads.""" sys.audit("shutil.copytree", src, dst) # get the entries to copy entries = list(os.scandir(src)) # make the pool with ThreadPoolExecutor(max_workers=max_workers) as executor: # submit the tasks futures = [ executor.submit(_copytree_single_threaded, entries=[entry], src=src, dst=dst, symlinks=symlinks, ignore=ignore, copy_function=copy_function, ignore_dangling_symlinks=ignore_dangling_symlinks, dirs_exist_ok=dirs_exist_ok) for entry in entries ] # wait for the tasks for future in as_completed(futures): try: future.result() except Exception as e: print(f"Failed to copy: {e}") raise
我新增了一個判斷,選擇是否使用多執行緒。
if len(entries) >= 100 or sum(os.path.getsize(entry.path) for entry in entries) >= 100*1024*1024: # multithreaded version return _copytree_multithreaded(src, dst, symlinks=symlinks, ignore=ignore, copy_function=copy_function, ignore_dangling_symlinks=ignore_dangling_symlinks, dirs_exist_ok=dirs_exist_ok) else: # single threaded version return _copytree_single_threaded(entries=entries, src=src, dst=dst, symlinks=symlinks, ignore=ignore, copy_function=copy_function, ignore_dangling_symlinks=ignore_dangling_symlinks, dirs_exist_ok=dirs_exist_ok)
測試
我在來源資料夾中寫入了 50000 個檔案。基準標記:
def bench_mark(func, *args): import time start = time.perf_counter() func(*args) end = time.perf_counter() print(f"{func.__name__} costs {end - start}s")
寫入:
import os os.mkdir("Test") os.mkdir("Test/source") # write in 50000 files def write_in_file(): for i in range(50000): with open(f"Test/source/{i}.txt", 'w') as f: f.write(f"{i}") f.close()
兩個比較:
def copy1(): import shutil shutil.copytree('test/source', 'test/destination1') def copy2(): import my_shutil my_shutil.copytree('test/source', 'test/destination2')
- 「my_shutil」是我修改過的shutil版本。
副本 1 花費 173.04780609999943s
copy2 花費 155.81321870000102s
copy2 比 copy1 快很多。你可以跑很多次。
優點和缺點
使用多執行緒可以加快複製過程。但會增加記憶體佔用。但我們不需要在程式碼中重寫多線程。
非同步
感謝「巴瑞·史考特」。我會聽從他/她的建議:
透過使用非同步 I/O,您可能會以更少的開銷獲得相同的改進。
我寫了這些程式碼:
import os import shutil import asyncio from concurrent.futures import ThreadPoolExecutor import time # create directory def create_target_directory(dst): os.makedirs(dst, exist_ok=True) # copy 1 file async def copy_file_async(src, dst): loop = asyncio.get_event_loop() await loop.run_in_executor(None, shutil.copy2, src, dst) # copy directory async def copy_directory_async(src, dst, symlinks=False, ignore=None, dirs_exist_ok=False): entries = os.scandir(src) create_target_directory(dst) tasks = [] for entry in entries: src_path = entry.path dst_path = os.path.join(dst, entry.name) if entry.is_dir(follow_symlinks=not symlinks): tasks.append(copy_directory_async(src_path, dst_path, symlinks, ignore, dirs_exist_ok)) else: tasks.append(copy_file_async(src_path, dst_path)) await asyncio.gather(*tasks) # choose copy method def choose_copy_method(entries, src, dst, **kwargs): if len(entries) >= 100 or sum(os.path.getsize(entry.path) for entry in entries) >= 100 * 1024 * 1024: # async version asyncio.run(copy_directory_async(src, dst, **kwargs)) else: # single thread version shutil.copytree(src, dst, **kwargs) # test function def bench_mark(func, *args): start = time.perf_counter() func(*args) end = time.perf_counter() print(f"{func.__name__} costs {end - start:.2f}s") # write in 50000 files def write_in_50000_files(): for i in range(50000): with open(f"Test/source/{i}.txt", 'w') as f: f.write(f"{i}") def main(): os.makedirs('Test/source', exist_ok=True) write_in_50000_files() # 单线程复制 def copy1(): shutil.copytree('Test/source', 'Test/destination1') def copy2(): shutil.copytree('Test/source', 'Test/destination2') # async def copy3(): entries = list(os.scandir('Test/source')) choose_copy_method(entries, 'Test/source', 'Test/destination3') bench_mark(copy1) bench_mark(copy2) bench_mark(copy3) shutil.rmtree('Test') if __name__ == "__main__": main()
輸出:
副本 1 花費 187.21 秒
copy2 花費 244.33s
copy3 花費 111.27 秒
可以看到非同步版本比單執行緒版本更快。但單線程版本比多線程版本更快。 (可能是我的測試環境不太好,你可以嘗試一下,把你的結果回覆給我)
謝謝巴瑞·史考特!
優點和缺點
非同步是一個不錯的選擇。但沒有一個解決方案是完美的。如果您發現任何問題,可以回覆我。
結尾
這是我第一次在 python.org 寫討論。如果有任何問題,請告訴我。謝謝。
我的Github:https://github.com/mengqinyuan
我的開發者:https://dev.to/mengqinyuan
以上是加快 `shutil.copytree` 速度!的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undress AI Tool
免費脫衣圖片

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

本文為您精選了多個頂級的Python“成品”項目網站與高水平“大片”級學習資源入口。無論您是想尋找開發靈感、觀摩學習大師級的源代碼,還是系統性地提昇實戰能力,這些平台都是不容錯過的寶庫,能幫助您快速成長為Python高手。

使用subprocess.run()可安全執行shell命令並捕獲輸出,推薦以列表傳參避免注入風險;2.需要shell特性時可設shell=True,但需警惕命令注入;3.使用subprocess.Popen可實現實時輸出處理;4.設置check=True可在命令失敗時拋出異常;5.簡單場景可直接鍊式調用獲取輸出;日常應優先使用subprocess.run(),避免使用os.system()或已棄用模塊,以上方法覆蓋了Python中執行shell命令的核心用法。

使用Seaborn的jointplot可快速可視化兩個變量間的關係及各自分佈;2.基礎散點圖通過sns.jointplot(data=tips,x="total_bill",y="tip",kind="scatter")實現,中心為散點圖,上下和右側顯示直方圖;3.添加回歸線和密度信息可用kind="reg",並結合marginal_kws設置邊緣圖樣式;4.數據量大時推薦kind="hex",用

使用httpx.AsyncClient可高效发起异步HTTP请求,1.基本GET请求通过asyncwith管理客户端并用awaitclient.get发起非阻塞请求;2.并发多个请求时结合asyncio.gather可显著提升性能,总耗时等于最慢请求;3.支持自定义headers、认证、base_url和超时设置;4.可发送POST请求并携带JSON数据;5.注意避免混用同步异步代码,代理支持需注意后端兼容性,适合用于爬虫或API聚合等场景。

字符串列表可用join()方法合併,如''.join(words)得到"HelloworldfromPython";2.數字列表需先用map(str,numbers)或[str(x)forxinnumbers]轉為字符串後才能join;3.任意類型列表可直接用str()轉換為帶括號和引號的字符串,適用於調試;4.自定義格式可用生成器表達式結合join()實現,如'|'.join(f"[{item}]"foriteminitems)輸出"[a]|[

pythoncanbeoptimizedFormized-formemory-boundoperationsbyreducingOverHeadThroughGenerator,有效dattratsures,andManagingObjectLifetimes.first,useGeneratorSInsteadoFlistSteadoflistSteadoFocessLargedAtasetSoneItematatime,desceedingingLoadeGingloadInterveringerverneDraineNterveingerverneDraineNterveInterveIntMory.second.second.second.second,Choos,Choos

安裝pyodbc:使用pipinstallpyodbc命令安裝庫;2.連接SQLServer:通過pyodbc.connect()方法,使用包含DRIVER、SERVER、DATABASE、UID/PWD或Trusted_Connection的連接字符串,分別支持SQL身份驗證或Windows身份驗證;3.查看已安裝驅動:運行pyodbc.drivers()並篩選含'SQLServer'的驅動名,確保使用如'ODBCDriver17forSQLServer'等正確驅動名稱;4.連接字符串關鍵參數

shutil.rmtree()是Python中用於遞歸刪除整個目錄樹的函數,能刪除指定文件夾及其所有內容。 1.基本用法:使用shutil.rmtree(path)刪除目錄,需處理FileNotFoundError、PermissionError等異常。 2.實際應用:可一鍵清除包含子目錄和文件的文件夾,如臨時數據或緩存目錄。 3.注意事項:刪除操作不可恢復;路徑不存在時拋出FileNotFoundError;可能因權限或文件佔用導致失敗。 4.可選參數:可通過ignore_errors=True忽略錯
