Go 동시성 패턴 - 프로듀서-소비자, 팬아웃/팬인, 파이프라인 심층 분석
Wenhao Wang
Dev Intern · Leapcell

Go의 내장 동시성 기능, 주로 고루틴과 채널은 고도로 동시적이고 병렬적인 애플리케이션을 작성할 수 있는 강력하면서도 우아한 방법을 제공합니다. 전통적인 스레드 기반 모델과 달리 Go의 접근 방식은 동시성 프로그래밍을 단순화하여 데드락 및 경쟁 상태와 같은 일반적인 함정에 빠질 가능성을 줄입니다. 이 문서는 Go의 몇 가지 핵심 동시성 패턴인 프로듀서-소비자, 팬아웃/팬인, 파이프라인을 심층적으로 살펴보고 실용적인 예제를 통해 구현과 이점을 설명합니다.
기초: 고루틴 및 채널
패턴을 자세히 살펴보기 전에 빌딩 블록을 간략하게 복습해 보겠습니다.
- 고루틴: 가볍고 독립적으로 실행되는 함수입니다. OS 스레드 수가 적게 다중화되어 매우 효율적입니다. 함수 호출 앞에
go
를 붙여 고루틴을 시작합니다:go myFunction()
. - 채널: 고루틴이 통신하고 동기화할 수 있는 타입화된 통로입니다. 메모리를 공유하여 통신하는 'Go 방식'이며, 메모리를 공유하지 않고 통신하는 방식보다 낫습니다. 동시 컴포넌트를 연결하는 파이프라고 생각하면 됩니다.
make(chan Type)
대신 채널을 만들고,ch <- value
로 보내고,value := <-ch
로 받습니다. 채널은 버퍼링(make(chan Type, capacity)
) 또는 버퍼링되지 않은(make(chan Type)
) 상태일 수 있습니다.
패턴 1: 프로듀서-소비자
프로듀서-소비자 패턴은 하나 이상의 '프로듀서'가 데이터를 생성하여 공유 버퍼에 넣고, 하나 이상의 '소비자'가 버퍼에서 데이터를 가져와 처리하는 클래식 동시성 디자인입니다. Go에서는 채널이 자연스럽게 이 공유 버퍼 역할을 합니다.
사용하는 이유
- 디커플링: 프로듀서는 데이터가 어떻게 소비되는지 알 필요가 없고, 소비자는 데이터가 어떻게 생산되는지 알 필요가 없습니다.
- 로드 스무딩: 프로듀서가 불규칙한 속도로 데이터를 생성하는 경우 버퍼가 소비자에게 흐름을 부드럽게 만들 수 있습니다.
- 동시성: 프로듀서와 소비자는 동시적으로 작동하여 전반적인 처리 속도를 높일 수 있습니다.
예제: 바운드 버퍼를 사용한 파일 처리
대용량 파일에서 줄을 읽고(프로듀서) 각 줄을 처리(소비자)해야 하는 시나리오를 상상해 봅시다.
package main import ( "bufio" "fmt" "os" "strconv" "strings" "sync" "time" ) // LineProducer는 파일에서 줄을 읽어 채널로 보냅니다. func LineProducer(filePath string, lines chan<- string, wg *sync.WaitGroup) { defer wg.Done() file, err := os.Open(filePath) if err != nil { fmt.Printf("Error opening file: %v\n", err) close(lines) // 오류 발생 시 채널이 닫히도록 보장 return } defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { lines <- scanner.Text() // 줄을 채널로 보냅니다. } if err := scanner.Err(); err != nil { fmt.Printf("Error reading file: %v\n", err) } close(lines) // 중요: 더 이상 데이터가 없음을 알리기 위해 채널을 닫습니다. } // LineConsumer는 채널에서 받은 줄을 처리합니다. func LineConsumer(id int, lines <-chan string, processedCount *int64, wg *sync.WaitGroup) { defer wg.Done() for line := range lines { // CPU 집약적인 처리 시뮬레이션 time.Sleep(time.Millisecond * 10) num, err := strconv.Atoi(strings.TrimSpace(line)) if err == nil { // fmt.Printf("Consumer %d processed: %d (squared: %d)\n", id, num, num*num) } else { // fmt.Printf("Consumer %d skipped non-integer line: %s\n", id, line) } // 처리된 카운트를 안전하게 증가시키려면 뮤텍스 또는 원자적 연산을 사용합니다. // 편의를 위해 main에서 atomic.AddInt64를 사용합니다. } fmt.Printf("Consumer %d finished.\n", id) } func main() { const ( numConsumers = 5 bufferSize = 100 // 프로듀서/소비자 속도를 부드럽게 하기 위한 버퍼링된 채널 filePath = "data.txt" ) // 데모를 위해 더미 data.txt 생성 createDummyFile(filePath, 1000) linesChannel := make(chan string, bufferSize) // 버퍼링된 채널 var wg sync.WaitGroup var processed int64 // 실제 앱에서는 공유 카운터에 원자적 사용 // 프로듀서 시작 wg.Add(1) go LineProducer(filePath, linesChannel, &wg) // 소비자 시작 for i := 0; i < numConsumers; i++ { wg.Add(1) go LineConsumer(i+1, linesChannel, &processed, &wg) } // 모든 고루틴이 완료될 때까지 기다립니다. wg.Wait() fmt.Printf("All producers and consumers finished.\n") } // 더미 파일을 생성하는 헬퍼 함수 func createDummyFile(filePath string, numLines int) { file, err := os.Create(filePath) if err != nil { panic(err) } defer file.Close() writer := bufio.NewWriter(file) for i := 0; i < numLines; i++ { fmt.Fprintf(writer, "%d\n", i) } writer.Flush() fmt.Printf("Created dummy file: %s with %d lines.\n", filePath, numLines) }
이 예제에서:
LineProducer
는 프로듀서로, 줄을 읽어linesChannel
로 보냅니다.LineConsumer
인스턴스는 소비자이며,linesChannel
에서 줄을 받아 처리합니다.linesChannel
은 바운드 버퍼 역할을 합니다.bufferSize
는 프로듀서가 소비자를 너무 앞서나가지 못하게 하여 메모리를 고갈시킬 가능성을 방지합니다.sync.WaitGroup
은 메인 프로그램이 종료되기 전에 모든 프로듀서와 소비자가 작업을 완료하기를 기다리는 데 중요합니다.LineProducer
에서linesChannel
을 닫는 것은 필수적입니다. 이는 소비자에게 더 이상 데이터가 전송되지 않음을 알리고,for line := range lines
루프가 정상적으로 종료될 수 있도록 합니다.
패턴 2: 팬아웃 / 팬인
팬아웃 / 팬인 패턴은 작업 집합을 여러 작업자 고루틴(팬아웃)에 분배한 다음 결과를 단일 채널로 다시 수집(팬인)하는 것입니다. 이 패턴은 계산을 병렬화하는 데 탁월합니다.
사용하는 이유
- 병렬성: 여러 CPU 코어를 활용하거나 네트워크를 통해 작업을 분산합니다.
- 확장성: 증가하는 부하를 처리하기 위해 더 많은 작업자를 쉽게 추가할 수 있습니다.
- 작업 분배: 큰 문제를 더 작고 독립적인 하위 문제로 분해합니다.
예제: 숫자의 병렬 제곱
숫자 목록이 있고 이를 병렬로 제곱하고 싶다고 가정해 봅시다.
package main import ( "fmt" "sync" "time" ) // worker는 'in' 채널에서 숫자를 받아 처리하고 'out' 채널로 보냅니다. func worker(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) { defer wg.Done() for n := range in { squared := n * n // fmt.Printf("Worker %d: processing %d -> %d\n", id, n, squared) time.Sleep(time.Millisecond * 50) // 작업 시뮬레이션 out <- squared } fmt.Printf("Worker %d finished.\n", id) } func main() { const ( numJobs = 20 numWorkers = 3 ) // 팬아웃: 여러 작업자에게 작업을 보냅니다. jobs := make(chan int, numJobs) results := make(chan int, numJobs) // 팬인을 위한 결과 채널 버퍼링 var workerWG sync.WaitGroup // 작업자 시작 (팬아웃) for w := 1; w <= numWorkers; w++ { workerWG.Add(1) go worker(w, jobs, results, &workerWG) } // jobs 채널에 작업을 보냅니다. for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 더 이상 보낼 작업 없음 // 모든 작업자가 현재 작업을 완료할 때까지 기다립니다. // 이것은 모든 결과가 'results' 채널로 전송되도록 합니다. workerWG.Wait() close(results) // 중요: 모든 작업자가 완료된 후 결과 채널을 닫습니다. // 팬인 수집기에게 더 이상 결과가 생성되지 않음을 알립니다. // 팬인: 결과 수집 fmt.Println("\nCollecting results:") for r := range results { fmt.Printf("Collected result: %d\n", r) } fmt.Println("All done!") }
설명:
jobs
채널: 초기 작업(제곱할 숫자)이 전송되는 곳입니다.results
채널: 모든 작업자로부터 제곱된 숫자가 수집되는 곳입니다.- 팬아웃:
numWorkers
개의 고루틴(worker
함수)을 실행하며, 모두jobs
채널에서 읽습니다. - 작업 분배: 메인 고루틴은 숫자를
jobs
채널로 보냅니다. Go 런타임은 자동으로 이러한 숫자를 사용 가능한worker
고루틴에 분배합니다. - 팬인: 메인 고루틴은
results
채널에서 읽습니다.results
는 모든 작업자가 완료되고 마지막 결과를 보낼 기회를 가진 후에만 닫히기 때문에,main
의for r := range results
루프는 모든 생성된 결과를 올바르게 수신한 다음 종료됩니다.workerWG
는 모든 작업자가 완료될 때까지 기다리도록 합니다.
패턴 3: 파이프라인
A 파이프라인은 한 단계의 출력이 다음 단계의 입력이 되는 일련의 단계입니다. 각 단계는 일반적으로 동시적으로 작동합니다. Go에서는 채널을 사용하여 단계를 연결하여 파이프라인을 우아하게 구성합니다.
사용하는 이유
- 모듈성: 복잡한 작업을 더 작고 관리하기 쉬우며 재사용 가능한 구성 요소로 분해합니다.
- 동시성: 각 단계는 이전 단계에서 사용 가능한 데이터로 작동하여 동시적으로 실행될 수 있습니다.
- 처리량: 데이터가 파이프라인을 통해 흐르므로 종종 순차 처리보다 높은 처리량이 발생합니다.
예제: 텍스트 처리 파이프라인
다음과 같은 파이프라인을 구축해 보겠습니다.
- 숫자 시퀀스 생성(프로듀서).
- 짝수 필터링.
- 남은 홀수 제곱.
- 최종 결과 인쇄.
package main import ( "fmt" "sync" "time" ) // Generator 단계: 숫자 생성 func generate(done <-chan struct{}, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case out <- n: case <-done: return } } }() return out } // Filter 단계: 짝수 필터링 func filterOdd(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { if n%2 != 0 { // 홀수만 유지 select { case out <- n: case <-done: return } } } }() return out } // Square 단계: 숫자 제곱 func square(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { squared := n * n select { case out <- squared: case <-done: return } } }() return out } func main() { // 모든 고루틴의 정상 종료를 위한 done 채널 done := make(chan struct{}) defer close(done) // main이 종료될 때 done 채널이 닫히도록 보장 // 단계 1: 숫자 생성 numbers := generate(done, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // 단계 2: 짝수 필터링 oddNumbers := filterOdd(done, numbers) // 단계 3: 홀수 제곱 squaredOddNumbers := square(done, oddNumbers) // 최종 단계: 결과 소비 및 인쇄 fmt.Println("Pipeline results:") for result := range squaredOddNumbers { fmt.Printf("Result: %d\n", result) time.Sleep(time.Millisecond * 10) // 최종 처리 시뮬레이션 } fmt.Println("Pipeline finished.") }
이 파이프라인의 주요 측면:
- 연결된 채널:
generate
는 채널로 보내고, 이 채널은filterOdd
의 입력으로 전달됩니다.filterOdd
의 출력 채널은square
의 입력으로 전달됩니다. <-chan int
및chan<- int
: 이러한 방향 채널 유형을 사용하면 안전성과 가독성이 향상되며, 함수가 채널에서 보내는지 또는 받는지 명확하게 나타냅니다.- 정상 종료 (
done
채널):done
채널은 파이프라인의 모든 고루틴에게 처리를 중지하고 종료하도록 신호를 보내는 일반적인 패턴입니다.main
이 종료될 때defer close(done)
은done
채널을 수신하는 모든 고루틴이 정상적으로 반환되도록 하여 고루틴 누수를 방지합니다. 이는 장기 실행 파이프라인이나 파이프라인의 초기 단계에서 오류가 발생하는 경우 특히 중요합니다. - 각 단계는 독립적인 고루틴이며 동시적으로 작동합니다.
generate
가 숫자를 생성하는 즉시filterOdd
는 이를 처리할 수 있고, 그런 다음square
는 전체 입력이 생성될 때까지 기다리지 않고 숫자를 제곱할 수 있습니다.
패턴 결합 및 모범 사례
이러한 패턴은 상호 배타적이지 않으며, 정교한 동시 시스템을 구축하기 위해 결합될 수 있습니다. 예를 들어, 파이프라인의 한 단계는 하위 작업을 병렬화하기 위한 팬아웃/팬인 작업 자체가 될 수 있습니다.
Go 동시성을 위한 일반적인 모범 사례:
- 메모리를 공유하여 통신하고, 통신하여 메모리를 공유하지 마십시오: 이것은 Go의 좌우명입니다. 통신 및 동기화를 위해 채널을 사용하십시오.
- 고루틴은 저렴하므로 아낌없이 사용하십시오: 많은 고루틴을 실행하는 것을 두려워하지 마십시오.
- 완료 신호를 위해 채널을 닫으십시오: 더 이상 데이터가 전송되지 않을 때는 항상 채널을 닫으십시오. 이렇게 하면 수신 측의
for ... range
루프가 차단되지 않습니다. sync.WaitGroup
을 사용하여 고루틴 대기: 모든 고루틴이 메인 프로그램 종료 전에 완료되도록 보장하는 데 필수적입니다.- 오류 처리 및 정상 종료: 취소 작업을 위한
done
채널 또는 컨텍스트와 같은 메커니즘을 구현하고 모든 고루틴이 정리되도록 합니다. - 전역 상태는 가능한 한 피하십시오: 공유 상태를 피할 수 없는 경우
sync.Mutex
또는sync.RWMutex
로 보호하거나, 단일 고루틴(예: '모니터' 고루틴)을 통해 액세스를 직렬화하는 것이 좋습니다. - 취소 및 마감 시간에
context
패키지 고려: 시간 초과, 마감 시간 또는 계단식 취소가 관련된 더 복잡한 시나리오의 경우context
패키지가 필수적입니다. - 채널을 적절하게 버퍼링하십시오: 버퍼링된 채널을 사용하여 버스트를 부드럽게 하거나 프로듀서가 차단 없이 앞서 나갈 수 있도록 하지만, 메모리 사용량에 주의십시오. 버퍼링되지 않은 채널은 엄격한 동기화(랑데부)를 강제합니다.
- 동시성 철저히 테스트하십시오: 동시성 버그는 미묘할 수 있습니다. Go 경쟁 탐지기를 위해
-race
플래그를 사용합니다 (go run -race filename.go
또는go test -race ./...
).
결론
고루틴과 채널을 기반으로 하는 Go의 동시성 모델은 동시성 애플리케이션을 설계하는 직관적이고 강력한 방법을 제공합니다. 프로듀서-소비자, 팬아웃/팬인, 파이프라인과 같은 패턴을 이해하고 적용함으로써 개발자는 최신 멀티코어 프로세서를 효과적으로 활용하는 강력하고 확장 가능하며 효율적인 시스템을 구축할 수 있습니다. 이러한 패턴은 모듈성과 유지 관리성을 장려하여 Go에서의 동시성 프로그래밍을 훨씬 더 즐겁고 오류가 적은 경험으로 만듭니다. 이러한 패턴을 채택하면 Go 애플리케이션은 자연스럽게 더 동시적이고 성능이 향상될 것입니다.