Goのsyncプリミティブと並行プログラミング
Ethan Miller
Product Engineer · Leapcell

はじめに
Goプログラミングの活気ある世界では、並行処理はファーストクラスの市民です。Goのゴルーチンとチャネルは、並行コードを書くための強力なメカニズムを提供し、複数のタスクを同時に管理しやすくします。しかし、並行処理には共有リソースを管理するという必然的な課題が伴います。複数のゴルーチンが同時に同じデータにアクセスして変更すると、データ競合や一貫性のない状態が発生し、予測不可能でデバッグが困難な動作につながる可能性があります。ここで同期プリミティブが不可欠になります。Goのsync
パッケージは、ゴルーチン間の相互作用を調整することにより、開発者が安全で効率的な並行アプリケーションを書くことを可能にする基本的なツールセット(Mutex
、RWMutex
、WaitGroup
、Cond
)を提供します。これらのプリミティブとそれらの適切な使用法を理解することは、堅牢でパフォーマンスの高いGoプログラムを構築するために不可欠です。この記事では、これらの不可欠な同期ツールの原理、実装、および実践的なアプリケーションを探ります。
主要な同期の概念
各sync
プリミティブの詳細に入る前に、並行プログラミングにおけるいくつかの主要な概念について共通の理解を確立しましょう。
- 並行処理と並列処理: 並行処理は多くのことを一度に対処することであり、並列処理は多くのことを一度に行うことです。Goは並行処理に優れており、マルチコアプロセッサで並列処理を達成することがよくあります。
- 競合状態: 競合状態は、複数のゴルーチンが同時に共有データにアクセスして変更しようとし、最終的な結果が操作の非決定的な順序に依存する場合に発生します。
- クリティカルセクション: クリティカルセクションは、共有リソースにアクセスするコードの一部です。競合状態を防ぐために、一度に1つのゴルーチンのみがクリティカルセクション内のコードを実行できるようにする必要があります。
- デッドロック: デッドロックは、2つ以上のゴルーチンが、それらが必要とするリソースを解放するのを互いに待ちながら、無期限にブロックされる状況です。
- ライブロック: デッドロックに似ていますが、ゴルーチンはブロックされていません。代わりに、互いに応答して状態を絶えず変更し、有用な作業が行われない結果になります。
Goのsync
プリミティブの理解
Goのsync
パッケージは、並行アクセスと調整を管理するためのいくつかの主要なプリミティブを提供します。
Mutex:相互排他ロック
A sync.Mutex
は、複数のゴルーチンによる同時アクセスから共有リソースを保護するように設計された相互排他ロックです。一度に1つのゴルーチンのみがロックを取得し、クリティカルセクションに入ることができます。
原理と実装:
A Mutex
にはLock()
とUnlock()
の2つの主要なメソッドがあります。
Lock()
: ロックを取得します。ロックが既に別のゴルーチンによって保持されている場合、呼び出し元のゴルーチンはロックを取得できるようになるまでブロックされます。Unlock()
: ロックを解除します。クリティカルセクションを終了するときにUnlock()
を呼び出すことが重要です。通常、パニックが発生した場合でも常に解放されるようにdefer
を使用します。
内部的には、Mutex
は、ロックされているかどうか、およびどのゴルーチンが待機しているかを追跡する内部状態を使用して実装されています。目的を達成するために、アトミック操作と、場合によってはシステムコールを活用します。
アプリケーションシナリオ: 複数のゴルーチンが共有カウンターを更新する必要があるシナリオを考えてみてください。
package main import ( "fmt" "sync" "time" ) func main() { var counter int var mu sync.Mutex var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() mu.Lock() // ロックを取得 counter++ mu.Unlock() // ロックを解除 }() } wg.Wait() fmt.Printf("Final counter value: %d\n", counter) // 1000になるはずです }
Mutex
なしでは、競合状態のためにcounter
は1000未満の値になる可能性が高いです。Mutex
は、counter
に対する各インクリメント操作がアトミックで安全であることを保証します。
RWMutex:読み書き相互排他ロック
A sync.RWMutex
は、読み書き相互排他ロックです。書き込み操作よりも読み取り操作がはるかに頻繁に行われる場合に特に役立つ、書き込み処理の排他的アクセスを保証しながら、複数の読み取り処理がリソースに同時にアクセスできるようにすることで、標準のMutex
よりも詳細な制御を提供します。
原理と実装:
RWMutex
には、読み取りロックと書き込みロックの両方のメソッドがあります。
- 書き込みロック:
Lock()
とUnlock()
。標準のMutex
のように動作します。一度に1つのゴルーチンのみが書き込みロックを保持でき、保持されている間は、他の読み取り処理や他の書き込み処理はそれぞれのロックを取得できません。 - 読み取りロック:
RLock()
とRUnlock()
。複数のゴルーチンが同時に読み取りロックを保持できます。しかし、書き込み処理が書き込みロックを保持しているか、または取得を待機している場合、新しい読み取り処理はブロックされます。
内部的には、RWMutex
はアクティブな読み取り処理のカウンターと書き込み処理用のMutex
を維持し、これらの状態に基づいてアクセスを調整します。
アプリケーションシナリオ: データが頻繁に読み取られるが、更新はまれなキャッシュを想像してください。
package main import ( "fmt" "sync" "time" ) type Cache struct { data map[string]string mu sync.RWMutex } func NewCache() *Cache { return &Cache{ data: make(map[string]string), } } func (c *Cache) Get(key string) (string, bool) { c.mu.RLock() // 読み取りロックを取得 defer c.mu.RUnlock() val, ok := c.data[key] return val, ok } func (c *Cache) Set(key, value string) { c.mu.Lock() // 書き込みロックを取得 defer c.mu.Unlock() c.data[key] = value } func main() { cache := NewCache() var wg sync.WaitGroup // 書き込み処理 for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() cache.Set(fmt.Sprintf("key%d", id), fmt.Sprintf("value%d", id)) fmt.Printf("Writer %d set key%d\n", id, id) time.Sleep(100 * time.Millisecond) // 作業をシミュレート }(i) } // 読み取り処理 for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() time.Sleep(50 * time.Millisecond) // 書き込み処理の開始を優先 val, ok := cache.Get("key0") if ok { fmt.Printf("Reader %d got key0: %s\n", id, val) } else { fmt.Printf("Reader %d could not get key0 yet\n", id) } }(i) } wg.Wait() }
この例では、複数のゴルーチンがRLock()
を使用してキャッシュから同時に読み取ることができますが、Set()
操作(書き込みロックを取得する)は、更新中のデータの一貫性を保証しながら、すべての読み取り処理および他の書き込み処理をブロックします。
WaitGroup:ゴルーチンの終了を待機
A sync.WaitGroup
は、ゴルーチンコレクションの終了を待機するために使用されます。メインゴルーチンは、「WaitGroup」内のすべてのゴルーチンが完了するまでブロックされます。
原理と実装:
WaitGroup
には3つの主要なメソッドがあります。
Add(delta int)
:WaitGroup
カウンターにdelta
を追加します。通常、待機するゴルーチンの数に設定されます。複数回呼び出すことができます。Done()
:WaitGroup
カウンターを1つ減らします。これは、各ゴルーチンが作業を終えたときに呼び出されるべきであり、通常はdefer
を使用して行います。Wait()
:WaitGroup
カウンターがゼロになるまで、呼び出し元のゴルーチンをブロックします。
The WaitGroup
は内部カウンターを格納します。Add
はそれを増やし、Done
はそれを減らし、Wait
はそれがゼロになるまでブロックします。
アプリケーションシナリオ:
これらは、いくつかの独立したゴルーチンを起動し、続行する前にすべてが完了するのを待つ必要がある場合によく使用されます。これは、前のMutex
およびRWMutex
の例で示されています。スタンドアロンの例を次に示します。
package main import ( "fmt" "sync" "time" ) func worker(id int, wg *sync.WaitGroup) { defer wg.Done() // 完了したらカウンターをデクリメント fmt.Printf("Worker %d starting\n", id) time.Sleep(time.Duration(id) * 100 * time.Millisecond) // 作業をシミュレート fmt.Printf("Worker %d finished\n", id) } func main() { var wg sync.WaitGroup for i := 1; i <= 3; i++ { wg.Add(1) // 各ワーカーのカウンターをインクリメント go worker(i, &wg) } wg.Wait() // すべてのワーカーがDone()を呼び出すまでブロック fmt.Println("All workers have completed.") }
これは、「All workers have completed.」がworker 1
、worker 2
、およびworker 3
がタスクを完了した後にのみ表示されることを保証します。
Cond:条件変数
A sync.Cond
(条件変数)は、ゴルーチンが特定の条件が満たされるまで待機できるようにします。これは常にsync.Locker
(通常はsync.Mutex
)に関連付けられており、条件自体を保護します。
原理と実装:
Cond
には3つの主要なメソッドがあります。
Wait()
: 関連付けられたロッキングをアトミックに解除し、ゴルーチンが「シグナル」または「ブロードキャスト」されるまでブロックし、その後、戻る前にロッキングを再ロックします。ロッキングが保持された状態で呼び出す必要があります。Signal()
:Cond
で待機しているゴルーチンを最大1つウェイクアップします。待機しているゴルーチンがない場合、何も行いません。Broadcast()
:Cond
で待機しているすべてのゴルーチンをウェイクアップします。
Cond
は、ゴルーチンが単純なロック解除だけではない特定の状態変化を待機する必要がある場合によく使用されます。ロッキングにより、条件チェックがアトミックになり、条件を定義する共有データが保護されます。
アプリケーションシナリオ: バッファに制限された容量があるプロデューサー・コンシューマーの問題を考えてみましょう。プロデューサーはバッファがいっぱいの場合に待機する必要があり、コンシューマーはバッファが空の場合に待機する必要があります。
package main import ( "fmt" "sync" "time" ) const ( bufferCapacity = 5 ) type Buffer struct { items []int mu sync.Mutex notFull *sync.Cond // アイテムが追加されたらシグナル notEmpty *sync.Cond // アイテムが削除されたらシグナル } func NewBuffer() *Buffer { b := &Buffer{ items: make([]int, 0, bufferCapacity), } b.notFull = sync.NewCond(&b.mu) b.notEmpty = sync.NewCond(&b.mu) return b } func (b *Buffer) Produce(item int) { b.mu.Lock() defer b.mu.Unlock() for len(b.items) == bufferCapacity { // バッファがいっぱいの場合は待機 // Waitはアトミックに解除、ブロック、再ロックします fmt.Println("Buffer full, producer waiting...") b.notFull.Wait() } b.items = append(b.items, item) fmt.Printf("Produced: %d, Buffer: %v\n", item, b.items) b.notEmpty.Signal() // コンシューマーにバッファが空ではないことをシグナル } func (b *Buffer) Consume() int { b.mu.Lock() defer b.mu.Unlock() for len(b.items) == 0 { // バッファが空の場合は待機 // Waitはアトミックに解除、ブロック、再ロックします fmt.Println("Buffer empty, consumer waiting...") b.notEmpty.Wait() } item := b.items[0] b.items = b.items[1:] fmt.Printf("Consumed: %d, Buffer: %v\n", item, b.items) b.notFull.Signal() // プロデューサーにバッファが満杯ではないことをシグナル return item } func main() { buf := NewBuffer() var wg sync.WaitGroup // プロデューサー for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { time.Sleep(time.Duration(id*50+j*10) * time.Millisecond) // 作業をシミュレート buf.Produce(id*100 + j) } }(i) } // コンシューマー for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 7; j++ { // 生産量よりも多く消費して待機を示す time.Sleep(time.Duration(id*70+j*15) * time.Millisecond) // 作業をシミュレート buf.Consume() } }(i) } wg.Wait() fmt.Println("All production and consumption complete.") }
この例では、notFull
とnotEmpty
はCond
変数です。プロデューサーはバッファが空のときにnotFull
で待機し、コンシューマーはバッファが空のときにnotEmpty
で待機します。アイテムが追加されると、notEmpty.Signal()
が待機中のコンシューマーをウェイクアップします。アイテムが削除されると、notFull.Signal()
が待機中のプロデューサーをウェイクアップします。Wait()
の周りのfor
ループは、ゴルーチンが誤ってウェイクアップされたり、ロックを再確保する時点で条件が変更されたりする可能性があるため、重要です。
結論
The sync
packageは、Goでの並行処理の管理に不可欠なツールを提供します。Mutex
は共有リソースへの排他的アクセスを保証し、クリティカルセクションでのデータ競合を防ぎます。RWMutex
は、同時読み取りを許可することで、読み取り負荷の高いワークロードに対してより最適化されたアプローチを提供します。WaitGroup
は、一連のゴルーチンの実行完了を待機するタスクを簡素化します。最後に、Cond
はゴルーチンが複雑な条件の充足を待機できるようにし、洗練されたゴルーチン間調整を促進します。これらのプリミティブを習得することは、堅牢で効率的で信頼性の高いGoでの並行アプリケーションを記述するための基本であり、データの整合性と予測可能なプログラム動作を保証します。