Go Concurrency Patterns - A Deep Dive into Producer-Consumer, Fan-out/Fan-in, and Pipelines
Wenhao Wang
Dev Intern · Leapcell

Go's built-in concurrency features, primarily goroutines and channels, provide a powerful yet elegant way to write highly concurrent and parallel applications. Compared with traditional thread-based models, Go's approach simplifies concurrent programming and, when used carefully, makes common pitfalls like deadlocks and race conditions easier to avoid. This article delves into several core concurrency patterns in Go: Producer-Consumer, Fan-out/Fan-in, and Pipelines, illustrating their implementation and benefits with practical examples.
The Foundations: Goroutines and Channels
Before diving into patterns, let's briefly recap the building blocks:
- Goroutines: Lightweight, independently executing functions. They are multiplexed onto a smaller number of OS threads, making them very efficient. You launch a goroutine by prepending `go` to a function call: `go myFunction()`.
- Channels: Typed conduits through which goroutines can communicate and synchronize. They are the "Go way" to share memory by communicating, rather than communicating by sharing memory. Think of them as pipes connecting concurrent components. You create a channel using `make(chan Type)`, send with `ch <- value`, and receive with `value := <-ch`. Channels can be buffered (`make(chan Type, capacity)`) or unbuffered (`make(chan Type)`).
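A minimal sketch tying these primitives together — one goroutine sending on an unbuffered channel, the main goroutine receiving:

```go
package main

import "fmt"

func main() {
	ch := make(chan string) // unbuffered: a send blocks until a receiver is ready

	go func() {
		ch <- "hello from a goroutine" // send
	}()

	msg := <-ch // receive; this synchronizes with the send above
	fmt.Println(msg)
}
```

Because the channel is unbuffered, the receive in `main` guarantees the goroutine's send has happened — no extra synchronization is needed for this simple handoff.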
Pattern 1: Producer-Consumer
The Producer-Consumer pattern is a classic concurrency design where one or more "producers" generate data and place it into a shared buffer, while one or more "consumers" retrieve data from the buffer and process it. In Go, channels naturally serve as this shared buffer.
Why use it?
- Decoupling: Producers don't need to know how data is consumed, and consumers don't need to know how data is produced.
- Load Smoothing: If producers generate data at an irregular rate, a buffer can smooth out the flow to consumers.
- Concurrency: Producers and consumers can operate concurrently, potentially speeding up overall processing.
Example: File Processing with a Bounded Buffer
Let's imagine a scenario where we need to read lines from a large file (producer) and then process each line (consumer).
```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// LineProducer reads lines from a file and sends them to a channel.
func LineProducer(filePath string, lines chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(lines) // Important: close the channel to signal no more data

	file, err := os.Open(filePath)
	if err != nil {
		fmt.Printf("Error opening file: %v\n", err)
		return
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lines <- scanner.Text() // Send line to channel
	}
	if err := scanner.Err(); err != nil {
		fmt.Printf("Error reading file: %v\n", err)
	}
}

// LineConsumer processes lines received from a channel.
func LineConsumer(id int, lines <-chan string, processedCount *int64, wg *sync.WaitGroup) {
	defer wg.Done()
	for line := range lines {
		// Simulate some CPU-intensive processing
		time.Sleep(time.Millisecond * 10)
		if num, err := strconv.Atoi(strings.TrimSpace(line)); err == nil {
			_ = num * num // use the squared value here
		}
		// Safely increment the shared counter with an atomic operation
		atomic.AddInt64(processedCount, 1)
	}
	fmt.Printf("Consumer %d finished.\n", id)
}

func main() {
	const (
		numConsumers = 5
		bufferSize   = 100 // Buffered channel to smooth out producer/consumer rates
		filePath     = "data.txt"
	)

	// Create a dummy data.txt for demonstration
	createDummyFile(filePath, 1000)

	linesChannel := make(chan string, bufferSize) // Buffered channel
	var wg sync.WaitGroup
	var processed int64

	// Start the producer
	wg.Add(1)
	go LineProducer(filePath, linesChannel, &wg)

	// Start consumers
	for i := 0; i < numConsumers; i++ {
		wg.Add(1)
		go LineConsumer(i+1, linesChannel, &processed, &wg)
	}

	// Wait for all goroutines to finish
	wg.Wait()
	fmt.Printf("All producers and consumers finished; processed %d lines.\n",
		atomic.LoadInt64(&processed))
}

// Helper function to create a dummy file
func createDummyFile(filePath string, numLines int) {
	file, err := os.Create(filePath)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	writer := bufio.NewWriter(file)
	for i := 0; i < numLines; i++ {
		fmt.Fprintf(writer, "%d\n", i)
	}
	writer.Flush()
	fmt.Printf("Created dummy file: %s with %d lines.\n", filePath, numLines)
}
```
In this example:
- `LineProducer` is the producer, reading lines and sending them to `linesChannel`.
- `LineConsumer` instances are consumers, receiving lines from `linesChannel` and processing them.
- `linesChannel` acts as the bounded buffer. The `bufferSize` prevents the producer from running too far ahead of the consumers, potentially exhausting memory.
- `sync.WaitGroup` is crucial for `main` to wait until all producers and consumers have completed their work before exiting.
- Closing the `linesChannel` in `LineProducer` is vital. It signals to consumers that no more data will be sent, allowing their `for line := range lines` loops to terminate gracefully.
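Besides `range`, a receiver can detect a closed channel explicitly with the comma-ok receive form — a small sketch of that idiom:

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)

	for {
		v, ok := <-ch // ok becomes false once the channel is closed and drained
		if !ok {
			fmt.Println("channel closed")
			break
		}
		fmt.Println("received:", v)
	}
}
```

Note that a receive on a closed channel never blocks: it yields the element type's zero value with `ok == false`, which is exactly how `range` knows when to stop.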
Pattern 2: Fan-out / Fan-in
The Fan-out / Fan-in pattern is about distributing a set of tasks among multiple worker goroutines (fan-out) and then collecting their results back into a single channel (fan-in). This pattern is excellent for parallelizing computations.
Why use it?
- Parallelism: Leverage multiple CPU cores or distribute work across a network.
- Scalability: Easily add more workers to handle increased load.
- Work Distribution: Break down a large problem into smaller, independent sub-problems.
Example: Parallel Squaring of Numbers
Let's say we have a list of numbers and we want to square them in parallel.
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker takes numbers from the 'in' channel, squares them, and sends to the 'out' channel.
func worker(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for n := range in {
		squared := n * n
		time.Sleep(time.Millisecond * 50) // Simulate work
		out <- squared
	}
	fmt.Printf("Worker %d finished.\n", id)
}

func main() {
	const (
		numJobs    = 20
		numWorkers = 3
	)

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs) // Buffer results channel for fan-in

	var workerWG sync.WaitGroup

	// Fan-out: start workers, all reading from the same jobs channel
	for w := 1; w <= numWorkers; w++ {
		workerWG.Add(1)
		go worker(w, jobs, results, &workerWG)
	}

	// Send jobs to the jobs channel
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs) // No more jobs to send

	// Wait for all workers to finish their current jobs.
	// This also ensures all results have been sent to the 'results' channel.
	workerWG.Wait()
	// Important: close results AFTER all workers are done, to signal the
	// fan-in collector that no more results will be produced.
	close(results)

	// Fan-in: collect results
	fmt.Println("\nCollecting results:")
	for r := range results {
		fmt.Printf("Collected result: %d\n", r)
	}
	fmt.Println("All done!")
}
```
Explanation:
- `jobs` channel: where the initial tasks (numbers to be squared) are sent.
- `results` channel: where the squared numbers from all workers are collected.
- Fan-out: we launch `numWorkers` goroutines (the `worker` function), all reading from the `jobs` channel.
- Job distribution: the main goroutine sends numbers into the `jobs` channel. Go's runtime automatically distributes these numbers to available `worker` goroutines.
- Fan-in: the main goroutine then reads from the `results` channel. Because `results` is closed only after all workers have finished and have had a chance to send their last results, the `for r := range results` loop in `main` will correctly receive all produced results and then terminate. `workerWG` ensures we wait for all workers.
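The example above has every worker share one `results` channel. When each source instead owns its own channel, fan-in needs an explicit merge step. A minimal sketch of such a helper (the `merge` name and signature are my own, not from the example above):

```go
package main

import (
	"fmt"
	"sync"
)

// merge fans in several input channels into a single output channel.
func merge(chans ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, c := range chans {
		go func(c <-chan int) { // one forwarding goroutine per input
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	// Close out only after every input channel has been drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a := make(chan int, 2)
	b := make(chan int, 2)
	a <- 1
	a <- 2
	b <- 3
	b <- 4
	close(a)
	close(b)

	sum := 0
	for v := range merge(a, b) {
		sum += v // arrival order is nondeterministic, but all values arrive
	}
	fmt.Println("sum:", sum) // sum: 10
}
```

The key detail is the same as in the main example: `out` is closed only after all forwarding goroutines finish, so the consumer's `range` terminates cleanly.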
Pattern 3: Pipelines
A pipeline is a series of stages, where the output of one stage becomes the input for the next. Each stage typically operates concurrently. In Go, pipelines are elegantly constructed using channels to connect the stages.
Why use it?
- Modularity: Break down complex operations into smaller, manageable, and reusable components.
- Concurrency: Each stage can run concurrently, processing data as it becomes available from the previous stage.
- Throughput: Data flows through the pipeline, often leading to higher throughput than sequential processing.
Example: Number Processing Pipeline
Let's build a pipeline that:
- Generates a sequence of numbers (producer).
- Filters out even numbers.
- Squares the remaining odd numbers.
- Prints the final results.
```go
package main

import (
	"fmt"
	"time"
)

// Generator stage: produces numbers
func generate(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// Filter stage: filters out even numbers
func filterOdd(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if n%2 != 0 { // Keep only odd numbers
				select {
				case out <- n:
				case <-done:
					return
				}
			}
		}
	}()
	return out
}

// Square stage: squares numbers
func square(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			squared := n * n
			select {
			case out <- squared:
			case <-done:
				return
			}
		}
	}()
	return out
}

func main() {
	// A done channel for graceful shutdown of all goroutines
	done := make(chan struct{})
	defer close(done) // Ensure done is closed when main exits

	// Stage 1: Generate numbers
	numbers := generate(done, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

	// Stage 2: Filter out even numbers
	oddNumbers := filterOdd(done, numbers)

	// Stage 3: Square the odd numbers
	squaredOddNumbers := square(done, oddNumbers)

	// Final stage: consume and print results
	fmt.Println("Pipeline results:")
	for result := range squaredOddNumbers {
		fmt.Printf("Result: %d\n", result)
		time.Sleep(time.Millisecond * 10) // Simulate final processing
	}
	fmt.Println("Pipeline finished.")
}
```
Key aspects of this pipeline:
- Chained channels: `generate` sends to a channel, which is then passed as input to `filterOdd`, whose output channel is passed to `square`.
- `<-chan int` and `chan<- int`: these directional channel types improve safety and readability, clearly indicating whether a function sends or receives on a channel.
- Graceful shutdown (the `done` channel): the `done` channel is a common pattern for signaling all goroutines in a pipeline to stop processing and exit. When `main` exits, `defer close(done)` ensures that all goroutines listening on `done` return gracefully, preventing goroutine leaks. This is especially important in long-running pipelines or when an error occurs early in the pipeline.
- Each stage is an independent goroutine, working concurrently. As soon as `generate` produces a number, `filterOdd` can process it, and then `square` can square it, without waiting for the entire input to be generated.
Combining Patterns and Best Practices
These patterns are not mutually exclusive; they can be combined to build sophisticated concurrent systems. For instance, a stage in a pipeline could itself be a fan-out/fan-in operation to parallelize a sub-task.
General Best Practices for Go Concurrency:
- Share Memory by Communicating, Not by Communicating Through Shared Memory: This is Go's mantra — "Do not communicate by sharing memory; instead, share memory by communicating." Use channels for communication and synchronization.
- Goroutines are Cheap, Use Them Liberally: Don't be afraid to launch many goroutines, but make sure each one has a way to exit, or they will leak.
- Close Channels to Signal Completion: Close a channel when no more data will be sent — always from the sender's side, never the receiver's. This unblocks `for ... range` loops on the receiving end.
- Use `sync.WaitGroup` for Waiting on Goroutines: Essential for ensuring all goroutines complete before the main program exits.
- Handle Errors and Graceful Shutdowns: Implement mechanisms like the `done` channel or a context for canceling operations and ensuring all goroutines clean up.
- Avoid Global State where Possible: If shared state is unavoidable, protect it with `sync.Mutex` or `sync.RWMutex`, or better yet, serialize access through a single goroutine (e.g., a "monitor" goroutine).
- Consider the `context` Package for Cancellations and Deadlines: For more complex scenarios involving timeouts, deadlines, or cascaded cancellations, the `context` package is indispensable.
- Buffer Channels Appropriately: Use buffered channels to smooth out bursts or to allow producers to get ahead without blocking, but be mindful of memory usage. Unbuffered channels enforce strict synchronization (rendezvous).
- Test Concurrency Thoroughly: Concurrency bugs can be subtle. Use the `-race` flag to enable the Go race detector (`go run -race filename.go` or `go test -race ./...`).
Conclusion
Go's concurrency model, built upon goroutines and channels, provides an intuitive and powerful way to design concurrent applications. By understanding and applying patterns like Producer-Consumer, Fan-out/Fan-in, and Pipelines, developers can build robust, scalable, and efficient systems that effectively utilize modern multi-core processors. These patterns encourage modularity and maintainability, making concurrent programming in Go a much more enjoyable and less error-prone experience. Embrace them, and your Go applications will naturally become more concurrent and performant.