Goルーチンとチャネル:モダンな並行処理パターン
Min-jun Kim
Dev Intern · Leapcell

Goがクラウドやモダンな並行システムのために作られた言語としての評判は、そのエレガントで強力な並行処理のアプローチ、すなわちGoroutinesとChannelsに大きく負うところがあります。アプリケーションが高応答性、スケーラビリティ、効率的なリソース利用を要求する時代において、これらのプリミティブをどのように活用するかを理解することは、単なる「あれば嬉しい」ものではなく、あらゆるGo開発者にとっての基本的なスキルです。この記事では、Goの並行処理モデルの背後にある魔法を、その基盤となるブロックから始め、Fan-in、Fan-out、Worker Poolsのような実用的な現実世界のパターンへと構築していきます。最後までには、Goで堅牢な並行アプリケーションを設計・実装し、複雑な問題をシンプルかつ効果的に解決する方法についての明確な理解を得ているでしょう。
Goの並行処理モデルの中心には、2つの共生的な構造があります:GoroutinesとChannels。
Goroutines:軽量な並行実行
Goroutineは、同じアドレス空間内で他のGoroutinesと並行して実行される、軽量で独立して実行される関数です。従来のOSスレッドとは異なり、GoroutinesはGoランタイムによって少数のOSスレッドに多重化されるため、作成と管理が非常に安価です。これは、顕著なオーバーヘッドなしに数千、あるいは数百万ものGoroutinesを起動できることを意味し、高度な並行アプリケーションを可能にします。
Goroutineを起動するには、単にgo
キーワードとそれに続く関数呼び出しを使用します。
package main import ( "fmt" "time" ) func sayHello(name string) { time.Sleep(100 * time.Millisecond) // いくらかの作業をシミュレート fmt.Printf("Hello, %s!\n", name) } func main() { go sayHello("Alice") // Goroutineを起動 fmt.Println("Main function continues execution...") // メイン関数は待機する必要があります。さもないと、プログラムがGoroutine完了前に終了する可能性があります。 time.Sleep(200 * time.Millisecond) }
この例では、sayHello("Alice")
はmain関数と並行して実行されます。main関数内のtime.Sleep
に注意してください。これがないと、sayHello
が実行される機会を得る前にmainが終了する可能性があり、Goroutinesがノンブロッキングであることを示しています。
Channels:逐次プロセス間の通信
Goroutinesが実行を処理する一方で、ChannelsはGoroutines間の通信を処理します。Goの哲学である「メモリを共有して通信するのではなく、通信によってメモリを共有する」はChannelsによって具現化されています。チャネルは、値の送受信が可能な型付きの導管です。
チャネルはバッファなしまたはバッファ付きにできます。
- バッファなしチャネル:バッファなしチャネルへの送信操作は、受信操作が準備できるまでブロックし、その逆も同様です。これにより同期通信が保証されます。
- バッファ付きチャネル:バッファ付きチャネルには容量があります。送信操作はバッファがいっぱいの場合にのみブロックし、受信操作はバッファが空の場合にのみブロックします。
チャネルの使用方法を以下に示します。
package main import ( "fmt" "time" ) func producer(ch chan int) { for i := 0; i < 5; i++ { fmt.Printf("Producer: Sending %d\n", i) ch <- i // チャネルに値を送信 time.Sleep(50 * time.Millisecond) } close(ch) // 完了したらチャネルを閉じる } func consumer(ch chan int) { for val := range ch { // チャネルから値を受信 fmt.Printf("Consumer: Received %d\n", val) } fmt.Println("Consumer: Channel closed, exiting.") } func main() { // バッファなしチャネルを作成 messages := make(chan int) go producer(messages) go consumer(messages) // Goroutineが完了するまでmainをアクティブに保つ time.Sleep(500 * time.Millisecond) }
この例では、producer
Goroutineがmessages
チャネルに整数を送信し、consumer
Goroutineがそれらを受信します。チャネル上のfor...range
ループは、チャネルが閉じられるまで値を使用します。
次に、GoroutinesとChannels上に構築された強力な並行処理パターンを探りましょう。
モダンな並行処理パターン
Fan-out:作業の分散
Fan-outは、単一の作業ソースが複数のワーカーGoroutinesにタスクを分散するパターンです。これはCPUバウンドまたはI/Oバウンドな操作を並列化するのに非常に役立ちます。通常、単一の入力チャネルと、そこから読み取る複数のワーカーGoroutinesを使用します。
package main import ( "fmt" "sync" "time" ) // workerは数値を処理し、計算をシミュレートします func worker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Printf("Worker %d: processing job %d\n", id, j) time.Sleep(100 * time.Millisecond) // 作業をシミュレート results <- j * 2 // 結果を送信 } } func main() { const numJobs = 10 const numWorkers = 3 jobs := make(chan int, numJobs) results := make(chan int, numJobs) // ワーカーGoroutinesを開始 for w := 1; w <= numWorkers; w++ { go worker(w, jobs, results) } // ジョブの送信 for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 送信するジョブはもうありません // WaitGroupを使用して結果を収集し、すべてのワーカーが完了することを保証します var wg sync.WaitGroup wg.Add(numJobs) // すべてのジョブが完了するのを待つように設定(本来はワーカーごとだが、ここでは結果収集を単純化) // 結果を収集する goroutine (numJobs 個の結果を読み取るだけ) go func() { for i := 0; i < numJobs; i++ { <-results // 結果を単純に消費 } // ここでは、すべての結果が収集されたことを確認するために、単純に numJobs 回読み取っています。 // より複雑なシナリオでは、結果の収集完了を通知する別のメカニズムが必要になる場合があります。 }() // 単純なデモンストレーションのため、結果が収集されるための十分な時間を確保します。 time.Sleep(time.Duration(numJobs/numWorkers)*150*time.Millisecond + 200*time.Millisecond) fmt.Println("Finished processing all jobs.") }
このFan-out
例では、main
がjobs
チャネルにジョブをプッシュします。複数のworker
Goroutinesは並行してjobs
から読み取り、処理し、results
チャネルに結果を戻します。
Fan-in:結果の統合
Fan-inはFan-outの逆で、複数のソースが単一のチャネルにデータを送信し、データストリームを統合します。これは、複数の並列計算からの結果を収集するためによく使用されます。
package main import ( "fmt" "sync" "time" ) // dataSourceは異なるソースからのデータの取得をシミュレートします func dataSource(id int, out chan<- string, wg *sync.WaitGroup) { defer wg.Done() time.Sleep(time.Duration(100+id*50) * time.Millisecond) // 異なる取得時間をシミュレート out <- fmt.Sprintf("Data from source %d", id) } func main() { const numSources = 5 results := make(chan string) // すべての結果のための単一チャネル var wg sync.WaitGroup // 複数のデータソースを開始 for i := 1; i <= numSources; i++ { wg.Add(1) go dataSource(i, results, &wg) } // すべてのソースが完了したら結果チャネルを閉じるためのGoroutine go func() { wg.Wait() // すべてのデータソースの完了を待機 close(results) // チャネルを閉じる }() // 単一のfan-inチャネルから結果を収集 fmt.Println("Collecting results:") for r := range results { fmt.Println(r) } fmt.Println("All results collected.") }
ここでは、dataSource
Goroutinesが同じresults
チャネルにデータを送信します。別のGoroutineはsync.WaitGroup
を使用してすべてのdataSource
Goroutinesの完了を待ち、その後results
チャネルを閉じ、main
関数にそれ以上データが到着しないことをシグナルします。
Worker Pools:制御された並行処理
Worker PoolはFan-outとFan-inを組み合わせて、共有キューからタスクを処理する固定数のGoroutines(ワーカー)を作成します。このパターンは、制御された並行処理を提供し、リソースの枯渇を防ぎ、効率的なタスク分散を保証します。多数のタスクがあるが、並行操作の数を制限したいシナリオに最適です。
package main import ( "fmt" "sync" "time" ) // プール用のWorker関数 func workerPoolWorker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Printf("Worker %d starting job %d\n", id, j) time.Sleep(time.Duration(j) * 50 * time.Millisecond) // ジョブIDに基づく作業をシミュレート fmt.Printf("Worker %d finished job %d\n", id, j) results <- j * 2 } } func main() { const numJobs = 10 const numWorkers = 3 // 固定数のワーカー jobs := make(chan int, numJobs) results := make(chan int, numJobs) // ワーカープールを開始:'numWorkers'個のGoroutinesを起動 for w := 1; w <= numWorkers; w++ { go workerPoolWorker(w, jobs, results) } // jobsチャネルにジョブを送信 for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 送信するジョブはもうありません // resultsチャネルから結果を収集 // numJobs個すべての結果を待つ必要があります。 var receivedResults []int for a := 1; a <= numJobs; a++ { res := <-results receivedResults = append(receivedResults, res) } fmt.Println("All results collected:", receivedResults) }
Worker Poolの例では、numWorkers
個のGoroutinesが一度起動され、継続的にjobs
チャネルからジョブを取得します。すべてのジョブが送信され、jobs
が閉じられると、ワーカーは残りのタスクを処理した後、最終的に終了します。main
関数はnumJobs
個の結果を収集するのを待ち、すべての作業が完了したことを保証します。
GoのGoroutinesとChannelsは、強力でありながら直感的な並行処理のアプローチを提供し、スケーラブルで応答性の高いアプリケーションの構築を容易にします。そのコアコンセプトを理解し、Fan-in、Fan-out、Worker Poolsのようなパターンを習得することで、複雑な並行フローを効果的に管理し、より堅牢で効率的なソフトウェアにつながることができます。Goの並行処理モデルは、開発者がパフォーマンスだけでなく、理解可能で保守性の高い並行コードを書くことを真に可能にします。
これらの例は、可能なことの表層をなぞるにすぎません。さらに深く掘り下げると、キャンセルやタイムアウトのためのコンテキスト(context)の洗練された使用法、エラー伝搬パターン、そしてGoroutinesとChannelsの強力な基盤の上に構築された、より高度な同期プリミティブを発見するでしょう。Goの並行処理を受け入れてください。それはゲームチェンジャーです。