亚洲国产日韩欧美一区二区三区,精品亚洲国产成人av在线,国产99视频精品免视看7,99国产精品久久久久久久成人热,欧美日韩亚洲国产综合乱

搜索

Go并發(fā)編程中的數(shù)據(jù)競(jìng)爭(zhēng)與同步實(shí)踐

心靈之曲
發(fā)布: 2025-10-15 10:58:13
原創(chuàng)
713人瀏覽過(guò)

Go并發(fā)編程中的數(shù)據(jù)競(jìng)爭(zhēng)與同步實(shí)踐

本文深入探討了go語(yǔ)言并發(fā)編程中常見(jiàn)的數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題,并提供了一套健壯的解決方案。通過(guò)一個(gè)字符計(jì)數(shù)示例,我們分析了共享狀態(tài)、指針傳遞以及同步機(jī)制可能引發(fā)的錯(cuò)誤,并展示了如何利用局部變量、數(shù)據(jù)復(fù)制和`sync.waitgroup`等go語(yǔ)言特性,構(gòu)建出高效且結(jié)果一致的并發(fā)程序,同時(shí)強(qiáng)調(diào)了使用go競(jìng)態(tài)檢測(cè)工具的重要性。

Go并發(fā)編程中的數(shù)據(jù)競(jìng)爭(zhēng):一個(gè)字符計(jì)數(shù)案例分析

在Go語(yǔ)言中,通過(guò)goroutine和channel實(shí)現(xiàn)并發(fā)是其核心優(yōu)勢(shì)之一。然而,不當(dāng)?shù)牟l(fā)設(shè)計(jì),尤其是在處理共享數(shù)據(jù)時(shí),極易引入數(shù)據(jù)競(jìng)爭(zhēng)(data race),導(dǎo)致程序行為不確定,輸出結(jié)果不一致。本教程將通過(guò)一個(gè)具體的字符計(jì)數(shù)示例,詳細(xì)分析數(shù)據(jù)競(jìng)爭(zhēng)產(chǎn)生的原因,并提供一套專(zhuān)業(yè)的解決方案。

問(wèn)題背景:多核CPU下結(jié)果不一致的并發(fā)計(jì)數(shù)

設(shè)想一個(gè)場(chǎng)景:我們需要統(tǒng)計(jì)大量DNA序列中特定字符(如'A', 'T', 'G', 'C')的出現(xiàn)次數(shù)。為了加速處理,我們采用Go語(yǔ)言的并發(fā)機(jī)制,創(chuàng)建多個(gè)Worker goroutine并行處理輸入的DNA字符串,并通過(guò)channel匯總結(jié)果。然而,在多核CPU環(huán)境下運(yùn)行程序時(shí),我們發(fā)現(xiàn)最終的字符計(jì)數(shù)結(jié)果并不穩(wěn)定,每次運(yùn)行都可能得到不同的值,而在單核環(huán)境下卻能保持一致。這通常是數(shù)據(jù)競(jìng)爭(zhēng)的典型表現(xiàn)。

原始設(shè)計(jì)中,程序的關(guān)鍵組件包括:

  1. SpawnWork goroutine:負(fù)責(zé)生成DNA字符串?dāng)?shù)據(jù),并通過(guò)inStr channel發(fā)送給Worker。
  2. Worker goroutine:從inStr channel接收字符串,遍歷并統(tǒng)計(jì)其中'A'/'T'和'G'/'C'的出現(xiàn)次數(shù),然后將結(jié)果通過(guò)resA和resB channel發(fā)送出去。
  3. main goroutine:負(fù)責(zé)創(chuàng)建Worker,啟動(dòng)SpawnWork,并從resA和resB channel接收并累加所有Worker的計(jì)數(shù)結(jié)果,最后打印總和。

導(dǎo)致數(shù)據(jù)競(jìng)爭(zhēng)的根本原因

經(jīng)過(guò)分析,原始設(shè)計(jì)中存在以下幾個(gè)主要的數(shù)據(jù)競(jìng)爭(zhēng)點(diǎn):

立即進(jìn)入豆包AI人工智官網(wǎng)入口”;

立即學(xué)習(xí)豆包AI人工智能在線問(wèn)答入口”;

  1. 共享的全局/包級(jí)變量 at 和 gc: 在Worker函數(shù)內(nèi)部,用于累加'A'/'T'和'G'/'C'計(jì)數(shù)的變量at和gc被聲明為全局或包級(jí)變量。這意味著所有Worker goroutine都在并發(fā)地讀寫(xiě)這兩個(gè)共享變量,而沒(méi)有采取任何同步措施。當(dāng)多個(gè)goroutine同時(shí)嘗試修改這些變量時(shí),就會(huì)發(fā)生數(shù)據(jù)競(jìng)爭(zhēng),導(dǎo)致計(jì)數(shù)結(jié)果錯(cuò)誤或丟失。

    // 原始代碼片段(簡(jiǎn)化)
    var at int // 全局或包級(jí)變量
    var gc int // 全局或包級(jí)變量
    func Worker(...) {
        // ...
        for {
            // ...
            at++ // 多個(gè)goroutine并發(fā)修改
            gc++ // 多個(gè)goroutine并發(fā)修改
            // ...
        }
    }
    登錄后復(fù)制
  2. 通過(guò)Channel傳遞指針 *int 和 *[]byte: 盡管channel本身是并發(fā)安全的,但它傳遞的是數(shù)據(jù)的副本。當(dāng)傳遞指針時(shí),channel傳遞的是指針的副本,而不是指針?biāo)赶虻臄?shù)據(jù)的副本。這意味著多個(gè)goroutine可能持有同一個(gè)指針,并并發(fā)地訪問(wèn)或修改其指向的底層數(shù)據(jù)。

    • resA <- &at 和 resB <- &gc:由于at和gc是共享變量,將它們的地址發(fā)送到channel,并不能解決共享變量的競(jìng)態(tài)問(wèn)題。主goroutine接收到的*int指針仍然指向同一個(gè)共享內(nèi)存位置,而該位置在Worker goroutine中仍在被并發(fā)修改。
    • inStr <- &s:在SpawnWork中,將scanner.Bytes()返回的[]byte切片的地址發(fā)送到inStr channel。[]byte是一個(gè)引用類(lèi)型,其底層是一個(gè)數(shù)組。如果scanner.Bytes()返回的切片底層數(shù)組在發(fā)送后被scanner內(nèi)部復(fù)用或修改,而Worker goroutine尚未處理完,那么Worker可能會(huì)讀取到不正確的數(shù)據(jù)。
  3. 不健壯的同步機(jī)制: 原始代碼使用了一個(gè)基于CpuCnt倒計(jì)數(shù)的select循環(huán)和goto語(yǔ)句來(lái)判斷所有Worker是否完成。這種手動(dòng)管理goroutine生命周期的方式容易出錯(cuò),且不如Go標(biāo)準(zhǔn)庫(kù)提供的sync.WaitGroup直觀和安全。例如,如果SpawnWork在所有Worker處理完所有數(shù)據(jù)之前關(guān)閉了inStr,或者Worker在發(fā)送完所有結(jié)果之前就退出了,都可能導(dǎo)致數(shù)據(jù)丟失或程序提前終止。

解決方案與最佳實(shí)踐

為了解決上述數(shù)據(jù)競(jìng)爭(zhēng)和同步問(wèn)題,我們采取了以下改進(jìn)措施:

豆包AI編程
豆包AI編程

豆包推出的AI編程助手

豆包AI編程483
查看詳情 豆包AI編程
  1. 消除共享狀態(tài),使計(jì)數(shù)器局部化: 將at和gc變量聲明在Worker函數(shù)的循環(huán)內(nèi)部,使其成為每個(gè)Worker處理每個(gè)字符串時(shí)的局部變量。這樣,每個(gè)Worker都有自己獨(dú)立的計(jì)數(shù)器,在處理完一個(gè)字符串后,將發(fā)送到結(jié)果channel。

    func Worker(inCh chan []byte, resA chan<- int, resB chan<- int, wg *sync.WaitGroup) {
        defer wg.Done() // 確保goroutine完成時(shí)通知WaitGroup
        for ch := range inCh { // 遍歷channel,直到它被關(guān)閉
            at := 0 // 局部變量,每個(gè)字符串處理一次
            gc := 0 // 局部變量
            for i := 0; i < len(ch); i++ {
                if ch[i] == 'A' || ch[i] == 'T' {
                    at++
                } else if ch[i] == 'G' || ch[i] == 'C' {
                    gc++
                }
            }
            resA <- at // 發(fā)送值,而不是指針
            resB <- gc // 發(fā)送值
        }
    }
    登錄后復(fù)制
  2. 通過(guò)Channel傳遞值類(lèi)型或數(shù)據(jù)副本:

    • 對(duì)于計(jì)數(shù)結(jié)果,直接發(fā)送int類(lèi)型的值(resA <- at),而不是*int。這樣可以確保每個(gè)Worker發(fā)送的是其局部計(jì)數(shù)器的最終值,避免了共享指針的問(wèn)題。
    • 對(duì)于輸入的DNA字符串,雖然[]byte是引用類(lèi)型,但為了避免SpawnWork中scanner.Bytes()底層數(shù)組復(fù)用導(dǎo)致的競(jìng)態(tài),我們對(duì)每個(gè)切片進(jìn)行了深拷貝 (s_copy := append([]byte(nil), s...))。這樣,每個(gè)Worker接收到的都是一個(gè)獨(dú)立的切片副本,可以安全地進(jìn)行處理而不會(huì)影響其他goroutine或原始數(shù)據(jù)。
      func SpawnWork(inStr chan<- []byte) {
      // ...
      for scanner.Scan() {
          s := scanner.Bytes()
          // ...
          s_copy := append([]byte(nil), s...) // 深拷貝切片
          inStr <- s_copy // 發(fā)送切片副本
      }
      close(inStr) // 完成后關(guān)閉輸入channel
      }
      登錄后復(fù)制
  3. 使用 sync.WaitGroup 進(jìn)行健壯的同步:sync.WaitGroup是Go語(yǔ)言中用于等待一組goroutine完成的標(biāo)準(zhǔn)且推薦的機(jī)制。

    • 在啟動(dòng)每個(gè)Worker goroutine之前,調(diào)用wg.Add(1)來(lái)增加計(jì)數(shù)器。

    • 在每個(gè)Worker goroutine的defer語(yǔ)句中調(diào)用wg.Done(),確保無(wú)論goroutine如何退出(正常完成或發(fā)生panic),計(jì)數(shù)器都會(huì)被遞減。

    • 在main goroutine中,創(chuàng)建一個(gè)新的goroutine來(lái)執(zhí)行SpawnWork,并在其內(nèi)部調(diào)用wg.Wait()。這確保了SpawnWork在所有Worker完成其工作后才關(guān)閉結(jié)果channel。

    • main goroutine通過(guò)for range resChA循環(huán)接收結(jié)果,當(dāng)resChA被關(guān)閉時(shí),循環(huán)會(huì)自動(dòng)結(jié)束。

      func main() {
      // ...
      var wg sync.WaitGroup
      for i := 0; i < CpuCnt; i++ {
          wg.Add(1) // 增加WaitGroup計(jì)數(shù)器
          go Worker(inStr, resChA, resChB, &wg)
      }
      
      go func() {
          SpawnWork(inStr) // 啟動(dòng)工作生成器
          wg.Wait() // 等待所有Worker完成
          close(resChA) // 關(guān)閉結(jié)果channel
          close(resChB) // 關(guān)閉結(jié)果channel
      }()
      
      A := 0
      B := 0
      // 使用for range安全地接收結(jié)果,直到channel關(guān)閉
      for tmp_at := range resChA {
          tmp_gc := <-resChB
          A += tmp_at
          B += tmp_gc
          // ...
      }
      // ...
      }
      登錄后復(fù)制

完整的修正代碼示例

package main

import (
    "bufio"
    "fmt"
    "runtime"
    "strings"
    "sync"
)

// Worker goroutine負(fù)責(zé)處理字符串并計(jì)數(shù)
func Worker(inCh chan []byte, resA chan<- int, resB chan<- int, wg *sync.WaitGroup) {
    defer wg.Done() // 確保Worker完成時(shí)通知WaitGroup
    // fmt.Println("Worker started...") // 可用于調(diào)試
    for ch := range inCh { // 從輸入channel接收字符串,直到channel關(guān)閉
        at := 0 // 局部變量,用于統(tǒng)計(jì)當(dāng)前字符串的A/T計(jì)數(shù)
        gc := 0 // 局部變量,用于統(tǒng)計(jì)當(dāng)前字符串的G/C計(jì)數(shù)
        for i := 0; i < len(ch); i++ {
            if ch[i] == 'A' || ch[i] == 'T' {
                at++
            } else if ch[i] == 'G' || ch[i] == 'C' {
                gc++
            }
        }
        resA <- at // 將局部計(jì)數(shù)結(jié)果發(fā)送到結(jié)果channel
        resB <- gc
    }
}

// SpawnWork goroutine負(fù)責(zé)生成工作(DNA字符串)
func SpawnWork(inStr chan<- []byte) {
    // fmt.Println("Spawning work:") // 可用于調(diào)試
    // 人工輸入數(shù)據(jù),為了演示目的進(jìn)行擴(kuò)展
    StringData :=
        "NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
            "NTGAGAAATATGCTTTCTACTTTTTTGTTTAATTTGAACTTGAAAACAAAACACACACAA\n" +
            "CTTCCCAATTGGATTAGACTATTAACATTTCAGAAAGGATGTAAGAAAGGACTAGAGAGA\n" +
            "TATACTTAATGTTTTTAGTTTTTTAAACTTTACAAACTTAATACTGTCATTCTGTTGTTC\n" +
            "AGTTAACATCCCTGAATCCTAAATTTCTTCAGATTCTAAAACAAAAAGTTCCAGATGATT\n" +
            "TTATATTACACTATTTACTTAATGGTACTTAAATCCTCATTNNNNNNNNCAGTACGGTTG\n" +
            "TTAAATANNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
            "NNNNNNNCTTCAGAAATAAGTATACTGCAATCTGATTCCGGGAAATATTTAGGTTCATAA\n"
    // 擴(kuò)展數(shù)據(jù)1000次,以增加處理量
    tmp := StringData
    for n := 0; n < 1000; n++ {
        StringData = StringData + tmp
    }
    scanner := bufio.NewScanner(strings.NewReader(StringData))
    scanner.Split(bufio.ScanLines)

    for scanner.Scan() {
        s := scanner.Bytes()
        if len(s) == 0 || s[0] == '>' {
            continue
        } else {
            // 對(duì)切片進(jìn)行深拷貝,確保每個(gè)Worker處理的是獨(dú)立的數(shù)據(jù)副本
            s_copy := append([]byte(nil), s...)
            inStr <- s_copy
        }
    }
    close(inStr) // 所有數(shù)據(jù)發(fā)送完畢后,關(guān)閉輸入channel
}

func main() {
    CpuCnt := runtime.NumCPU() // 獲取CPU核心數(shù)
    runtime.GOMAXPROCS(CpuCnt) // 設(shè)置Go調(diào)度器使用與CPU核心數(shù)相同的邏輯處理器
    fmt.Printf("Processors: %d\n", CpuCnt)

    resChA := make(chan int)  // 用于接收A/T計(jì)數(shù)的channel
    resChB := make(chan int)  // 用于接收G/C計(jì)數(shù)的channel
    inStr := make(chan []byte) // 用于發(fā)送DNA字符串的channel

    fmt.Println("Spawning workers:")
    var wg sync.WaitGroup // 初始化WaitGroup
    for i := 0; i < CpuCnt; i++ {
        wg.Add(1) // 每啟動(dòng)一個(gè)Worker,WaitGroup計(jì)數(shù)器加1
        go Worker(inStr, resChA, resChB, &wg)
    }

    fmt.Println("Spawning work:")
    // 啟動(dòng)一個(gè)goroutine來(lái)生成工作并等待所有Worker完成
    go func() {
        SpawnWork(inStr) // 啟動(dòng)工作生成器
        wg.Wait()        // 等待所有Worker goroutine完成
        close(resChA)    // 所有Worker完成后,關(guān)閉結(jié)果channelA
        close(resChB)    // 所有Worker完成后,關(guān)閉結(jié)果channelB
    }()

    A := 0        // 總A/T計(jì)數(shù)
    B := 0        // 總G/C計(jì)數(shù)
    LineCnt := 0  // 處理的行數(shù)
    // 使用for range循環(huán)接收結(jié)果,當(dāng)resChA關(guān)閉時(shí),循環(huán)會(huì)自動(dòng)退出
    for tmp_at := range resChA {
        tmp_gc := <-resChB // resChA和resChB的結(jié)果是成對(duì)出現(xiàn)的
        A += tmp_at
        B += tmp_gc
        LineCnt++
    }

    if !(A+B > 0) {
        fmt.Println("No A/B was found!")
    } else {
        ABFraction := float32(B) / float32(A+B)
        fmt.Println("\n----------------------------")
        fmt.Printf("Cpu's  : %d\n", CpuCnt)
        fmt.Printf("Lines  : %d\n", LineCnt)
        fmt.Printf("A+B    : %d\n", A+B)
        fmt.Printf("A      : %d\n", A)
        fmt.Printf("B      : %d\n", B) // 修正:此處應(yīng)打印B的值,而不是A
        fmt.Printf("AB frac: %.2f%%\n", ABFraction*100)
        fmt.Println("----------------------------")
    }
}
登錄后復(fù)制

注意事項(xiàng)與總結(jié)

  1. 利用Go競(jìng)態(tài)檢測(cè)器: 在開(kāi)發(fā)和調(diào)試并發(fā)程序時(shí),務(wù)必使用Go的競(jìng)態(tài)檢測(cè)器。通過(guò)在編譯或運(yùn)行命令中添加-race標(biāo)志(例如 go run -race main.go 或 go build -race && ./your_program),可以幫助你發(fā)現(xiàn)潛在的數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題。
  2. 避免共享可變狀態(tài): 這是并發(fā)編程中的黃金法則。盡量使goroutine之間的數(shù)據(jù)獨(dú)立,或者通過(guò)channel傳遞數(shù)據(jù)的副本,而不是共享引用。如果必須共享狀態(tài),請(qǐng)使用sync包提供的互斥鎖(sync.Mutex)或其他同步原語(yǔ)來(lái)保護(hù)對(duì)共享數(shù)據(jù)的訪問(wèn)。
  3. 理解引用類(lèi)型和值類(lèi)型: Go中的切片、映射和channel是引用類(lèi)型。當(dāng)通過(guò)channel傳遞它們時(shí),傳遞的是其引用,而不是底層數(shù)據(jù)的副本。如果需要在不同goroutine中獨(dú)立修改這些數(shù)據(jù),必須進(jìn)行深拷貝。
  4. 使用 sync.WaitGroup 管理Goroutine生命周期: sync.WaitGroup提供了一種簡(jiǎn)潔高效的方式來(lái)等待一組goroutine完成。它比手動(dòng)計(jì)數(shù)或復(fù)雜的select邏輯更健壯、更易于理解和維護(hù)。
  5. Channel的關(guān)閉: 正確關(guān)閉channel對(duì)于通知接收方數(shù)據(jù)流結(jié)束至關(guān)重要。通常,發(fā)送方負(fù)責(zé)關(guān)閉channel。在有多個(gè)發(fā)送方或復(fù)雜邏輯的情況下,可以考慮使用sync.Once來(lái)確保channel只被關(guān)閉一次,或者像本例中通過(guò)WaitGroup確保所有生產(chǎn)者完成后再關(guān)閉。

通過(guò)遵循這些最佳實(shí)踐,開(kāi)發(fā)者可以有效地避免Go并發(fā)編程中的數(shù)據(jù)競(jìng)爭(zhēng),構(gòu)建出穩(wěn)定、高效且可預(yù)測(cè)的并發(fā)應(yīng)用程序。

以上就是Go并發(fā)編程中的數(shù)據(jù)競(jìng)爭(zhēng)與同步實(shí)踐的詳細(xì)內(nèi)容,更多請(qǐng)關(guān)注php中文網(wǎng)其它相關(guān)文章!

編程速學(xué)教程(入門(mén)課程)
編程速學(xué)教程(入門(mén)課程)

編程怎么學(xué)習(xí)?編程怎么入門(mén)?編程在哪學(xué)?編程怎么學(xué)才快?不用擔(dān)心,這里為大家提供了編程速學(xué)教程(入門(mén)課程),有需要的小伙伴保存下載就能學(xué)習(xí)啦!

下載
來(lái)源:php中文網(wǎng)
本文內(nèi)容由網(wǎng)友自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,本站不承擔(dān)相應(yīng)法律責(zé)任。如您發(fā)現(xiàn)有涉嫌抄襲侵權(quán)的內(nèi)容,請(qǐng)聯(lián)系admin@php.cn
最新問(wèn)題
開(kāi)源免費(fèi)商場(chǎng)系統(tǒng)廣告
最新下載
更多>
網(wǎng)站特效
網(wǎng)站源碼
網(wǎng)站素材
前端模板
關(guān)于我們 免責(zé)申明 意見(jiàn)反饋 講師合作 廣告合作 最新更新
php中文網(wǎng):公益在線php培訓(xùn),幫助PHP學(xué)習(xí)者快速成長(zhǎng)!
關(guān)注服務(wù)號(hào) 技術(shù)交流群
PHP中文網(wǎng)訂閱號(hào)
每天精選資源文章推送
PHP中文網(wǎng)APP
隨時(shí)隨地碎片化學(xué)習(xí)
PHP中文網(wǎng)抖音號(hào)
發(fā)現(xiàn)有趣的

Copyright 2014-2025 http://ipnx.cn/ All Rights Reserved | php.cn | 湘ICP備2023035733號(hào)