本文深入探討了go語(yǔ)言并發(fā)編程中常見(jiàn)的數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題,并提供了一套健壯的解決方案。通過(guò)一個(gè)字符計(jì)數(shù)示例,我們分析了共享狀態(tài)、指針傳遞以及同步機(jī)制可能引發(fā)的錯(cuò)誤,并展示了如何利用局部變量、數(shù)據(jù)復(fù)制和`sync.waitgroup`等go語(yǔ)言特性,構(gòu)建出高效且結(jié)果一致的并發(fā)程序,同時(shí)強(qiáng)調(diào)了使用go競(jìng)態(tài)檢測(cè)工具的重要性。
在Go語(yǔ)言中,通過(guò)goroutine和channel實(shí)現(xiàn)并發(fā)是其核心優(yōu)勢(shì)之一。然而,不當(dāng)?shù)牟l(fā)設(shè)計(jì),尤其是在處理共享數(shù)據(jù)時(shí),極易引入數(shù)據(jù)競(jìng)爭(zhēng)(data race),導(dǎo)致程序行為不確定,輸出結(jié)果不一致。本教程將通過(guò)一個(gè)具體的字符計(jì)數(shù)示例,詳細(xì)分析數(shù)據(jù)競(jìng)爭(zhēng)產(chǎn)生的原因,并提供一套專(zhuān)業(yè)的解決方案。
設(shè)想一個(gè)場(chǎng)景:我們需要統(tǒng)計(jì)大量DNA序列中特定字符(如'A', 'T', 'G', 'C')的出現(xiàn)次數(shù)。為了加速處理,我們采用Go語(yǔ)言的并發(fā)機(jī)制,創(chuàng)建多個(gè)Worker goroutine并行處理輸入的DNA字符串,并通過(guò)channel匯總結(jié)果。然而,在多核CPU環(huán)境下運(yùn)行程序時(shí),我們發(fā)現(xiàn)最終的字符計(jì)數(shù)結(jié)果并不穩(wěn)定,每次運(yùn)行都可能得到不同的值,而在單核環(huán)境下卻能保持一致。這通常是數(shù)據(jù)競(jìng)爭(zhēng)的典型表現(xiàn)。
原始設(shè)計(jì)中,程序的關(guān)鍵組件包括:
經(jīng)過(guò)分析,原始設(shè)計(jì)中存在以下幾個(gè)主要的數(shù)據(jù)競(jìng)爭(zhēng)點(diǎn):
立即進(jìn)入“豆包AI人工智官網(wǎng)入口”;
立即學(xué)習(xí)“豆包AI人工智能在線問(wèn)答入口”;
共享的全局/包級(jí)變量 at 和 gc: 在Worker函數(shù)內(nèi)部,用于累加'A'/'T'和'G'/'C'計(jì)數(shù)的變量at和gc被聲明為全局或包級(jí)變量。這意味著所有Worker goroutine都在并發(fā)地讀寫(xiě)這兩個(gè)共享變量,而沒(méi)有采取任何同步措施。當(dāng)多個(gè)goroutine同時(shí)嘗試修改這些變量時(shí),就會(huì)發(fā)生數(shù)據(jù)競(jìng)爭(zhēng),導(dǎo)致計(jì)數(shù)結(jié)果錯(cuò)誤或丟失。
// 原始代碼片段(簡(jiǎn)化) var at int // 全局或包級(jí)變量 var gc int // 全局或包級(jí)變量 func Worker(...) { // ... for { // ... at++ // 多個(gè)goroutine并發(fā)修改 gc++ // 多個(gè)goroutine并發(fā)修改 // ... } }
通過(guò)Channel傳遞指針 *int 和 *[]byte: 盡管channel本身是并發(fā)安全的,但它傳遞的是數(shù)據(jù)的副本。當(dāng)傳遞指針時(shí),channel傳遞的是指針的副本,而不是指針?biāo)赶虻臄?shù)據(jù)的副本。這意味著多個(gè)goroutine可能持有同一個(gè)指針,并并發(fā)地訪問(wèn)或修改其指向的底層數(shù)據(jù)。
不健壯的同步機(jī)制: 原始代碼使用了一個(gè)基于CpuCnt倒計(jì)數(shù)的select循環(huán)和goto語(yǔ)句來(lái)判斷所有Worker是否完成。這種手動(dòng)管理goroutine生命周期的方式容易出錯(cuò),且不如Go標(biāo)準(zhǔn)庫(kù)提供的sync.WaitGroup直觀和安全。例如,如果SpawnWork在所有Worker處理完所有數(shù)據(jù)之前關(guān)閉了inStr,或者Worker在發(fā)送完所有結(jié)果之前就退出了,都可能導(dǎo)致數(shù)據(jù)丟失或程序提前終止。
為了解決上述數(shù)據(jù)競(jìng)爭(zhēng)和同步問(wèn)題,我們采取了以下改進(jìn)措施:
消除共享狀態(tài),使計(jì)數(shù)器局部化: 將at和gc變量聲明在Worker函數(shù)的循環(huán)內(nèi)部,使其成為每個(gè)Worker處理每個(gè)字符串時(shí)的局部變量。這樣,每個(gè)Worker都有自己獨(dú)立的計(jì)數(shù)器,在處理完一個(gè)字符串后,將值發(fā)送到結(jié)果channel。
func Worker(inCh chan []byte, resA chan<- int, resB chan<- int, wg *sync.WaitGroup) { defer wg.Done() // 確保goroutine完成時(shí)通知WaitGroup for ch := range inCh { // 遍歷channel,直到它被關(guān)閉 at := 0 // 局部變量,每個(gè)字符串處理一次 gc := 0 // 局部變量 for i := 0; i < len(ch); i++ { if ch[i] == 'A' || ch[i] == 'T' { at++ } else if ch[i] == 'G' || ch[i] == 'C' { gc++ } } resA <- at // 發(fā)送值,而不是指針 resB <- gc // 發(fā)送值 } }
通過(guò)Channel傳遞值類(lèi)型或數(shù)據(jù)副本:
func SpawnWork(inStr chan<- []byte) { // ... for scanner.Scan() { s := scanner.Bytes() // ... s_copy := append([]byte(nil), s...) // 深拷貝切片 inStr <- s_copy // 發(fā)送切片副本 } close(inStr) // 完成后關(guān)閉輸入channel }
使用 sync.WaitGroup 進(jìn)行健壯的同步:sync.WaitGroup是Go語(yǔ)言中用于等待一組goroutine完成的標(biāo)準(zhǔn)且推薦的機(jī)制。
在啟動(dòng)每個(gè)Worker goroutine之前,調(diào)用wg.Add(1)來(lái)增加計(jì)數(shù)器。
在每個(gè)Worker goroutine的defer語(yǔ)句中調(diào)用wg.Done(),確保無(wú)論goroutine如何退出(正常完成或發(fā)生panic),計(jì)數(shù)器都會(huì)被遞減。
在main goroutine中,創(chuàng)建一個(gè)新的goroutine來(lái)執(zhí)行SpawnWork,并在其內(nèi)部調(diào)用wg.Wait()。這確保了SpawnWork在所有Worker完成其工作后才關(guān)閉結(jié)果channel。
main goroutine通過(guò)for range resChA循環(huán)接收結(jié)果,當(dāng)resChA被關(guān)閉時(shí),循環(huán)會(huì)自動(dòng)結(jié)束。
func main() { // ... var wg sync.WaitGroup for i := 0; i < CpuCnt; i++ { wg.Add(1) // 增加WaitGroup計(jì)數(shù)器 go Worker(inStr, resChA, resChB, &wg) } go func() { SpawnWork(inStr) // 啟動(dòng)工作生成器 wg.Wait() // 等待所有Worker完成 close(resChA) // 關(guān)閉結(jié)果channel close(resChB) // 關(guān)閉結(jié)果channel }() A := 0 B := 0 // 使用for range安全地接收結(jié)果,直到channel關(guān)閉 for tmp_at := range resChA { tmp_gc := <-resChB A += tmp_at B += tmp_gc // ... } // ... }
package main import ( "bufio" "fmt" "runtime" "strings" "sync" ) // Worker goroutine負(fù)責(zé)處理字符串并計(jì)數(shù) func Worker(inCh chan []byte, resA chan<- int, resB chan<- int, wg *sync.WaitGroup) { defer wg.Done() // 確保Worker完成時(shí)通知WaitGroup // fmt.Println("Worker started...") // 可用于調(diào)試 for ch := range inCh { // 從輸入channel接收字符串,直到channel關(guān)閉 at := 0 // 局部變量,用于統(tǒng)計(jì)當(dāng)前字符串的A/T計(jì)數(shù) gc := 0 // 局部變量,用于統(tǒng)計(jì)當(dāng)前字符串的G/C計(jì)數(shù) for i := 0; i < len(ch); i++ { if ch[i] == 'A' || ch[i] == 'T' { at++ } else if ch[i] == 'G' || ch[i] == 'C' { gc++ } } resA <- at // 將局部計(jì)數(shù)結(jié)果發(fā)送到結(jié)果channel resB <- gc } } // SpawnWork goroutine負(fù)責(zé)生成工作(DNA字符串) func SpawnWork(inStr chan<- []byte) { // fmt.Println("Spawning work:") // 可用于調(diào)試 // 人工輸入數(shù)據(jù),為了演示目的進(jìn)行擴(kuò)展 StringData := "NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" + "NTGAGAAATATGCTTTCTACTTTTTTGTTTAATTTGAACTTGAAAACAAAACACACACAA\n" + "CTTCCCAATTGGATTAGACTATTAACATTTCAGAAAGGATGTAAGAAAGGACTAGAGAGA\n" + "TATACTTAATGTTTTTAGTTTTTTAAACTTTACAAACTTAATACTGTCATTCTGTTGTTC\n" + "AGTTAACATCCCTGAATCCTAAATTTCTTCAGATTCTAAAACAAAAAGTTCCAGATGATT\n" + "TTATATTACACTATTTACTTAATGGTACTTAAATCCTCATTNNNNNNNNCAGTACGGTTG\n" + "TTAAATANNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" + "NNNNNNNCTTCAGAAATAAGTATACTGCAATCTGATTCCGGGAAATATTTAGGTTCATAA\n" // 擴(kuò)展數(shù)據(jù)1000次,以增加處理量 tmp := StringData for n := 0; n < 1000; n++ { StringData = StringData + tmp } scanner := bufio.NewScanner(strings.NewReader(StringData)) scanner.Split(bufio.ScanLines) for scanner.Scan() { s := scanner.Bytes() if len(s) == 0 || s[0] == '>' { continue } else { // 對(duì)切片進(jìn)行深拷貝,確保每個(gè)Worker處理的是獨(dú)立的數(shù)據(jù)副本 s_copy := append([]byte(nil), s...) inStr <- s_copy } } close(inStr) // 所有數(shù)據(jù)發(fā)送完畢后,關(guān)閉輸入channel } func main() { CpuCnt := runtime.NumCPU() // 獲取CPU核心數(shù) runtime.GOMAXPROCS(CpuCnt) // 設(shè)置Go調(diào)度器使用與CPU核心數(shù)相同的邏輯處理器 fmt.Printf("Processors: %d\n", CpuCnt) resChA := make(chan int) // 用于接收A/T計(jì)數(shù)的channel resChB := make(chan int) // 用于接收G/C計(jì)數(shù)的channel inStr := make(chan []byte) // 用于發(fā)送DNA字符串的channel fmt.Println("Spawning workers:") var wg sync.WaitGroup // 初始化WaitGroup for i := 0; i < CpuCnt; i++ { wg.Add(1) // 每啟動(dòng)一個(gè)Worker,WaitGroup計(jì)數(shù)器加1 go Worker(inStr, resChA, resChB, &wg) } fmt.Println("Spawning work:") // 啟動(dòng)一個(gè)goroutine來(lái)生成工作并等待所有Worker完成 go func() { SpawnWork(inStr) // 啟動(dòng)工作生成器 wg.Wait() // 等待所有Worker goroutine完成 close(resChA) // 所有Worker完成后,關(guān)閉結(jié)果channelA close(resChB) // 所有Worker完成后,關(guān)閉結(jié)果channelB }() A := 0 // 總A/T計(jì)數(shù) B := 0 // 總G/C計(jì)數(shù) LineCnt := 0 // 處理的行數(shù) // 使用for range循環(huán)接收結(jié)果,當(dāng)resChA關(guān)閉時(shí),循環(huán)會(huì)自動(dòng)退出 for tmp_at := range resChA { tmp_gc := <-resChB // resChA和resChB的結(jié)果是成對(duì)出現(xiàn)的 A += tmp_at B += tmp_gc LineCnt++ } if !(A+B > 0) { fmt.Println("No A/B was found!") } else { ABFraction := float32(B) / float32(A+B) fmt.Println("\n----------------------------") fmt.Printf("Cpu's : %d\n", CpuCnt) fmt.Printf("Lines : %d\n", LineCnt) fmt.Printf("A+B : %d\n", A+B) fmt.Printf("A : %d\n", A) fmt.Printf("B : %d\n", B) // 修正:此處應(yīng)打印B的值,而不是A fmt.Printf("AB frac: %.2f%%\n", ABFraction*100) fmt.Println("----------------------------") } }
通過(guò)遵循這些最佳實(shí)踐,開(kāi)發(fā)者可以有效地避免Go并發(fā)編程中的數(shù)據(jù)競(jìng)爭(zhēng),構(gòu)建出穩(wěn)定、高效且可預(yù)測(cè)的并發(fā)應(yīng)用程序。
以上就是Go并發(fā)編程中的數(shù)據(jù)競(jìng)爭(zhēng)與同步實(shí)踐的詳細(xì)內(nèi)容,更多請(qǐng)關(guān)注php中文網(wǎng)其它相關(guān)文章!
編程怎么學(xué)習(xí)?編程怎么入門(mén)?編程在哪學(xué)?編程怎么學(xué)才快?不用擔(dān)心,這里為大家提供了編程速學(xué)教程(入門(mén)課程),有需要的小伙伴保存下載就能學(xué)習(xí)啦!
微信掃碼
關(guān)注PHP中文網(wǎng)服務(wù)號(hào)
QQ掃碼
加入技術(shù)交流群
Copyright 2014-2025 http://ipnx.cn/ All Rights Reserved | php.cn | 湘ICP備2023035733號(hào)