python中的迭代器是單次消費的,一旦被完全遍歷(例如通過`list()`轉換),它就會耗盡并變?yōu)榭?。在多進程環(huán)境中,如果一個迭代器在傳遞給`multiprocessing.pool.starmap`之前被意外耗盡,`starmap`將接收到一個空的迭代器,導致沒有任何任務被提交和執(zhí)行。這會掩蓋潛在的運行時錯誤,因為工作函數(shù)根本沒有被調用,從而無法拋出預期的異常。
在Python中,迭代器是一種對象,它允許我們一次訪問一個元素。許多內置類型,如列表、元組、字符串和字典,都是可迭代的,但它們本身并不是迭代器。當我們使用for循環(huán)、list()、tuple()、sum()等函數(shù)或表達式時,Python會在內部從可迭代對象中獲取一個迭代器。
迭代器的核心特性是它實現(xiàn)了__iter__()和__next__()方法。__next__()方法在每次調用時返回序列中的下一個項目,并在沒有更多項目時引發(fā)StopIteration異常。
一個關鍵點是:迭代器是單次消費的。這意味著一旦一個迭代器被完全遍歷,它就變得“耗盡”了,無法再次提供數(shù)據(jù)。例如,zip函數(shù)返回一個迭代器,它也遵循這個原則。
# 示例:zip對象作為迭代器 x = (0, 1, 2) y = "ABC" zipper = zip(x, y) print("第一次遍歷:") for n, s in zipper: print(n, s) print("第二次遍歷:") # 此時zipper已經(jīng)耗盡,不會打印任何內容 for n, s in zipper: print(n, s)
運行上述代碼,你會發(fā)現(xiàn)“第二次遍歷”部分不會有任何輸出,因為zipper迭代器在第一次for循環(huán)中已經(jīng)被完全消費。
立即學習“Python免費學習筆記(深入)”;
當對一個迭代器執(zhí)行諸如list(iterator)、tuple(iterator)、set(iterator)或在for循環(huán)中完整遍歷它時,迭代器中的所有元素都會被取出并用于構建新的數(shù)據(jù)結構或執(zhí)行相應操作。完成這些操作后,迭代器內部的狀態(tài)指針會指向序列的末尾,使其無法再提供任何數(shù)據(jù)。
考慮以下示例:
x = (0, 1, 2) y = "ABC" zipper = zip(x, y) # 顯式地將迭代器轉換為列表 my_list = list(zipper) print(f"轉換為列表后:{my_list}") # 此時zipper迭代器已經(jīng)耗盡 print("嘗試再次遍歷耗盡的迭代器:") for n, s in zipper: print(n, s) # 這行代碼不會被執(zhí)行
在這個例子中,list(zipper)操作徹底耗盡了zipper迭代器。因此,隨后的for循環(huán)發(fā)現(xiàn)zipper已經(jīng)為空,便直接跳過循環(huán)體,不會引發(fā)任何錯誤,但也不會執(zhí)行任何操作。
在多進程編程中,特別是使用multiprocessing.Pool.starmap時,迭代器耗盡的特性可能會導致難以察覺的問題。starmap函數(shù)接受一個可迭代對象作為其任務參數(shù)的來源。它會從這個可迭代對象中逐一取出元素,并將它們作為參數(shù)傳遞給目標函數(shù)在不同的進程中執(zhí)行。
如果傳遞給starmap的可迭代對象在被starmap使用之前就已經(jīng)耗盡,那么starmap將接收到一個空的序列。這意味著:
回到原始問題中的代碼片段:
from itertools import repeat import multiprocessing def starmap_with_kwargs(pool, fn, args_iter, kwargs_iter): args_for_starmap = zip(repeat(fn), args_iter, kwargs_iter) # print(args_iter) # 這里的args_iter是zip對象,尚未耗盡 return pool.starmap(apply_args_and_kwargs, args_for_starmap) def apply_args_and_kwargs(fn, args, kwargs): # print('test') return fn(*args, **kwargs) def func(path, dictArg, **kwargs): # 這里的dictArg預期是字典,但如果數(shù)據(jù)源有誤,可能是字符串 for i in dictArg: # 如果dictArg是字符串,此循環(huán)會迭代字符串的字符 print(i['a']) # 如果i是字符,嘗試['a']索引會引發(fā)TypeError print(kwargs['yes']) def funcWrapper(path, dictList, **kwargs): args_iter = zip(repeat(path), dictList) kwargs_iter = repeat(kwargs) # 關鍵行:如果取消注釋,args_iter會在此處耗盡 # list(args_iter) pool = multiprocessing.Pool() starmap_with_kwargs(pool, func, args_iter, kwargs_iter) pool.close() pool.join() # 確保所有進程完成 dictList = [{'a: 2'}, {'a': 65}, {'a': 213}, {'a': 3218}] # 注意第一個元素是字符串! path = 'some/path/to/something' funcWrapper(path, dictList, yes=1)
在dictList中,第一個元素{'a: 2'}是一個字符串,而不是一個字典。當func函數(shù)嘗試對這個字符串進行i['a']操作時,就會引發(fā)TypeError: string indices must be integers。
情況一:list(args_iter)被注釋掉args_iter (一個zip迭代器) 被創(chuàng)建后,直接傳遞給了starmap_with_kwargs,最終進入pool.starmap。starmap會從args_for_starmap(也是一個zip迭代器,內部包含args_iter的引用)中取出任務,并調度apply_args_and_kwargs在子進程中執(zhí)行。當func接收到dictArg為字符串{'a: 2'}時,會嘗試i['a']操作,從而引發(fā)TypeError。
情況二:list(args_iter)被取消注釋 在funcWrapper中,當執(zhí)行l(wèi)ist(args_iter)時,args_iter這個zip迭代器會被立即完全遍歷,并將其所有元素收集到一個臨時列表中。完成此操作后,args_iter迭代器自身就耗盡了。 隨后,當這個已經(jīng)耗盡的args_iter被傳遞給starmap_with_kwargs時,args_for_starmap = zip(repeat(fn), args_iter, kwargs_iter)也會創(chuàng)建一個基于一個空迭代器的新zip迭代器。 最終,pool.starmap接收到一個空的args_for_starmap迭代器。這意味著starmap發(fā)現(xiàn)沒有任務可供執(zhí)行,所以它不會調用apply_args_and_kwargs,進而func也永遠不會被調用。由于func從未被調用,其中導致TypeError的邏輯也就無從觸發(fā),因此看不到任何錯誤信息。
為了避免這種迭代器耗盡導致的問題,并確保多進程任務能夠按預期執(zhí)行,請遵循以下原則:
一次性轉換為具體數(shù)據(jù)結構: 如果你需要在程序的多個地方使用同一個迭代器的數(shù)據(jù),或者需要對其進行預處理或調試,最好的方法是將其一次性轉換為一個列表或元組。
# 修正后的funcWrapper示例 def funcWrapper_fixed(path, dictList, **kwargs): args_iter_raw = zip(repeat(path), dictList) # 將迭代器轉換為列表,這樣可以多次使用或檢查 args_list = list(args_iter_raw) # 此時args_list可以用于調試或多次傳遞 # print(args_list) kwargs_iter = repeat(kwargs) # kwargs_iter可以保持為迭代器,因為它只在zip中被消費一次 pool = multiprocessing.Pool() # 注意:這里需要重新構造args_for_starmap,因為它依賴于args_iter # 如果args_list是固定列表,則可以直接使用 # 但如果starmap_with_kwargs需要迭代器,那么args_list在這里可以作為新的迭代源 # 實際傳遞給starmap_with_kwargs的應該是zip(repeat(fn), args_list, kwargs_iter) # 更簡潔的傳遞方式,確保args_list被正確處理 starmap_with_kwargs(pool, func, args_list, kwargs_iter) pool.close() pool.join()
在starmap_with_kwargs內部,如果args_iter期望的是一個可迭代對象,那么args_list作為列表是完全兼容的。
每次使用時重新創(chuàng)建迭代器: 如果數(shù)據(jù)源允許,并且你確實需要在不同上下文中使用獨立的迭代序列,可以在每次需要時重新創(chuàng)建迭代器。
# 重新創(chuàng)建迭代器的示例 def funcWrapper_recreate(path, dictList, **kwargs): # 第一次使用 args_iter_1 = zip(repeat(path), dictList) # do something with args_iter_1, e.g., list(args_iter_1) # 第二次使用時,重新創(chuàng)建 args_iter_2 = zip(repeat(path), dictList) kwargs_iter = repeat(kwargs) pool = multiprocessing.Pool() starmap_with_kwargs(pool, func, args_iter_2, kwargs_iter) pool.close() pool.join()
注意調試時的副作用: 在調試代碼時,避免在不經(jīng)意間通過list()或for循環(huán)耗盡你打算傳遞給后續(xù)函數(shù)的迭代器。如果需要查看迭代器的內容,可以先將其轉換為列表,然后將該列表傳遞給后續(xù)函數(shù),而不是原始的迭代器。
Python迭代器的單次消費特性是其設計的一部分,旨在提高內存效率。然而,在多進程或其他需要多次訪問相同數(shù)據(jù)序列的場景中,如果不理解這一特性,就可能導致邏輯錯誤被掩蓋。核心原則是:一旦迭代器被完全遍歷,它就耗盡了。 在將迭代器傳遞給multiprocessing.Pool.starmap等函數(shù)之前,務必確保它尚未被其他操作耗盡。通過將迭代器轉換為列表或在每次需要時重新創(chuàng)建迭代器,可以有效規(guī)避這類問題,并確保代碼的健壯性。
以上就是Python迭代器耗盡機制在多進程中的影響與規(guī)避的詳細內容,更多請關注php中文網(wǎng)其它相關文章!
每個人都需要一臺速度更快、更穩(wěn)定的 PC。隨著時間的推移,垃圾文件、舊注冊表數(shù)據(jù)和不必要的后臺進程會占用資源并降低性能。幸運的是,許多工具可以讓 Windows 保持平穩(wěn)運行。
Copyright 2014-2025 http://ipnx.cn/ All Rights Reserved | php.cn | 湘ICP備2023035733號