ORMによる同時実行制御の実装 - 悲観的ロックと楽観的ロックの詳細
Daniel Hayes
Full-Stack Engineer · Leapcell

今日の高同時実行アプリケーションでは、複数のユーザーまたはプロセスが同時に同じデータにアクセスして変更しようとする際に、データ整合性を確保することが最も重要です。適切なメカニズムがないと、競合状態はデータの破損、一貫性のない状態、そして最終的にはシステムへの信頼の失墜につながる可能性があります。データベースのロック戦略は、これらの課題に対処するための基本であり、同時アクセスを管理するための体系的な方法を提供します。従来のSQLは強力なロックプリミティブを提供しますが、特にオブジェクトリレーショナルマッピング(ORM)フレームワーク内では、それらを直接操作するのは煩雑になる可能性があります。この記事では、ORMが開発者に重要なロック戦略、特に悲観的ロック(SELECT FOR UPDATE
の使用)と楽観的ロック(バージョニングによる)を実装するための力をどのように与え、データの一貫性を維持しながらアプリケーション開発を合理化するかを掘り下げます。これらのテクニックを理解することは、単なる学術的な演習ではなく、堅牢でスケーラブルなアプリケーションを構築するための実践的な必要性です。
同時実行制御のコアコンセプト
実装の詳細に入る前に、同時データ変更に関連するコアコンセプトと、これから議論する2つの主要なロック戦略について明確な理解を確立しましょう。
同時実行制御(Concurrency Control): これは、共有データへの同時アクセスを管理するために使用される方法を指します。その目的は、複数のトランザクションが同時に実行されることを許可しながら、データベースが一貫した状態を維持し、同時トランザクションの結果が正しいことを保証することです。
トランザクション(Transaction): トランザクションは、データベースの内容にアクセスし、場合によっては変更する作業の単一の論理単位です。トランザクションは、ACID特性(原子性、一貫性、独立性、永続性)によって特徴付けられます。これは、データベース操作の信頼性の高い処理を保証します。
競合状態(Race Condition): 競合状態は、複数の操作が同時に実行され、最終的な結果がこれらの操作の実行順序に依存する場合に発生します。データベースの観点からは、これは多くの場合、2つのトランザクションが同じデータを読み取り、次に更新しようとし、一方の更新がもう一方を上書きするか、一貫性のない状態にするという状況を意味します。
悲観的ロック(Pessimistic Locking): この戦略は、競合が頻繁に発生すると仮定し、変更前にデータを即座にロックすることで同時アクセスを防ぎます。ロックされた後、他のトランザクションはロックが解除されるまでデータへのアクセスがブロックされます。このアプローチはデータ整合性を保証しますが、同時実行性を低下させ、注意深く管理しないとデッドロックを引き起こす可能性があります。
楽観的ロック(Optimistic Locking): この戦略は、競合がまれであると仮定します。事前にデータをロックする代わりに、同時アクセスを許可し、変更を保存する時点でのみ競合をチェックします。競合が検出された場合(つまり、データが最後に読み取られてから別のトランザクションによって変更された場合)、トランザクションは通常ロールバックされ、ユーザーに再試行を促します。このアプローチは同時実行性を最大化しますが、開発者は潜在的な競合を処理する必要があります。
ORMによる悲観的ロック(SELECT FOR UPDATE)
悲観的ロック、特にSELECT FOR UPDATE
の使用は、データ整合性が最重要であり、同時変更の可能性が高い状況、またはデータの現在の状態に基づいた複雑な意思決定が必要な場合に理想的です。
原理
SELECT FOR UPDATE
クエリが実行されると、データベースは選択された行をロックします。これらのロックされた行を読み取ろうとしたり更新しようとしたりする他のトランザクションは、ロックが解除されるまで待機するか、データベースの分離レベルと同時実行設定に応じてエラーを受け取ります。ロックは、それを取得したトランザクションがコミットまたはロールバックされるまで保持されます。
ORM実装例
ほとんどのORMは、SELECT FOR UPDATE
を組み込むための簡単な方法を提供します。在庫が特定のしきい値以上の場合にのみ在庫を減らす必要があるシナリオで、仮想のProduct
エンティティを使用した例を考えてみましょう。
# Django ORMの例 from django.db import transaction from .models import Product def decrement_product_stock_pessimistic(product_id: int, quantity: int): with transaction.atomic(): # プロダクトをFOR UPDATEで選択し、このトランザクションのためにロックします product = Product.objects.select_for_update().get(id=product_id) if product.stock >= quantity: product.stock -= quantity product.save() print(f"Product {product.id} stock updated successfully to {product.stock}.") return True else: print(f"Insufficient stock for product {product.id}.") # 例外が発生した場合、またはコミットなしでブロックが終了した場合、トランザクションは自動的にロールバックされます。 return False # SQLAlchemy ORMの例 from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.orm import sessionmaker, declarative_base import time Base = declarative_base() class Product(Base): __tablename__ = 'products' id = Column(Integer, primary_key=True) name = Column(String) stock = Column(Integer) def __repr__(self): return f"<Product(id={self.id}, name='{self.name}', stock={self.stock})>" engine = create_engine('sqlite:///:memory:') Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) def decrement_product_stock_pessimistic_sqlalchemy(product_id: int, quantity: int): session = Session() try: # トランザクションを開始します with session.begin(): # productをクエリし、with_for_update()を使用してロックします # これはSELECT ... FOR UPDATEに相当します product = session.query(Product).filter_by(id=product_id).with_for_update().first() if product: if product.stock >= quantity: product.stock -= quantity session.add(product) # 更新のためにプロダクトをマークします # session.commit() は、成功時に with session.begin() によって暗黙的に呼び出されます print(f"Product {product.id} stock updated successfully to {product.stock}.") return True else: print(f"Insufficient stock for product {product.id}.") # 特定の条件に対する session.rollback() はここで実行できます。 # そうでない場合、例外発生時にトランザクションはロールバックされます。 return False else: print(f"Product {product_id} not found.") return False except Exception as e: session.rollback() print(f"An error occurred: {e}") return False finally: session.close() # プロダクトの初期化 s = Session() p = Product(id=1, name="Widget", stock=100) s.add(p) s.commit() s.close() # 使用例(同時実行をシミュレートするには、これらのコードを別々のスレッド/プロセスで実行してください) # Thread 1: decrement_product_stock_pessimistic_sqlalchemy(1, 50) # Thread 2: decrement_product_stock_pessimistic_sqlalchemy(1, 70)
DjangoとSQLAlchemyの両方で、ORMはselect_for_update()
呼び出し(またはwith_for_update()
)を適切なSELECT ... FOR UPDATE
SQLクエリに変換します。これにより、選択されたProduct
レコードがトランザクションの期間中ロックされることが保証されます。別のプロセスが同時に同じ製品の在庫を減らそうとすると、最初のトランザクションが完了するのを待つか、データベースとORM構成によっては、エラーを受け取ることになります。
アプリケーションシナリオ
- 金融取引: 二重支出を防いだり、残高が超過しないようにしたりします。
- 在庫管理: 過剰販売を防ぐために在庫レベルを減らします。
- 予約システム: 航空便の座席やホテルの部屋のような限られたリソースを予約します。
- 注文処理: 注文を確定する前に、品目が利用可能であることを確認します。
ORMによる楽観的ロック(バージョニング)
楽観的ロックは、競合がまれなシナリオ、または競合が再試行で処理されるため、高い同時実行性と応答性が即時のデータ整合性よりも重要である場合に一般的に好まれます。
原理
楽観的ロックはデータベースレベルのロックを使用しません。代わりに、テーブルに「バージョン」列(通常は整数またはタイムスタンプ)を追加します。レコードが更新されるたびに、バージョン番号が増加します。トランザクションがレコードを更新しようとすると、まずレコードの現在のバージョンを読み取ります。次に、UPDATE
ステートメントで、データベース内のバージョン番号が最初に読み取られたバージョン番号と一致するかどうかを確認するWHERE
句を含めます。一致しない場合、別のトランザクションがレコードを変更したことを意味し、更新は失敗します。アプリケーションは次にこの失敗を検出し、通常は操作を再試行するか、ユーザーに通知します。
ORM実装例
ほとんどのORMは、楽観的ロックの組み込みサポートを備えているか、バージョンフィールドを追加することで簡単な実装を可能にします。
# Django ORMの例(カスタムフィールドまたは`django-optimistic-lock`のようなパッケージが必要です) # 簡単のため、パッケージが使用されていない場合は手動で原則を実演します。 from django.db import transaction, models from django.db.models import F class Product(models.Model): name = models.CharField(max_length=100) stock = models.IntegerField(default=0) version = models.IntegerField(default=1) # バージョンフィールド def decrement_stock_optimistic(self, quantity: int): with transaction.atomic(): # 現在のプロダクトデータ(バージョンを含む)を取得します product = Product.objects.get(pk=self.pk) # 現在の在庫が十分かどうかを確認します if product.stock < quantity: raise ValueError("Insufficient stock.") # 可能な限りアトミックな更新のためにF()式を使用し、読み取りチェック部分の競合状態を回避しますが、 # ここでの重要な部分はsave()のバージョンチェックです。 updated_count = Product.objects.filter( pk=self.pk, version=product.version ).update( stock=F('stock') - quantity, version=F('version') + 1 ) if updated_count == 0: # 別のトランザクションがプロダクトを変更したため、更新が失敗しました raise RuntimeError("Product has been modified by another transaction. Please retry.") # 新しい在庫とバージョンを取得するために、プロダクトインスタンスをリフレッシュします self.refresh_from_db() print(f"Product {self.id} stock updated successfully to {self.stock} (version: {self.version}).") return self.stock # SQLAlchemy ORMの例 from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.orm import sessionmaker, declarative_base from sqlalchemy.orm.exc import StaleDataError from sqlalchemy.ext.declarative import declared_attr, as_declarative from sqlalchemy.orm import configure_mappers, Mapped, mapped_column # --- SQLAlchemy 2.0+ スタイル --- @as_declarative() class Base: __tablename__ = 'products' # 楽観的同時実行のためのバージョンID列を定義します。これは、 # セッションが各更新または削除でバージョンIDをチェックする必要があることを要求します。 version_id: Mapped[int] = mapped_column(Integer, default=1) class Product(Base): id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = mapped_column(String) stock: Mapped[int] = mapped_column(Integer, default=0) def __repr__(self): return f"<Product(id={self.id}, name='{self.name}', stock={self.stock}, version={self.version_id})>" # すべてのモデル定義後に連絡先ベースにバインドします configure_mappers() engine = create_engine('sqlite:///:memory:') Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) def decrement_product_stock_optimistic_sqlalchemy(product_id: int, quantity: int): session = Session() try: with session.begin(): # プロダクトを取得します。SQLAlchemyのバージョニングは、そのversion_idを追跡します product = session.query(Product).filter_by(id=product_id).first() if product: if product.stock >= quantity: product.stock -= quantity # productが保存されると、SQLAlchemyは自動的に # `WHERE version_id = <original_version_id>`を挿入し、`version_id`をインクリメントします。 # 更新が0行に影響した場合、StaleDataErrorが発生します。 session.add(product) print(f"Product {product.id} stock updated successfully to {product.stock} (version: {product.version_id}).") return True else: print(f"Insufficient stock for product {product.id}.") return False else: print(f"Product {product_id} not found.") return False except StaleDataError: session.rollback() print(f"Optimistic lock conflict for product {product_id}. Another transaction modified it. Retrying...") # 実際のアプリケーションでは、データを再取得し、変更をマージして、操作を再試行します。 return False except Exception as e: session.rollback() print(f"An error occurred: {e}") return False finally: session.close() # SQLAlchemy用のプロダクトの初期化 s = Session() p = Product(id=1, name="Widget", stock=100) s.add(p) s.commit() s.close()
SQLAlchemyの例では、version_id
列を追加し、ORMプロパティをversion_id=True
(または2.0スタイルでas_declarative
とversion_id
列を使用)で構成することにより、SQLAlchemyは更新中のバージョンチェックを自動的に処理します。データベース内のバージョンがロードされたバージョンと一致しないために更新が失敗した場合、SQLAlchemyはStaleDataError
を発生させ、アプリケーションはこれをキャッチして処理できます(例:操作を再試行する)。
アプリケーションシナリオ
- コンテンツ管理システム: 競合は少ないものの、堅牢な処理が必要な記事やドキュメントの編集。
- Eコマース製品説明: 同じ説明への同時変更が少ない製品詳細の更新。
- ユーザープロファイル更新: ユーザー設定の変更。直接的な競合はまれであることが期待されます。
- 構成管理: アプリケーション構成の更新。
結論
悲観的ロックと楽観的ロックの両方の戦略は、データベースの同時実行制御の領域において不可欠であり、それぞれが異なる利点とトレードオフを提供します。SELECT FOR UPDATE
を通じて実装される悲観的ロックは、競合を事前に防ぐことにより強力な一貫性を提供し、データ整合性が損なわれない重要な操作に適しています。バージョン番号を活用する楽観的ロックは、高い同時実行性と応答性を優先し、競合がまれで再試行が許容されるシナリオに最適です。ORMは、これらの複雑なデータベース機能をアプリケーションコードに統合することを大幅に単純化し、低レベルのSQLの詳細を抽象化して、開発者がビジネスロジックに集中できるようにします。これらの2つの戦略の選択は、特定のアプリケーションの要件、同時変更の可能性、およびデータ一貫性とシステムスループットの望ましいバランスに大きく依存します。最終的に、ORMを通じてこれらのロックメカニズムを習得することは、同時操作を安全に処理できる、信頼性が高く高性能なアプリケーションを構築するための力を与えてくれます。