多執(zhí)行緒編程
其實(shí)建立執(zhí)行緒之後,執(zhí)行緒並不是總是保持一個(gè)狀態(tài)的,其狀態(tài)大概如下:
New 建立
Runnable 就緒。等待調(diào)度
Running 運(yùn)行
Blocked 阻塞。阻塞可能在 Wait Locked Sleeping
Dead 消亡
線程有著不同的狀態(tài),也有不同的類型。大致可分為:
主執(zhí)行緒
子執(zhí)行緒
守護(hù)執(zhí)行緒(後臺(tái)執(zhí)行緒)
前臺(tái)執(zhí)行緒
簡單了解完這些之後,我們開始看看具體的程式碼使用了。
1、執(zhí)行緒的建立
Python 提供兩個(gè)模組進(jìn)行多執(zhí)行緒的操作,分別是thread 和threading
前者是比較低階的模組,用於更底層的操作,一般應(yīng)用層級(jí)的開發(fā)不常用。
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import time import threading class MyThread(threading.Thread): def run(self): for i in range(5): print('thread {}, @number: {}'.format(self.name, i)) time.sleep(1) def main(): print("Start main threading") # 創(chuàng)建三個(gè)線程 threads = [MyThread() for i in range(3)] # 啟動(dòng)三個(gè)線程 for t in threads: t.start() print("End Main threading") if __name__ == '__main__': main()
運(yùn)行結(jié)果:
Start main threading thread Thread-1, @number: 0 thread Thread-2, @number: 0 thread Thread-3, @number: 0 End Main threading thread Thread-2, @number: 1 thread Thread-1, @number: 1 thread Thread-3, @number: 1 thread Thread-1, @number: 2 thread Thread-3, @number: 2 thread Thread-2, @number: 2 thread Thread-2, @number: 3 thread Thread-3, @number: 3 thread Thread-1, @number: 3 thread Thread-3, @number: 4 thread Thread-2, @number: 4 thread Thread-1, @number: 4
注意喔,這裡不同的環(huán)境輸出的結(jié)果肯定是不一樣的。
2、執(zhí)行緒合併(join方法)
上面的範(fàn)例列印出來的結(jié)果來看,主執(zhí)行緒結(jié)束後,子執(zhí)行緒還在運(yùn)行。那我們需要主執(zhí)行緒要等待子執(zhí)行緒運(yùn)行完後,再退出,該怎麼辦呢?
這時(shí)候,就需要用到 join 方法了。
在上面的例子,新增一段程式碼,具體如下:
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import time import threading class MyThread(threading.Thread): def run(self): for i in range(5): print('thread {}, @number: {}'.format(self.name, i)) time.sleep(1) def main(): print("Start main threading") # 創(chuàng)建三個(gè)線程 threads = [MyThread() for i in range(3)] # 啟動(dòng)三個(gè)線程 for t in threads: t.start() # 一次讓新創(chuàng)建的線程執(zhí)行 join for t in threads: t.join() print("End Main threading") if __name__ == '__main__': main()
從列印的結(jié)果,可以清楚看到,相較於上面範(fàn)例列印出來的結(jié)果,主執(zhí)行緒是在等待子執(zhí)行緒運(yùn)行結(jié)束後才結(jié)束的。
Start main threading thread Thread-1, @number: 0 thread Thread-2, @number: 0 thread Thread-3, @number: 0 thread Thread-1, @number: 1 thread Thread-3, @number: 1 thread Thread-2, @number: 1 thread Thread-2, @number: 2 thread Thread-1, @number: 2 thread Thread-3, @number: 2 thread Thread-2, @number: 3 thread Thread-1, @number: 3 thread Thread-3, @number: 3 thread Thread-3, @number: 4 thread Thread-2, @number: 4 thread Thread-1, @number: 4 End Main threading
3、執(zhí)行緒同步與互斥鎖
使用執(zhí)行緒載入取得數(shù)據(jù),通常都會(huì)造成資料不同步的情況。當(dāng)然,這時(shí)候我們可以為資源加鎖,也就是存取資源的執(zhí)行緒需要獲得鎖才能存取。
其中 threading 模組給了我們一個(gè) Lock 功能。
lock = threading.Lock()
在線程中獲取鎖定
lock.acquire()
使用完成後,我們肯定需要釋放鎖定
lock.release()
當(dāng)然為了支援在同一線程中多次請求相同資源,Python 提供了可重入鎖(RLock)。 RLock 內(nèi)部維護(hù)一個(gè) Lock 和一個(gè) counter 變量,counter 記錄了 acquire 的次數(shù),從而使得資源可以被多次 require。直到一個(gè)線程所有的 acquire 都被 release,其他的線程才能獲得資源。
那麼怎麼建立重入鎖呢?也是一句程式碼的事情:
r_lock = threading.RLock()
4、Condition 條件變數(shù)
實(shí)用鎖定可以達(dá)到執(zhí)行緒同步,但是在更複雜的環(huán)境,需要針對鎖定進(jìn)行一些條件判斷。 Python 提供了 Condition 物件。使用 Condition 物件可以在某些事件觸發(fā)或達(dá)到特定的條件後才處理數(shù)據(jù),Condition 除了具有 Lock 物件的 acquire 方法和 release 方法外,還提供了 wait 和 notify 方法。執(zhí)行緒首先 acquire 一個(gè)條件變數(shù)鎖。如果條件不足,則該線程 wait,如果滿足就執(zhí)行線程,甚至可以 notify 其他線程。其他處於 wait 狀態(tài)的執(zhí)行緒接到通知後會(huì)重新判斷條件。
其中條件變數(shù)可以看成不同的線程先後 acquire 獲得鎖,如果不滿足條件,可以理解為被扔到一個(gè)( Lock 或 RLock )的 waiting 池。直達(dá)其他執(zhí)行緒 notify 之後再重新判斷條件。不斷的重複這個(gè)過程,從而解決複雜的同步問題。
此模式常用於生產(chǎn)者消費(fèi)者模式,請看以下線上購物買家和賣家的範(fàn)例:
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import threading, time class Consumer(threading.Thread): def __init__(self, cond, name): # 初始化 super(Consumer, self).__init__() self.cond = cond self.name = name def run(self): # 確保先運(yùn)行Seeker中的方法 time.sleep(1) self.cond.acquire() print(self.name + ': 我這兩件商品一起買,可以便宜點(diǎn)嗎') self.cond.notify() self.cond.wait() print(self.name + ': 我已經(jīng)提交訂單了,你修改下價(jià)格') self.cond.notify() self.cond.wait() print(self.name + ': 收到,我支付成功了') self.cond.notify() self.cond.release() print(self.name + ': 等待收貨') class Producer(threading.Thread): def __init__(self, cond, name): super(Producer, self).__init__() self.cond = cond self.name = name def run(self): self.cond.acquire() # 釋放對瑣的占用,同時(shí)線程掛起在這里,直到被 notify 并重新占有瑣。 self.cond.wait() print(self.name + ': 可以的,你提交訂單吧') self.cond.notify() self.cond.wait() print(self.name + ': 好了,已經(jīng)修改了') self.cond.notify() self.cond.wait() print(self.name + ': 嗯,收款成功,馬上給你發(fā)貨') self.cond.release() print(self.name + ': 發(fā)貨商品') cond = threading.Condition() consumer = Consumer(cond, '買家(兩點(diǎn)水)') producer = Producer(cond, '賣家(三點(diǎn)水)') consumer.start() producer.start()
輸出的結(jié)果如下:
買家(兩點(diǎn)水): 我這兩件商品一起買,可以便宜點(diǎn)嗎 賣家(三點(diǎn)水): 可以的,你提交訂單吧 買家(兩點(diǎn)水): 我已經(jīng)提交訂單了,你修改下價(jià)格 賣家(三點(diǎn)水): 好了,已經(jīng)修改了 買家(兩點(diǎn)水): 收到,我支付成功了 買家(兩點(diǎn)水): 等待收貨 賣家(三點(diǎn)水): 嗯,收款成功,馬上給你發(fā)貨 賣家(三點(diǎn)水): 發(fā)貨商品
5、線程間通訊
如果程式中有多個(gè)線程,這些線程避免不了需要相互通訊的。那麼我們怎樣才能在這些線程之間安全地交換資訊或資料呢?
從一個(gè)執(zhí)行緒向另一個(gè)執(zhí)行緒發(fā)送資料最安全的方式可能就是使用 queue 庫中的佇列了。建立一個(gè)被多個(gè)執(zhí)行緒共享的 Queue 對象,這些執(zhí)行緒透過使用 put() 和 get() 操作來向佇列中新增或刪除元素。
# -*- coding: UTF-8 -*- from queue import Queue from threading import Thread isRead = True def write(q): # 寫數(shù)據(jù)進(jìn)程 for value in ['兩點(diǎn)水', '三點(diǎn)水', '四點(diǎn)水']: print('寫進(jìn) Queue 的值為:{0}'.format(value)) q.put(value) def read(q): # 讀取數(shù)據(jù)進(jìn)程 while isRead: value = q.get(True) print('從 Queue 讀取的值為:{0}'.format(value)) if __name__ == '__main__': q = Queue() t1 = Thread(target=write, args=(q,)) t2 = Thread(target=read, args=(q,)) t1.start() t2.start()
輸出的結(jié)果如下:
寫進(jìn) Queue 的值為:兩點(diǎn)水 寫進(jìn) Queue 的值為:三點(diǎn)水 從 Queue 讀取的值為:兩點(diǎn)水 寫進(jìn) Queue 的值為:四點(diǎn)水 從 Queue 讀取的值為:三點(diǎn)水 從 Queue 讀取的值為:四點(diǎn)水
Python 也提供了Event 物件用於執(zhí)行緒間通信,它是由執(zhí)行緒設(shè)定的訊號(hào)標(biāo)誌,如果訊號(hào)標(biāo)誌位真,則其他執(zhí)行緒等待直到訊號(hào)接觸。
Event 物件實(shí)作了簡單的執(zhí)行緒通訊機(jī)制,它提供了設(shè)定訊號(hào),清楚訊號(hào),等待等用於實(shí)現(xiàn)執(zhí)行緒間的通訊。
設(shè)定訊號(hào)
使用 Event 的 set() 方法可以設(shè)定 Event 物件內(nèi)部的訊號(hào)標(biāo)誌為真。 Event 物件提供了 isSe() 方法來判斷其內(nèi)部訊號(hào)標(biāo)誌的狀態(tài)。當(dāng)使用event 物件的set() 方法後,isSet() 方法傳回真
清除訊號(hào)
使用Event 物件的clear() 方法可以清除Event 物件內(nèi)部的訊號(hào)標(biāo)誌,即將其設(shè)為假,當(dāng)使用Event 的clear 方法後,isSet() 方法返回假
等待
Event 物件wait 的方法只有在內(nèi)部訊號(hào)為真的時(shí)候才會(huì)很快的執(zhí)行並完成返回。當(dāng) Event 物件的內(nèi)部訊號(hào)標(biāo)誌位元假時(shí),則 wait 方法會(huì)一直等待到其為真時(shí)才傳回。
範(fàn)例:
# -*- coding: UTF-8 -*- import threading class mThread(threading.Thread): def __init__(self, threadname): threading.Thread.__init__(self, name=threadname) def run(self): # 使用全局Event對象 global event # 判斷Event對象內(nèi)部信號(hào)標(biāo)志 if event.isSet(): event.clear() event.wait() print(self.getName()) else: print(self.getName()) # 設(shè)置Event對象內(nèi)部信號(hào)標(biāo)志 event.set() # 生成Event對象 event = threading.Event() # 設(shè)置Event對象內(nèi)部信號(hào)標(biāo)志 event.set() t1 = [] for i in range(10): t = mThread(str(i)) # 生成線程列表 t1.append(t) for i in t1: # 運(yùn)行線程 i.start()
輸出的結(jié)果如下:
1 0 3 2 5 4 7 6 9 8
#6、後臺(tái)執(zhí)行緒
##預(yù)設(shè)情況下,主執(zhí)行緒退出之後,即使子線程沒有join。那麼主執(zhí)行緒結(jié)束後,子執(zhí)行緒仍會(huì)繼續(xù)執(zhí)行。如果希望主執(zhí)行緒退出後,其子執(zhí)行緒也退出而不再執(zhí)行,則需要設(shè)定子執(zhí)行緒為後臺(tái)執(zhí)行緒。 Python 提供了 setDeamon 方法。