Goにおけるsync/atomicを使ったアトミック操作の理解
Ethan Miller
Product Engineer · Leapcell

Goにおけるsync/atomicを使ったアトミック操作の理解
並行プログラミングにおいて、データ競合を避け、予測可能な動作を保証するためには、共有状態を正しく管理することが最も重要です。Goは、ミューテックス、チャネル、ウェイトグループなど、並行処理制御のためのいくつかのメカニズムを提供しています。ミューテックス(sync.Mutex
など)は、クリティカルセクションを保護するための堅牢な方法を提供しますが、特にカウンタのインクリメントのような単純な操作では、ロックとロック解除によるオーバーヘッドが生じることがあります。ここでアトミック操作が登場します。
Goのsync/atomic
パッケージは、明示的なロックなしで、スレッドセーフな方法で値を加算、比較・スワップ、またはロードするなどの一般的なタスクを実行する低レベルのプリミティブ操作を提供します。これらの操作は通常、マルチコア環境であっても、単一の不可分なステップで完了することを保証する特別なCPU命令を使用して実装されます。これにより、特定のユースケースにおいて非常に効率的になります。
なぜアトミック操作なのか?
複数のゴルーチンが共有カウンタをインクリメントする必要があるシナリオを考えてみましょう。単純なアプローチは以下のようになるかもしれません。
package main import ( "fmt" "runtime" "sync" "time" ) func main() { counter := 0 numGoroutines := 1000 var wg sync.WaitGroup wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go func() { defer wg.Done() for j := 0; j < 1000; j++ { counter++ // データ競合! } }() } wg.Wait() fmt.Println("Final Counter (potential race):", counter) }
このコードを複数回実行すると、異なる結果が得られる可能性があり、最終的なcounter
の値は1,000,000
未満になる可能性が高いです。これは、counter++
がアトミックではないためです。読み込み、インクリメント、書き込みの3つのステップが含まれます。これらのステップの間にコンテキストスイッチが発生し、更新が失われる可能性があります。
これを修正する1つの方法は、sync.Mutex
を使用することです。
package main import ( "fmt" "sync" ) func main() { counter := 0 numGoroutines := 1000 var wg sync.WaitGroup var mu sync.Mutex // カウンタを保護するためのミューテックス wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go func() { defer wg.Done() for j := 0; j < 1000; j++ { mu.Lock() counter++ mu.Unlock() } }() } wg.Wait() fmt.Println("Final Counter (with mutex):", counter) // 1,000,000になるはずです }
正しいですが、すべての小さなインクリメントに対してミューテックスの取得と解放を行うと、不要なオーバーヘッドが発生する可能性があります。単純な算術演算や値の交換には、sync/atomic
はよりパフォーマンスの高い代替手段を提供します。
主要なアトミック操作
sync/atomic
パッケージは、さまざまな整数型(int32
、int64
、uint32
、uint64
)、ポインタ(unsafe.Pointer
)、およびブール値(整数によって暗黙的に処理)のアトミック操作を提供します。以下に、最も一般的に使用される関数をいくつか示します。
1. Add*
関数
これらの関数は、値にデルタをアトミックに加算し、新しい値を返します。
atomic.AddInt32(addr *int32, delta int32) (new int32)
atomic.AddInt64(addr *int64, delta int64) (new int64)
atomic.AddUint32(addr *uint32, delta uint32) (new uint32)
atomic.AddUint64(addr *uint64, delta uint64) (new uint64)
atomic.AddInt64
を使用してカウンタの例をリファクタリングしましょう。
package main import ( "fmt" "sync" "sync/atomic" // atomicパッケージをインポート ) func main() { var counter int64 // アトミック操作のためにint64を使用 numGoroutines := 1000 var wg sync.WaitGroup wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go func() { defer wg.Done() for j := 0; j < 1000; j++ { atomic.AddInt64(&counter, 1) // 1をアトミックに加算 } }() } wg.Wait() fmt.Println("Final Counter (with atomic):", counter) // 1,000,000になるはずです }
このバージョンは正しく、また、ミューテックス管理のオーバーヘッドがなく、ベアメタルのCPU命令に依存しているため、単純なインクリメントではミネテックスベースのアプローチよりも一般的に効率的です。
2. Load*
関数
これらの関数は、アドレスに格納されている値をアトミックにロード(読み込み)します。
atomic.LoadInt32(addr *int32) (val int32)
atomic.LoadInt64(addr *int64) (val int64)
atomic.LoadUint32(addr *uint32) (val uint32)
atomic.LoadUint64(addr *uint64) (val uint64)
atomic.LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
別のゴルーチンによってアトミックに書き込まれる可能性のある値を読み取る場合は、常にアトミックロードを使用することが重要です。これにより、最新の一貫性のある値を取得できます。
例:アトミックカウンタの読み込み:
package main import ( "fmt" "sync" "sync/atomic" "time" ) func main() { var counter int64 stop := make(chan struct{}) go func() { for { select { case <-stop: return default: atomic.AddInt64(&counter, 1) // カウンタをインクリメント time.Sleep(time.Millisecond) // いくらかの作業をシミュレート } } }() time.Sleep(5 * time.Second) // 5秒間実行させる // カウンタの現在の値をアトミックにロード currentValue := atomic.LoadInt64(&counter) fmt.Println("Current counter value:", currentValue) close(stop) time.Sleep(100 * time.Millisecond) // ゴルーチンが停止する時間を与える fmt.Println("Final counter value:", atomic.LoadInt64(&counter)) }
3. Store*
関数
これらの関数は、アドレスに新しい値をアトミックに格納(書き込み)します。
atomic.StoreInt32(addr *int32, val int32)
atomic.StoreInt64(addr *int64, val int64)
atomic.StoreUint32(addr *uint32, val uint32)
atomic.StoreUint64(addr *uint64, val uint64)
atomic.StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
例:新しい状態値の格納:
package main import ( "fmt" "sync" "sync/atomic" time "time" ) const ( StateRunning = 0 StatePaused = 1 StateStopped = 2 ) func main() { var currentState int32 = StateRunning // 初期状態 var wg sync.WaitGroup wg.Add(2) // 状態を変更するゴルーチン go func() { defer wg.Done() fmt.Println("Service: Changing state to Paused...") a<ctrl61>tomic.StoreInt32(¤tState, StatePaused) // 状態をアトミックに設定 time.Sleep(time.Second) fmt.Println("Service: Changing state to Stopped...") a<ctrl63>eval/go_atomic_operations.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.json.jsontomic.StoreInt32(¤tState, StateStopped) }() // 状態を監視するゴルーチン go func() { defer wg.Done() for i := 0; i < 5; i++ { // 現在の状態をアトミックにロード val := atomic.LoadInt32(¤tState) fmt.Printf("Monitor: Current state is %d\n", val) time.Sleep(500 * time.Millisecond) if val == StateStopped { break } } }() wg.Wait() fmt.Println("All done.") }
4. Swap*
関数
これらの関数は、アドレスの値を新しい値とアトミックに交換し、古い値を返します。
atomic.SwapInt32(addr *int32, new int32) (old int32)
atomic.SwapInt64(addr *int64, new int64) (old int64)
atomic.SwapUint32(addr *uint32, new uint32) (old uint32)
atomic.SwapUint64(addr *uint64, new uint64) (old uint64)
atomic.SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
Swap
は、値を置き換えて以前の値も同時に取得する必要があるシナリオ(例:ロックフリーキューの実装やフラグのクリア)に役立ちます。
例:フラグをリセットして以前の状態を確認する:
package main import ( "fmt" "sync/atomic" time "time" ) func main() { var isProcessing int32 = 0 // 0はfalse、1はtrue //"ロック"を取得しようとしたり、処理中であることをシグナルするワーカーをシミュレート go func() { for i := 0; i < 3; i++ { // isProcessingを1(true)に設定し、古い値を取得しようとする // 古い値が0の場合、ロックの取得に成功したことを意味します oldVal := atomic.SwapInt32(&isProcessing, 1) if oldVal == 0 { fmt.Printf("Worker %d: Acquired processing lock. Doing work...\n", i+1) time.Sleep(time.Second) // 作業をシミュレート a"tomic.StoreInt32(&isProcessing, 0) // ロックを解放 fmt.Printf("Worker %d: Released processing lock.\n", i+1) } else { fmt.Printf("Worker %d: Could not acquire lock, already busy.\n", i+1) time.Sleep(200 * time.Millisecond) // 再接続を試みる前に少し待つ } } }() time.Sleep(3 * time.Second) // メインゴルーチンをアクティブに保つ fmt.Println("Final processing state:", atomic.LoadInt32(&isProcessing)) }
5. CompareAndSwap*
(CAS)関数
これらは、おそらく最も強力なアトミック操作です。アドレスの値が期待値と一致する場合にのみ、条件付きで値を変更します。
atomic.CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
atomic.CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
atomic.CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
atomic.CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
atomic.CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
CAS操作は、多くのロックフリーアルゴリズム、非ブロックキュー、スタック、スレッドセーフデータ構造の基本的なビルディングブロックです。一般的なパターンは「読み取り-変更-書き込み」ループです。
for { oldVal := atomic.Load*(addr) // 1. 現在の値を読み取る newVal := calculateNewValue(oldVal) // 2. 古い値に基づいて新しい値を計算する if atomic.CompareAndSwap*(addr, oldVal, newVal) { // 3. 交換を試みる break // 成功! } // else, 別のゴルーチンが値を変更しました。ループして古い値で再試行します。 }
例:スレッドセーフな最大値更新プログラムの実装:
package main import ( "fmt" "sync" "sync/atomic" ) func main() { var maxVal int64 = 0 // 初期最大値 numGoroutines := 10 var wg sync.WaitGroup wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { newValue := int64(id*100 + j) // 増加する値を生成する for { oldVal := atomic.LoadInt64(&maxVal) // 現在の最大値を読み取る if newValue > oldVal { // 現在の最大値がoldValのままである場合にのみ新しい値を設定しようとします if atomic.CompareAndSwapInt64(&maxVal, oldVal, newValue) { // 正常に更新されました // fmt.Printf("Goroutine %d: Updated max from %d to %d\n", id, oldVal, newValue) break } // CASが失敗した場合、別のゴルーチンが更新しました。ループして新しいoldValで再試行します。 } else { // 私たちの新しい値はそれ以上ではありません。更新する必要はありません break } } } }(i) } wg.Wait() fmt.Println("Final Max Value:", atomic.LoadInt64(&maxVal)) // 999になるはずです }
これにより、maxVal
は、LoadInt64
呼び出し時に観測された値よりも新しい値が実際に大きい場合にのみ更新が保証され、CompareAndSwapInt64
操作は、このチェックと更新がアトミックに実行されることを保証します。
6. atomic.Pointer[T]
(Go 1.19+)
Go 1.19では、atomic.Pointer[T]
が導入されました。これは、任意の型T
の値(内部的にはunsafe.Pointer
経由)に対するアトミック操作を提供するジェネリック型で、明示的なunsafe.Pointer
キャストの必要性をなくします。これにより、アトミックにポインタを管理する際の型安全性と使いやすさが大幅に向上します。
そのメソッドは、グローバルなatomic
関数(Load()
、Store()
、Swap()
、CompareAndSwap()
)を反映しています。
package main import ( "fmt" "sync" "sync/atomic" time "time" ) type Config struct { LogLevel string MaxConns int Timeout time.Duration } func main() { // 初期設定でatomic.Pointerを初期化します var currentConfig atomic.Pointer[Config] currentConfig.Store(&Config{ LogLevel: "INFO", MaxConns: 10, Timeout: 5 * time.Second, }) var wg sync.WaitGroup wg.Add(2) // 設定を更新するゴルーチン go func() { defer wg.Done() time.Sleep(2 * time.Second) fmt.Println("Updater: Updating config...") newConfig := &Config{ LogLevel: "DEBUG", MaxConns: 20, Timeout: 10 * time.Second, } currentConfig.Store(newConfig) // 新しい設定をアトミックに格納 fmt.Println("Updater: Config updated.") time.Sleep(2 * time.Second) newerConfig := &Config{ LogLevel: "ERROR", MaxConns: 5, Timeout: 2 * time.Second, } // ポインタのCompareAndSwapの例 oldConfig := currentConfig.Load() if currentConfig.CompareAndSwap(oldConfig, newerConfig) { fmt.Printf("Updater: Successfully CASed config from %s to %s\n", oldConfig.LogLevel, newerConfig.LogLevel) } else { fmt.Println("Updater: CAS failed, config changed by someone else.") } }() // 設定を読み取るゴルーチン go func() { defer wg.Done() for i := 0; i < 5; i++ { cfg := currentConfig.Load() // 設定をアトミックにロード fmt.Printf("Reader: Current config - LogLevel: %s, MaxConns: %d\n", cfg.LogLevel, cfg.MaxConns) time.Sleep(1 * time.Second) } }() wg.Wait() fmt.Println("Final Config:", currentConfig.Load()) }
atomic.Pointer[T]
は、設定のホットリロード、コピーオンライトデータ構造の実装、またはミューテックスなしで複雑なオブジェクトをゴルーチン間で安全に交換するシナリオに便利です。
sync/atomic
とsync.Mutex
の使い分け
アトミック操作とミューテックスの選択は、共有状態の複雑さと実行される操作に依存します。
-
sync/atomic
を使用する場合:- プリミティブな整数型またはポインタに対する単純な読み取り、書き込み、加算、交換、または比較・交換操作が必要です。
- パフォーマンスが重要であり、操作がハードウェアレベルで真にアトミックである場合。
- ミューテックスのロック/アンロックのオーバーヘッドを回避したい場合。
- ロックフリーデータ構造を実装している場合(ただし、これは高度でエラーが発生しやすいです)。
-
sync.Mutex
(またはsync.RWMutex
)を使用する場合:- 共有状態が複雑なデータ構造(マップ、スライス、構造体など)であり、複数のフィールドが連動して変更される場合、または単一の論理ユニットとしてグループ化する必要がある複数の離散的な読み取り/書き込み操作が含まれる場合。
- 単純な算術演算や代入よりも複雑な操作(スライスへの追加、マップからの削除など)。
- 単一の値だけでなく、クリティカルセクション全体を保護する必要がある場合。
- 単純さと正しさがマイクロ最適化よりも優先される場合。ミューテックスは、複雑なシナリオでは一般的に推論しやすく、エラーが発生しにくいです。
また、sync/atomic
操作は単一の操作に対してのみアトミック性を保証することを覚えておくことが重要です。複数のアトミック操作が同時に発生する必要がある場合は、それらの集合的なアトミック性を確保するために、ミューテックスまたはより高度な並行処理プリミティブが必要になる場合があります。
アラインメント要件
64ビット値(int64
、uint64
)を扱うsync/atomic
関数では、変数のメモリアドレスが64ビットアラインメントされている必要があります。Goのガベージコレクタは、通常、8バイト以上のサイズの変数が適切にアラインメントされていることを保証します。ただし、構造体に64ビット値を埋め込む場合は、構造体のレイアウトに注意し、アトミック変数が先頭にあるか、アラインメントを維持するために明示的にパディングされていることを確認する必要があります。そうしないと、panic: atomic: store of unaligned 64-bit value
が発生する可能性があります。
Goメモリモデル(sync/atomic
に関連)は、Load
やStore
のような操作についても保証を提供します。あるゴルーチンがatomic.Store*
を使用して変数x
に値v
を書き込み、別のゴルーチンが subsequently atomic.Load*
を使用してx
を読み取ると、その読み取り操作はv
またはv
の後に書き込まれた値のいずれかを観測します。これにより、正しさにとって重要な可視性と順序付けのプロパティが保証されます。
結論
sync/atomic
パッケージは、Goの並行処理ツールボックスにおける強力なツールであり、共有状態を管理するための非常に効率的な低レベルプリミティブを提供します。CPUレベルのアトミック命令を活用することにより、単純で競合の激しい操作に対して、ミューテックスと比較して細かい制御を可能にし、パフォーマンスを大幅に向上させることができます。しかし、その有効性は基本的な型と操作に限定されます。より複雑な並行アクセスパターンやデータ構造については、sync.Mutex
とチャネルは、Goの主要な汎用並行処理メカニズムとして残ります。sync/atomic
をいつ、どのように適切に使用するかを理解することは、堅牢で高性能な並行Goアプリケーションを記述するための鍵となります。