syncパッケージ unveiled - WaitGroup: 並行ゴルーチンの完了をオーケストレーションする
James Reed
Infrastructure Engineer · Leapcell

goroutineとチャネルを中心に構築されたGoの並行モデルは、信じられないほど強力かつエレガントです。しかし、力には責任が伴います – これらの並行プロセスを効果的に管理することは、堅牢で信頼性の高いアプリケーションを構築するために不可欠です。並行処理ツールキットの基本的なツールの一つであるsync
パッケージが提供するsync.WaitGroup
です。
sync.WaitGroup
型は、ゴルーチンのコレクションが完了するのを待つように設計されています。これは、インクリメントおよびデクリメントできるカウンターとして機能します。カウンターがゼロになると、Wait
メソッドはブロックを解除します。このシンプルなメカニズムは、複数のゴルーチンを起動し、メインゴルーチン(または他のゴルーチン)が続行する前にそれらがすべて作業を完了したことを確認する必要があるシナリオで非常に役立ちます。
WaitGroupとは? それが解決する問題
アプリケーションが大量のタスクを並行して処理する必要があるシナリオを想像してみてください。各タスクにゴルーチンを起動することにしました。これらのゴルーチンを待機するメカニズムがないと、メインプログラムが早期に終了したり、まだ計算されていない結果を使用しようとしたりする可能性があります。
このナイーブで問題のある例を考えてみましょう。
package main import ( "fmt" "time" ) func processTask(id int) { fmt.Printf("Task %d started\n", id) time.Sleep(time.Duration(id) * 100 * time.Millisecond) // 作業をシミュレート fmt.Printf("Task %d finished\n", id) } func main() { for i := 1; i <= 5; i++ { go processTask(i) } fmt.Println("All tasks launched. Exiting main.") // ここで何が起こるか? mainが終了する前に多くのタスクが完了しない可能性があります。 }
上記のコードを実行すると、おそらくすべての「Task X finished」メッセージが表示されないか、または「Exiting main.」の後に順序付けられていない方法で表示されることが観察されるでしょう。main
ゴルーチンは、processTask
ゴルーチンの完了を待機しません。これはまさにsync.WaitGroup
が対処する問題です。
sync.WaitGroupの仕組み
sync.WaitGroup
は3つの主要なメソッドを公開しています。
Add(delta int)
:WaitGroup
カウンターをdelta
だけインクリメントします。通常、新しいゴルーチンを起動する前にこれを呼び出して、別のゴルーチンがグループに参加していることを示します。delta
が負の場合、カウンターはデクリメントされます。Done()
:WaitGroup
カウンターを1つデクリメントします。これは通常、ゴルーチンの実行の最後に(しばしばdefer
を使用して)呼び出され、その完了をシグナルします。これはAdd(-1)
と同等です。Wait()
:WaitGroup
カウンターがゼロになるまで呼び出し元のゴルーチンをブロックします。これは、Add
されたすべてのゴルーチンがDone
も呼び出したことを意味します。
WaitGroupを正しく実装する
sync.WaitGroup
を使用して、以前の例をリファクタリングしましょう。
package main import ( "fmt" "sync" "time" ) func processTaskWithWG(id int, wg *sync.WaitGroup) { // ゴルーチンを終了する前に、必ずDone()を呼び出します。 // defer は、エラーが発生した場合でも呼び出されることを保証します。 defer wg.Done() fmt.Printf("Task %d started\n", id) time.Sleep(time.Duration(id) * 100 * time.Millisecond) // 作業をシミュレート fmt.Printf("Task %d finished\n", id) } func main() { var wg sync.WaitGroup // WaitGroupを宣言 for i := 1; i <= 5; i++ { wg.Add(1) // 各新しいゴルーチンのためにWaitGroupカウンターをインクリメントします go processTaskWithWG(i, &wg) // WaitGroupをポインタで渡します } // すべてのゴルーチンの完了を待ちます wg.Wait() fmt.Println("All tasks complete. Exiting main.") }
この修正されたコードを実行すると、すべての「Task X finished」メッセージが「All tasks complete. Exiting main.」の前に一貫して表示されるでしょう。main
ゴルーチンは、processTaskWithWG
ゴルーチンがすべて実行を完了するのを正しく待機するようになりました。
重要な考慮事項:
- ポインタ対値:
WaitGroup
は常にポインタ(*sync.WaitGroup
)でゴルーチンに渡してください。値で渡した場合、各ゴルーチンはWaitGroup
のコピーを受け取り、そのDone()
呼び出しはmain
ゴルーチン内の元のWaitGroup
ではなく、ローカルコピーのみをデクリメントします。これはよくある落とし穴です。 go
の前にAdd
: 新しいゴルーチンを開始する前にwg.Add(1)
を呼び出してください。ゴルーチン内でAdd
を呼び出すと、新しいゴルーチンがカウンターをインクリメントする機会を得る前に、main
ゴルーチンがwg.Wait()
を実行してしまう競合状態が発生する可能性があり、Wait()
が早期にブロック解除されてしまいます。defer wg.Done()
:defer
を使用してwg.Done()
を呼び出すと、ゴルーチンがパニックしたり、エラーにより早期にリターンしたりした場合でも、カウンターがデクリメントされることが保証されます。これにより、Wait
メソッドが無限にブロックする(デッドロック)のを防ぐことができます。
より複雑な例:ファンアウト・ファンインパターン
WaitGroup
は、ファンアウト/ファンインパターンを実装するのに優れています。このパターンでは、作業を複数のワーカーに配布(ファンアウト)し、それらの結果を収集(ファンイン)します。WaitGroup
自体は結果を収集しませんが(通常はチャネルが使用されます)、処理に進む前にすべてのワーカーが完了したことを保証します。
複数のURLから同時にデータを取得し、その後すべての応答を処理することを想像してみてください。
package main import ( "fmt" "io/ioutil" "net/http" "sync" "time" ) // fetchDataはURLからデータを取得し、チャネルに送信します func fetchData(url string, results chan<- string, wg *sync.WaitGroup) { defer wg.Done() // WaitGroupカウンターがデクリメントされることを保証します fmt.Printf("Fetching %s...\n", url) resp, err := http.Get(url) if err != nil { results <- fmt.Sprintf("Error fetching %s: %v", url, err) return } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { results <- fmt.Sprintf("Error reading body from %s: %v", url, err) return } results <- fmt.Sprintf("Content from %s (first 50 chars): %s", url, string(body)[:min(50, len(body))]) } func min(a, b int) int { if a < b { return a } return b } func main() { urls := []string{ "http://example.com", "http://google.com", "http://bing.com", "http://invalid-url-example.com", // これはエラーを引き起こします } var wg sync.WaitGroup // 結果を保持するバッファ付きチャネルを作成します。URLの数と同じサイズに設定します。 // 受信側が遅い場合でも、送信側がブロックされるのを防ぎます。 results := make(chan string, len(urls)) fmt.Println("Starting data fetching...") for _, url := range urls { wg.Add(1) // ゴルーチンを起動する前に必ず追加します go fetchData(url, results, &wg) } // すべてのフェッチゴルーチンが完了した後、結果チャネルを閉じるゴルーチンを開始します。 // これは、"results"チャネルのrangeループがいつ終了するかを知るために重要です。 go func() { wg.Wait() // すべてのfetchDataゴルーチンの完了を待ちます close(results) // チャネルを閉じます }() // 結果が届き次第、またはすべて収集された後に処理します。 // チャネルのrangeループを使用すると、すべての結果を処理することが保証されます。 fmt.Println("\nProcessing fetched data:") for res := range results { fmt.Println(res) } fmt.Println("\nAll data processing complete. Exiting main.") time.Sleep(time.Second) // 出力順序を確実にするために少し時間を置きます }
このfan-out/fan-in
例では:
- 各URLに対して
fetchData
ゴルーチンを起動し、wg.Add(1)
を使用してそれぞれを追跡します。 - 各
fetchData
ゴルーチンは、完了時(またはエラー時)にwg.Done()
を呼び出します。 - 別個の匿名ゴルーチンが、
wg.Wait()
を呼び出し、次にclose(results)
を呼び出す責任を負います。チャネルを閉じると、for res := range results
ループにこれ以上値が送信されないことがシグナルされ、正常に終了できるようになります。これがないと、すべてのアイテムが処理された後、main
ゴルーチンのrange results
ループは無期限にブロックされます。 main
ゴルーチンは、results
チャネルを反復処理し、取得された各データ部分を印刷します。
このパターンは、並行データ処理にとって非常に一般的で強力です。
ベストプラクティスと一般的な落とし穴
- ゴルーチンの内部で
Add
しない: 前述のように、生成されたゴルーチン内でwg.Add(1)
を呼び出すと、競合状態が発生する可能性があります。ゴルーチンを生成する前に必ずカウンターをインクリメントしてください。 - 常に
defer wg.Done()
: カウンターがデクリメントされることを保証する最も堅牢な方法です。 - ポインタで渡す:
sync.WaitGroup
は、ゴルーチンにポインタ(*sync.WaitGroup
)で渡す必要があります。 Wait
が呼び出された後にAdd
しない:Wait()
が返された後、WaitGroup
は理論的には再利用できます。ただし、Wait()
が呼び出された後にカウンターにAdd
すると、別のゴルーチンが同じWaitGroup
インスタンスで待機している、または新しく待機している場合に、未定義の動作やパニックにつながる可能性があります。特にWait()
がループで呼び出される場合、各並行タスクのバッチに対して新しいWaitGroup
を作成するのが一般的に安全です。- ゼロ値
WaitGroup
:WaitGroup
は宣言後に直接使用できます(そのゼロ値は使用準備ができています)。sync.WaitGroup{}
で初期化する必要はありません。
結論
sync.WaitGroup
は、Goの並行処理ツールボックスにおいて不可欠なツールです。複数のゴルーチンの完了を調整するシンプルかつ効果的なメカニズムを提供し、早期終了や競合状態のような一般的な並行処理バグを防ぎます。そのAdd
、Done
、Wait
メソッドをマスターし、ベストプラクティスを遵守することで、より堅牢で予測可能でパフォーマンスの高い並行アプリケーションをGoで構築できます。これは、より複雑な並行パターンのバックボーンを形成し、あらゆる真剣なGo開発者にとって基本的なビルディングブロックです。