行程
Python 中的多線程其實(shí)不是真正的多線程,如果想要充分地使用多核心 CPU 的資源,在 Python 中大部分情況都需要使用多進(jìn)程。 Python 提供了非常好用的多進(jìn)程套件 multiprocessing,只需要定義一個(gè)函數(shù),Python 會(huì)完成其他所有事情。借助這個(gè)套件,可以輕鬆完成從單一進(jìn)程到並發(fā)執(zhí)行的轉(zhuǎn)換。 multiprocessing 支援子進(jìn)程、通訊和共享資料、執(zhí)行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等元件。
1、類別Process
建立流程的類別:Process([group [, target [, name [, args [, kwargs]]]]])
target 表示呼叫物件
args 表示呼叫物件的位置參數(shù)元組
#kwargs表示呼叫物件的字典
name為別名
group實(shí)質(zhì)上不使用
下面看一個(gè)創(chuàng)建函數(shù)並將其作為多個(gè)進(jìn)程的例子:
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import multiprocessing import time def worker(interval, name): print(name + '【start】') time.sleep(interval) print(name + '【end】') if __name__ == "__main__": p1 = multiprocessing.Process(target=worker, args=(2, '兩點(diǎn)水1')) p2 = multiprocessing.Process(target=worker, args=(3, '兩點(diǎn)水2')) p3 = multiprocessing.Process(target=worker, args=(4, '兩點(diǎn)水3')) p1.start() p2.start() p3.start() print("The number of CPU is:" + str(multiprocessing.cpu_count())) for p in multiprocessing.active_children(): print("child p.name:" + p.name + "\tp.id" + str(p.pid)) print("END!!!!!!!!!!!!!!!!!")
輸出的結(jié)果:
多進(jìn)程輸出結(jié)果
2、把進(jìn)程創(chuàng)建成類
當(dāng)然我們也可以把進(jìn)程創(chuàng)建成一個(gè)類,如下面的例子,當(dāng)進(jìn)程p 呼叫start() 時(shí),自動(dòng)調(diào)用run() 方法。
# -*- coding: UTF-8 -*- import multiprocessing import time class ClockProcess(multiprocessing.Process): def __init__(self, interval): multiprocessing.Process.__init__(self) self.interval = interval def run(self): n = 5 while n > 0: print("當(dāng)前時(shí)間: {0}".format(time.ctime())) time.sleep(self.interval) n -= 1 if __name__ == '__main__': p = ClockProcess(3) p.start()
輸出結(jié)果如下:
建立進(jìn)程類別
#3、daemon 屬性
想知道daemon 屬性有什麼用,看下面兩個(gè)例子吧,一個(gè)加了daemon 屬性,一個(gè)沒有加,對比輸出的結(jié)果:
沒有加deamon 屬性的例子:
# -*- coding: UTF-8 -*- import multiprocessing import time def worker(interval): print('工作開始時(shí)間:{0}'.format(time.ctime())) time.sleep(interval) print('工作結(jié)果時(shí)間:{0}'.format(time.ctime())) if __name__ == '__main__': p = multiprocessing.Process(target=worker, args=(3,)) p.start() print('【EMD】')
輸出結(jié)果:
【EMD】 工作開始時(shí)間:Mon Oct 9 17:47:06 2017 工作結(jié)果時(shí)間:Mon Oct 9 17:47:09 2017
在上面範(fàn)例中,進(jìn)程p 新增daemon 屬性:
# -*- coding: UTF-8 -*- import multiprocessing import time def worker(interval): print('工作開始時(shí)間:{0}'.format(time.ctime())) time.sleep(interval) print('工作結(jié)果時(shí)間:{0}'.format(time.ctime())) if __name__ == '__main__': p = multiprocessing.Process(target=worker, args=(3,)) p.daemon = True p.start() print('【EMD】')
輸出結(jié)果:
【EMD】
根據(jù)輸出結(jié)果可見,如果在子進(jìn)程中新增了daemon 屬性,那麼當(dāng)主進(jìn)程結(jié)束的時(shí)候,子進(jìn)程也會(huì)跟著結(jié)束。所以沒有列印子進(jìn)程的資訊。
4、join 方法
結(jié)合上面的範(fàn)例繼續(xù),如果我們想要讓子執(zhí)行緒執(zhí)行完該怎麼做呢?
那麼我們可以用到 join 方法,join 方法的主要作用是:阻塞目前進(jìn)程,直到呼叫 join 方法的那個(gè)進(jìn)程執(zhí)行完,然後再繼續(xù)執(zhí)行目前進(jìn)程。
因此看下加了join 方法的例子:
import multiprocessing import time def worker(interval): print('工作開始時(shí)間:{0}'.format(time.ctime())) time.sleep(interval) print('工作結(jié)果時(shí)間:{0}'.format(time.ctime())) if __name__ == '__main__': p = multiprocessing.Process(target=worker, args=(3,)) p.daemon = True p.start() p.join() print('【EMD】')
輸出的結(jié)果:
工作開始時(shí)間:Tue Oct 10 11:30:08 2017 工作結(jié)果時(shí)間:Tue Oct 10 11:30:11 2017 【EMD】
5、Pool
如果需要很多的子進(jìn)程,我們就需要一個(gè)一個(gè)的去創(chuàng)建嗎?
當(dāng)然不用,我們可以使用進(jìn)程池的方法批次建立子進(jìn)程。
範(fàn)例如下:
# -*- coding: UTF-8 -*- from multiprocessing import Pool import os, time, random def long_time_task(name): print('進(jìn)程的名稱:{0} ;進(jìn)程的PID: {1} '.format(name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('進(jìn)程 {0} 運(yùn)行了 {1} 秒'.format(name, (end - start))) if __name__ == '__main__': print('主進(jìn)程的 PID:{0}'.format(os.getpid())) p = Pool(4) for i in range(6): p.apply_async(long_time_task, args=(i,)) p.close() # 等待所有子進(jìn)程結(jié)束后在關(guān)閉主進(jìn)程 p.join() print('【End】')
輸出的結(jié)果如下:
主進(jìn)程的 PID:7256 進(jìn)程的名稱:0 ;進(jìn)程的PID: 1492 進(jìn)程的名稱:1 ;進(jìn)程的PID: 12232 進(jìn)程的名稱:2 ;進(jìn)程的PID: 4332 進(jìn)程的名稱:3 ;進(jìn)程的PID: 11604 進(jìn)程 2 運(yùn)行了 0.6500370502471924 秒 進(jìn)程的名稱:4 ;進(jìn)程的PID: 4332 進(jìn)程 1 運(yùn)行了 1.0830621719360352 秒 進(jìn)程的名稱:5 ;進(jìn)程的PID: 12232 進(jìn)程 5 運(yùn)行了 0.029001712799072266 秒 進(jìn)程 4 運(yùn)行了 0.9720554351806641 秒 進(jìn)程 0 運(yùn)行了 2.3181326389312744 秒 進(jìn)程 3 運(yùn)行了 2.5331451892852783 秒 【End】
這裡有一點(diǎn)要注意: Pool 物件呼叫join() 方法會(huì)等待所有子程序執(zhí)行完畢,呼叫join() 之前必須先呼叫close() ,呼叫close() 之後就不能繼續(xù)加入新的Process 了。
請注意輸出的結(jié)果,子進(jìn)程0,1,2,3是立刻執(zhí)行的,而子進(jìn)程4 要等待前面某個(gè)子進(jìn)程完成後才執(zhí)行,這是因?yàn)镻ool 的預(yù)設(shè)大小在我的電腦上是4,因此,最多同時(shí)執(zhí)行4 個(gè)行程。這是 Pool 有意設(shè)計(jì)的限制,並不是作業(yè)系統(tǒng)的限制。如果改成:
p = Pool(5)
就可以同時(shí)跑 5 個(gè)進(jìn)程。
6、進(jìn)程間通訊
Process 之間一定是需要通訊的,作業(yè)系統(tǒng)提供了許多機(jī)制來實(shí)現(xiàn)進(jìn)程間的通訊。 Python 的 multiprocessing 模組包裝了底層的機(jī)制,提供了Queue、Pipes 等多種方式來交換資料。
以 Queue 為例,在父進(jìn)程中建立兩個(gè)子進(jìn)程,一個(gè)往 Queue 寫數(shù)據(jù),一個(gè)從 Queue 讀取資料:
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- from multiprocessing import Process, Queue import os, time, random def write(q): # 寫數(shù)據(jù)進(jìn)程 print('寫進(jìn)程的PID:{0}'.format(os.getpid())) for value in ['兩點(diǎn)水', '三點(diǎn)水', '四點(diǎn)水']: print('寫進(jìn) Queue 的值為:{0}'.format(value)) q.put(value) time.sleep(random.random()) def read(q): # 讀取數(shù)據(jù)進(jìn)程 print('讀進(jìn)程的PID:{0}'.format(os.getpid())) while True: value = q.get(True) print('從 Queue 讀取的值為:{0}'.format(value)) if __name__ == '__main__': # 父進(jìn)程創(chuàng)建 Queue,并傳給各個(gè)子進(jìn)程 q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啟動(dòng)子進(jìn)程 pw pw.start() # 啟動(dòng)子進(jìn)程pr pr.start() # 等待pw結(jié)束: pw.join() # pr 進(jìn)程里是死循環(huán),無法等待其結(jié)束,只能強(qiáng)行終止 pr.terminate()
輸出的結(jié)果為:
讀進(jìn)程的PID:13208 寫進(jìn)程的PID:10864 寫進(jìn) Queue 的值為:兩點(diǎn)水 從 Queue 讀取的值為:兩點(diǎn)水 寫進(jìn) Queue 的值為:三點(diǎn)水 從 Queue 讀取的值為:三點(diǎn)水 寫進(jìn) Queue 的值為:四點(diǎn)水 從 Queue 讀取的值為:四點(diǎn)水