Goの並行処理パターン — Producer-Consumer、Fan-out/Fan-in、およびパイプラインの詳細
Wenhao Wang
Dev Intern · Leapcell

Goの組み込みの並行処理機能、主にgoroutineとchannelは、非常に並行かつ並列なアプリケーションを記述するための強力かつエレガントな方法を提供します。従来のスレッドベースのモデルとは異なり、Goのアプローチは並行プログラミングを簡素化し、デッドロックや競合状態のような一般的な落とし穴を減らします。この記事では、Goにおけるいくつかのコアな並行処理パターン、すなわちProducer-Consumer、Fan-out/Fan-in、およびパイプラインを掘り下げ、それらの実装と利点を実践的な例で説明します。
基本:goroutineとchannel
パターンに入る前に、ビルディングブロックを簡単に復習しましょう。
- goroutine: 軽量で独立して実行される関数です。これらは少数のOSスレッドに多重化されるため、非常に効率的です。関数呼び出しの前に
go
を付けてgoroutineを起動します:go myFunction()
。 - channel: goroutineが通信および同期できる型付きのコンジットです。これらは、メモリを共有することによって通信する(rather than communicating by sharing memory)「Goらしい」方法です。これらを、並行コンポーネントを接続するパイプと考えてください。
make(chan Type)
でchannelを作成し、ch <- value
で送信、value := <-ch
で受信します。channelはバッファ付き(make(chan Type, capacity)
)またはバッファなし(make(chan Type)
)にできます。
パターン1:Producer-Consumer
Producer-Consumerパターンは、1つ以上の「プロデューサー」がデータを生成して共有バッファに入れ、1つ以上の「コンシューマー」がバッファからデータを取り出して処理する、という古典的な並行設計です。Goでは、channelが自然にこの共有バッファとして機能します。
なぜ使用するのか?
- 分離: プロデューサーはデータがどのように消費されるかを知る必要がなく、コンシューマーはデータがどのように生成されるかを知る必要がありません。
- 負荷平滑化: プロデューサーが不規則なレートでデータを生成する場合、バッファはコンシューマーへのフローを平滑化できます。
- 並行処理: プロデューサーとコンシューマーは並行して動作でき、全体的な処理速度を向上させる可能性があります。
例:バウンドバッファを使用したファイル処理
大きなファイルから行を読み取り(プロデューサー)、各行を処理する(コンシューマー)シナリオを想像してみましょう。
package main import ( "bufio" "fmt" "os" "strconv" "strings" "sync" "time" ) // LineProducerはファイルから行を読み取り、channelに送信します。 func LineProducer(filePath string, lines chan<- string, wg *sync.WaitGroup) { defer wg.Done() file, err := os.Open(filePath) if err != nil { fmt.Printf("Error opening file: %v\n", err) close(lines) // エラー時にchannelが閉じられることを保証 return } defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { lines <- scanner.Text() // channelに行を送信 } if err := scanner.Err(); err != nil { fmt.Printf("Error reading file: %v\n", err) } close(lines) // 重要:これ以上データが送信されないことを通知するためにchannelを閉じる } // LineConsumerはchannelから受信した行を処理します。 func LineConsumer(id int, lines <-chan string, processedCount *int64, wg *sync.WaitGroup) { defer wg.Done() for line := range lines { // CPU負荷の高い処理をシミュレート time.Sleep(time.Millisecond * 10) num, err := strconv.Atoi(strings.TrimSpace(line)) if err == nil { // fmt.Printf("Consumer %d processed: %d (squared: %d)\n", id, num, num*num) } else { // fmt.Printf("Consumer %d skipped non-integer line: %s\n", id, line) } // ミューテックスまたはアトミック操作を使用して処理済みカウントを安全にインクリメント // 簡単にするため、mainでatomic.AddInt64を使用します } fmt.Printf("Consumer %d finished.\n", id) } func main() { const ( numConsumers = 5 bufferSize = 100 // プロデューサー/コンシューマーのレートを平滑化するためのバッファ付きchannel filePath = "data.txt" ) // デモンストレーションのためにダミーのdata.txtを作成 createDummyFile(filePath, 1000) linesChannel := make(chan string, bufferSize) // バッファ付きchannel var wg sync.WaitGroup var processed int64 // 本番ではアトミックを使用 // プロデューサーを開始 wg.Add(1) go LineProducer(filePath, linesChannel, &wg) // コンシューマーを開始 for i := 0; i < numConsumers; i++ { wg.Add(1) go LineConsumer(i+1, linesChannel, &processed, &wg) } // すべてのgoroutineが終了するのを待つ wg.Wait() fmt.Printf("All producers and consumers finished.\n") } // ダミーファイルを作成するヘルパー関数 func createDummyFile(filePath string, numLines int) { file, err := os.Create(filePath) if err != nil { panic(err) } defer file.Close() writer := bufio.NewWriter(file) for i := 0; i < numLines; i++ { fmt.Fprintf(writer, "%d\n", i) } writer.Flush() fmt.Printf("Created dummy file: %s with %d lines.\n", filePath, numLines) }
この例では:
LineProducer
はプロデューサーであり、行を読み取ってlinesChannel
に送信します。LineConsumer
インスタンスはコンシューマーであり、linesChannel
から行を受信して処理します。linesChannel
はバウンドバッファとして機能します。bufferSize
は、プロデューサーがコンシューマーに先行しすぎるのを防ぎ、メモリを使い果たす可能性を回避します。sync.WaitGroup
は、メインが終了する前にすべてのプロデューサーとコンシューマーが作業を完了するのを待つために不可欠です。LineProducer
でlinesChannel
を閉じることは非常に重要です。これにより、コンシューマーはこれ以上データが送信されないことを通知され、for line := range lines
ループが正常に終了できるようになります。
パターン2:Fan-out / Fan-in
Fan-out / Fan-inパターンは、タスクのセットを複数のワーカーgoroutine(fan-out)に分散し、それらの結果を単一のchannelに収集(fan-in)することです。このパターンは、計算の並列化に優れています。
なぜ使用するのか?
- 並列性: 複数のCPUコアを活用したり、ネットワーク全体にワークを分散したりします。
- スケーラビリティ: 負荷が増加したときに、より多くのワーカーを簡単に追加できます。
- ワーク分散: 大きな問題を、小さく、独立したサブ問題に分解します。
例:数字の並列平方計算
数字のリストがあり、それらを並列に二乗したいとしましょう。
package main import ( "fmt" "sync" "time" ) // workerは'in' channelから数値を受け取り、二乗して、'out' channelに送信します。 func worker(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) { defer wg.Done() for n := range in { squared := n * n // fmt.Printf("Worker %d: processing %d -> %d\n", id, n, squared) time.Sleep(time.Millisecond * 50) // 処理をシミュレート out <- squared } fmt.Printf("Worker %d finished.\n", id) } func main() { const ( numJobs = 20 numWorkers = 3 ) // Fan-out: 複数のワーカーにジョブを送信 jobs := make(chan int, numJobs) results := make(chan int, numJobs) // fan-inのための結果channelをバッファリング var workerWG sync.WaitGroup // ワーカーを開始 (Fan-out) for w := 1; w <= numWorkers; w++ { workerWG.Add(1) go worker(w, jobs, results, &workerWG) } // jobs channelにジョブを送信 for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 送信するジョブはありません // すべてのワーカーが現在のジョブを完了するのを待つ // これにより、すべて の結果が'results' channel に送信されることも保証されます workerWG.Wait() close(results) // すべてのワーカーが完了してから結果channelを閉じる // fan-inコレクターにこれ以上結果が生成されないことを通知するため。 // Fan-in: 結果を収集 fmt.Println("\nCollecting results:") for r := range results { fmt.Printf("Collected result: %d\n", r) } fmt.Println("All done!") }
説明:
jobs
channel:最初のタスク(二乗する数字)が送信される場所です。results
channel:すべてのワーカーからの二乗された数値が収集される場所です。- Fan-out:
jobs
channelから読み取るすべてのnumWorkers
goroutine(worker
関数)を起動します。 - ジョブ分散: メインgoroutineは数字を
jobs
channelに送信します。Goのランタイムは自動的にこれらの数字を利用可能なworker
goroutineに分散します。 - Fan-in: メインgoroutineはその後
results
channelから読み取ります。すべてのワーカーが完了し、最後の結果を送信する機会を得た後にのみresults
が閉じられるため、main
のfor r := range results
ループはすべての生成された結果を正しく受信し、その後終了します。workerWG
はすべてのワーカーを待つことを保証します。
パターン3:パイプライン
A pipeline is a series of stages, where the output of one stage becomes the input for the next. Each stage typically operates concurrently. In Go, pipelines are elegantly constructed using channels to connect the stages. (パイプラインは一連のステージであり、あるステージの出力は次のステージの入力になります。各ステージは通常、並行して動作します。Goでは、パイプラインはchannelを使用してステージを接続することによりエレガントに構築されます。)
なぜ使用するのか?
- モジュール性: 複雑な操作を、小さく、管理可能で、再利用可能なコンポーネントに分解します。
- 並行処理: 各ステージは並行して実行でき、前のステージから利用可能になったデータを処理します。
- スループット: データはパイプラインを流れます。これにより、シーケンシャル処理よりも高いスループットが得られることがよくあります。
例:テキスト処理パイプライン
以下のパイプラインを構築しましょう。
- 数値のシーケンスを生成します(プロデューサー)。
- 偶数をフィルターします。
- 残りの奇数を二乗します。
- 最終結果を出力します。
package main import ( "fmt" "sync" "time" ) // Generator ステージ: 数値を生成 func generate(done <-chan struct{}, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case out <- n: case <-done: return } } }() return out } // Filter ステージ: 偶数をフィルター func filterOdd(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { if n%2 != 0 { // 奇数のみを保持 select { case out <- n: case <-done: return } } } }() return out } // Square ステージ: 数値を二乗 func square(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { squared := n * n select { case out <- squared: case <-done: return } } }() return out } func main() { // すべてのgoroutineの正常なシャットダウンのためのdone channel done := make(chan struct{}) defer close(done) // mainが終了したときにdoneが閉じられることを保証 // ステージ1: 数値を生成 numbers := generate(done, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // ステージ2: 偶数をフィルター oddNumbers := filterOdd(done, numbers) // ステージ3: 奇数を二乗 squaredOddNumbers := square(done, oddNumbers) // 最終ステージ: 結果を消費して出力 fmt.Println("Pipeline results:") for result := range squaredOddNumbers { fmt.Printf("Result: %d\n", result) time.Sleep(time.Millisecond * 10) // 最終処理をシミュレート } fmt.Println("Pipeline finished.") }
このパイプラインの主要な側面:
- 連鎖するchannel:
generate
はchannelに送信し、そのchannelはfilterOdd
への入力として渡され、その出力channelはsquare
に渡されます。 <-chan int
とchan<- int
: これらの方向性のあるchannel型を使用することで、安全性と可読性が向上し、関数がchannelで送信または受信しているかが明確に示されます。- 正常なシャットダウン(
done
channel):done
channelは、パイプライン内のすべてのgoroutineに処理を停止して終了するようにシグナルを送るための一般的なパターンです。main
が終了するとき、defer close(done)
は、done
をリッスンしているすべてのgoroutineが正常に返り、goroutineリークを防ぐことを保証します。これは、長時間実行されるパイプラインや、パイプラインの早い段階でエラーが発生した場合に特に重要です。 - 各ステージは独立したgoroutineであり、並行して動作します。
generate
が数値を生成するとすぐに、filterOdd
がそれを処理でき、その後square
がそれを二乗でき、入力全体が生成されるのを待つ必要はありません。
パターンの結合とベストプラクティス
これらのパターンは相互に排他的ではなく、洗練された並行システムを構築するために結合できます。たとえば、パイプラインのステージ自体が、サブタスクを並列化するためのfan-out/fan-in操作であることもあります。
Goの並行処理に関する一般的なベストプラクティス:
- メモリを共有するのではなく、通信によって](https://go.dev/blog/go-conference-keynote)communicating: これがGoのモットーです。通信と同期にはchannelを使用します。
- goroutineは安価なので、惜しみなく使用する: 多くのgoroutineを起動することを恐れないでください。
- 完了を通知するためにchannelを閉じる: 送信するデータがなくなったら、常にchannelを閉じます。これにより、受信側での
for ... range
ループのブロックが解除されます。 - goroutineの待機には
sync.WaitGroup
を使用する: メインプログラムが終了する前にすべてのgoroutineが完了することを保証するために不可欠です。 - エラーと正常なシャットダウンを処理する:
done
channelやcontextのようなメカニズムを実装して、操作をキャンセルし、すべてのgoroutineがクリーンアップすることを保証します。 - グローバル状態は可能な限り避ける: 共有状態が避けられない場合は、
sync.Mutex
またはsync.RWMutex
で保護するか、さらに良いのは単一のgoroutine(例:「モニター」goroutine)を介してアクセスをシリアル化します。 - キャンセルとデッドラインには
context
パッケージを考慮する: タイムアウト、デッドライン、またはカスケードされたキャンセルを伴うより複雑なシナリオには、context
パッケージが不可欠です。 - channelを適切にバッファリングする: バーストを平滑化するため、またはプロデューサーがブロックせずに先行できるようにするためにバッファ付きchannelを使用しますが、メモリ使用量には注意してください。バッファなしchannelは厳密な同期(ランデブー)を強制します。
- 並行処理を徹底的にテストする: 並行処理のバグは微妙な場合があります。Goの競合検出器(
go run -race filename.go
またはgo test -race ./...
)で-race
フラグを使用します。
結論
goroutineとchannelに基づいたGoの並行処理モデルは、直感的で強力な並行アプリケーション設計方法を提供します。Producer-Consumer、Fan-out/Fan-in、およびPipelinesのようなパターンを理解して適用することにより、開発者は堅牢でスケーラブルかつ効率的なシステムを構築し、現代のマルチコアプロセッサを効果的に活用できます。これらのパターンはモジュール性と保守性を促進し、Goでの並行プログラミングをはるかに楽しく、エラーの少ない体験にします。それらを取り入れれば、Goアプリケーションは自然に、より並行でパフォーマンスが高くなるでしょう。