バックエンドフレームワークにおけるCQRSとイベントソーシングによるスケーラブルなシステムの構築
Wenhao Wang
Dev Intern · Leapcell

はじめに
現代のソフトウェア開発の急速に進化する状況において、堅牢でスケーラブル、かつ保守性の高いアプリケーションを構築することは最優先事項です。システムが複雑化し、ユーザーの需要が増加するにつれて、従来のCRUD(Create, Read, Update, Delete)アーキテクチャは、読み書きの競合、データ整合性の課題、履歴変更の監査の困難さによって、しばしばボトルネックとなり、苦戦することがあります。そこで、Command Query Responsibility Segregation(CQRS)やEvent Sourcingといった高度なアーキテクチャパターンが強力なソリューションとして登場します。これらのパターンは、データ操作とクエリの処理方法を根本的に再定義することにより、パフォーマンスが高いだけでなく、本来的に回復力があり、洞察に富んだシステムを設計するための道を提供します。この記事では、バックエンドフレームワーク内でのCQRSとEvent Sourcingの実際的な応用について深く掘り下げ、高度なエンタープライズグレードのアプリケーション構築へのアプローチをどのように変革できるかを示します。
コアコンセプトと原則
実際の実装に入る前に、CQRSとEvent Sourcingの基盤となるコアコンセプトをしっかりと理解しておくことが重要であり、不可欠です。
Command Query Responsibility Segregation (CQRS)
CQRSは、コマンド(アプリケーションの状態を変更する操作)の処理責任と、クエリ(データの取得操作)の処理責任を分離するアーキテクチャパターンです。
- コマンドモデル(書き込み側): アプリケーションのこの部分は、コマンドの受け入れと処理、ビジネスルールの検証、および状態変更の永続化を担当します。通常、書き込み用に最適化されたデータストアを使用し、トランザクション整合性に焦点を当てることが多いです。
- クエリモデル(読み取り側): この部分は、データの読み取りに最適化されています。特定のクエリパターンに合わせて調整された非正規化されたデータストア(例:レポートデータベース、キーバリューストア、検索インデックス)を使用でき、高いパフォーマンスと柔軟性を提供します。
なぜ分離するのか? 従来のCRUDシステムは、しばしば読み書きの両方に単一のモデルを使用し、妥協を招きます。一方を最適化すると、もう一方に悪影響を与えることがよくあります。CQRSにより、各側の独立したスケーリングと最適化が可能になり、パフォーマンス、柔軟性、保守性の向上が実現します。
Event Sourcing
Event Sourcingは、集計の現在の状態を保存する代わりに、その集計に加えられたすべての変更を記述する不変イベントのシーケンスを保存する永続化パターンです。
- イベント: 過去に起こったことに関する事実を表します(例:
OrderPlacedEvent
、ProductQuantityAdjustedEvent
)。これらは不変であり、追記専用です。 - イベントストア: これらのイベントシーケンスを保存および取得するために設計された特殊なデータベースです。アプリケーションの状態の単一の真実の情報源として機能します。
- 集計: データ変更の単一の単位として扱われることができるドメインオブジェクトのクラスターです。トランザクション整合性とイベント適用の境界となります。
状態ではなくイベントを保存する理由 Event Sourcingは、すべての変更の完全で監査可能な履歴を提供し、時間移動デバッグ、任意の時点での状態再構築のためのイベントの再生、さまざまなコンシューマーに合わせて調整された複数の読み取りモデルの生成などの強力な機能を提供します。また、分散システムのための発行/サブスクライブメカニズムとも自然に統合されます。
CQRSとEvent Sourcingの関係
CQRSとEvent Sourcingは互いに補完し合うパターンです。Event Sourcingは、CQRSの書き込み側には自然に適合します。コマンドはイベントを生成し、それらはイベントストアに保存されます。これらのイベントは、非同期に公開されて1つ以上の読み取りモデルを構築および更新でき、それらがCQRSの読み取り側を形成します。この相乗効果により、強力で高度にスケーラブルで監査可能なシステムが実現します。
CQRSとEvent Sourcingの実装
ここでは、架空のeコマース製品管理サービスを例として、CQRSとEvent Sourcingの実装を説明します。概念を示すために、汎用的なバックエンドフレームワーク(例:JavaのSpring BootまたはPythonのFastAPI)を使用します。
プロジェクト構造の概要
├── src
│ ├── main
│ │ ├── java/python/...
│ │ │ └── com
│ │ │ └── example
│ │ │ └── productservice
│ │ │ ├── api // コマンドとクエリ用のRESTコントローラー
│ │ │ ├── command // コマンド定義とハンドラー
│ │ │ │ ├── model // コマンド用のDTO
│ │ │ │ └── service // コマンドハンドラーロジック
│ │ │ ├── query // クエリ定義とハンドラー
│ │ │ │ ├── model // クエリと読み取りモデル用のDTO
│ │ │ │ └── service // クエリハンドラーロジック
│ │ │ ├── domain // 集計、イベント、ビジネスロジック
│ │ │ │ ├── aggregate // ProductAggregate
│ │ │ │ ├── event // Productイベント(例:ProductCreatedEvent)
│ │ │ │ └── repository // イベントストアとのやり取り
│ │ │ ├── infrastructure // イベントストア設定、イベント発行者
│ │ │ └── config // アプリケーション設定
1. コマンドとイベントの定義
まず、Product
集計のコマンドと、それらが生成するイベントを定義しましょう。
コマンド(書き込み側の入力):
// Javaの例(コマンドDTO) public class CreateProductCommand { private String productId; private String name; private double price; private int quantity; // Getter、Setter、コンストラクタ } public class UpdateProductPriceCommand { private String productId; private double newPrice; // Getter、Setter、コンストラクタ }
# Pythonの例(Pydanticを使用するコマンドDTO) from pydantic import BaseModel class CreateProductCommand(BaseModel): product_id: str name: str price: float quantity: int class UpdateProductPriceCommand(BaseModel): product_id: str new_price: float
イベント(システムの真実):
// Javaの例(イベント) public abstract class ProductEvent { private String productId; private long timestamp; // Getter、Setter、コンストラクタ } public class ProductCreatedEvent extends ProductEvent { private String name; private double price; private int quantity; // Getter、Setter、コンストラクタ } public class ProductPriceUpdatedEvent extends ProductEvent { private double newPrice; // Getter、Setter、コンストラクタ }
# Pythonの例(Pydanticを使用するイベント) from datetime import datetime from typing import Optional class Event(BaseModel): product_id: str timestamp: datetime = datetime.utcnow() class ProductCreatedEvent(Event): name: str price: float quantity: int class ProductPriceUpdatedEvent(Event): new_price: float
2. 集計とイベントソーシングロジック(書き込み側)
ProductAggregate
は、コマンドを適用し、イベントを生成する責任を負います。これは、自身のイベントを再生して状態を再構築します。
// Javaの例(Product集計) public class ProductAggregate { private String productId; private String name; private double price; private int quantity; private long version; // 並行処理制御用 // 新しい集計を作成するためのコンストラクタ public ProductAggregate(CreateProductCommand command) { applyNewEvent(new ProductCreatedEvent(command.getProductId(), command.getName(), command.getPrice(), command.getQuantity())); } // 履歴から再構築するためのコンストラクタ public ProductAggregate(List<ProductEvent> history) { history.forEach(this::apply); } // コマンドハンドラー public void updatePrice(UpdateProductPriceCommand command) { if (command.getNewPrice() <= 0) { throw new IllegalArgumentException("価格は負またはゼロにできません。"); } applyNewEvent(new ProductPriceUpdatedEvent(this.productId, command.getNewPrice())); } // イベントに基づいて状態を変更する適用メソッド private void apply(ProductEvent event) { if (event instanceof ProductCreatedEvent) { ProductCreatedEvent e = (ProductCreatedEvent) event; this.productId = e.getProductId(); this.name = e.getName(); this.price = e.getPrice(); this.quantity = e.getQuantity(); } else if (event instanceof ProductPriceUpdatedEvent) { ProductPriceUpdatedEvent e = (ProductPriceUpdatedEvent) event; this.price = e.getNewPrice(); } this.version++; } // 新しいイベントを適用および保存するためのヘルパー private void applyNewEvent(ProductEvent event) { apply(event); // この'event'は保存され、公開される可能性があります。 // 実際のシステムでは、収集されたイベントはイベントストアに送信されます。 } // Getter }
# Pythonの例(Product集計) from typing import List, Dict, Any class ProductAggregate: def __init__(self, product_id: str): self.product_id = product_id self.name: Optional[str] = None self.price: Optional[float] = None self.quantity: Optional[int] = None self.version: int = -1 self._uncommitted_events: List[Event] = [] @classmethod def create(cls, command: CreateProductCommand) -> 'ProductAggregate': aggregate = cls(command.product_id) aggregate._apply_new_event(ProductCreatedEvent( product_id=command.product_id, name=command.name, price=command.price, quantity=command.quantity )) return aggregate @classmethod def from_history(cls, product_id: str, history: List[Event]) -> 'ProductAggregate': aggregate = cls(product_id) for event in history: aggregate._apply(event) return aggregate def update_price(self, command: UpdateProductPriceCommand): if command.new_price <= 0: raise ValueError("価格は負またはゼロにできません。") self._apply_new_event(ProductPriceUpdatedEvent( product_id=self.product_id, new_price=command.new_price )) def _apply(self, event: Event): if isinstance(event, ProductCreatedEvent): self.name = event.name self.price = event.price self.quantity = event.quantity elif isinstance(event, ProductPriceUpdatedEvent): self.price = event.new_price self.version += 1 def _apply_new_event(self, event: Event): self._apply(event) self._uncommitted_events.append(event) # 保存されるイベントを収集 def get_uncommitted_events(self) -> List[Event]: return self._uncommitted_events def clear_uncommitted_events(self): self._uncommitted_events = [] # Getter/Properties @property def current_state(self) -> Dict[str, Any]: return { "productId": self.product_id, "name": self.name, "price": self.price, "quantity": self.quantity, "version": self.version }
3. コマンドハンドラーとイベントストアのやり取り
コマンドハンドラーはプロセスをオーケストレーションします。イベントストアから集計をロードし、コマンドを適用し、新しく生成されたイベントを保存し、それらを公開します。
// Javaの例(Productコマンドハンドラー) @Service public class ProductCommandHandler { private final EventStore eventStore; private final EventPublisher eventPublisher; // 読み取りモデルのためのイベント公開用 public ProductCommandHandler(EventStore eventStore, EventPublisher eventPublisher) { this.eventStore = eventStore; this.eventPublisher = eventPublisher; } public void handle(CreateProductCommand command) { ProductAggregate aggregate = new ProductAggregate(command); List<ProductEvent> newEvents = // 集計から新しいイベントを取得するロジック eventStore.saveEvents(command.getProductId(), newEvents, aggregate.getVersion()); newEvents.forEach(eventPublisher::publish); } public void handle(UpdateProductPriceCommand command) { List<ProductEvent> history = eventStore.getEventsForAggregate(command.getProductId()); ProductAggregate aggregate = new ProductAggregate(history); // 並行処理チェック(バージョン管理) // コマンドからの期待バージョンが集計バージョンと一致しない場合、エラーをスローします // 簡単のため、ここでは最新バージョンを想定します。 aggregate.updatePrice(command); List<ProductEvent> newEvents = // 集計から新しいイベントを取得するロジック eventStore.saveEvents(command.getProductId(), newEvents, aggregate.getVersion()); newEvents.forEach(eventPublisher::publish); } } // EventStoreとEventPublisherはインフラストラクチャコンポーネントになるでしょう。EventStoreは // NoSQL DB(例:Cassandra, MongoDB)または専用イベントストア(例:EventStoreDB)かもしれません。
# Pythonの例(Productコマンドハンドラー) from typing import Protocol, List from .domain.aggregate import ProductAggregate from .infrastructure.event_store import EventStore from .infrastructure.event_publisher import EventPublisher from .domain.event import Event class ProductCommandHandler: def __init__(self, event_store: EventStore, event_publisher: EventPublisher): self.event_store = event_store self.event_publisher = event_publisher def handle_create_product(self, command: CreateProductCommand): aggregate = ProductAggregate.create(command) self.event_store.save_events(command.product_id, aggregate.get_uncommitted_events(), aggregate.version) for event in aggregate.get_uncommitted_events(): self.event_publisher.publish(event) aggregate.clear_uncommitted_events() def handle_update_product_price(self, command: UpdateProductPriceCommand): history = self.event_store.get_events_for_aggregate(command.product_id) if not history: raise ValueError(f"ID {command.product_id} の製品が見つかりません。") aggregate = ProductAggregate.from_history(command.product_id, history) # 実際のシステムでは、コマンドにexpected_versionを渡して # aggregate.versionと比較して楽観的同時実行制御を処理します。 # 簡単のため、ここでは最新バージョンを想定します。 aggregate.update_price(command) self.event_store.save_events(command.product_id, aggregate.get_uncommitted_events(), aggregate.version) for event in aggregate.get_uncommitted_events(): self.event_publisher.publish(event) aggregate.clear_uncommitted_events() # 例 EventStore(簡略化) class EventStore: def __init__(self): self._stores: Dict[str, List[Event]] = {} # デモンストレーション用のシンプルなインメモリストア def save_events(self, aggregate_id: str, events: List[Event], expected_version: int): current_events = self._stores.get(aggregate_id, []) # 基本的な楽観的同時実行チェック(本番ではより堅牢に) if current_events and len(current_events) != expected_version - len(events): raise ValueError("同時実行の競合:集計が変更されました。") current_events.extend(events) self._stores[aggregate_id] = current_events print(f"{aggregate_id} について {len(events)} 件のイベントを保存しました。合計: {len(current_events)}") def get_events_for_aggregate(self, aggregate_id: str) -> List[Event]: return self._stores.get(aggregate_id, []) # 例 EventPublisher(簡略化) class EventPublisher: def publish(self, event: Event): print(f"イベントを公開中:{event.__class__.__name__} 製品 {event.product_id} のため") # 実際のシステムでは、メッセージブローカー(例:Kafka, RabbitMQ)にプッシュします。
4. 読み取りモデルとクエリハンドラー(読み取り側)
コマンドサイドによって公開されたイベントは、プロジェクター(またはイベントハンドラー)によって消費され、非正規化された読み取りモデルを更新します。これらの読み取りモデルは、その後クエリされます。
読み取りモデル(非正規化されたビュー):
// Javaの例(Product概要読み取りモデル) public class ProductSummary { private String productId; private String name; private double currentPrice; private int quantityOnHand; // その他の計算フィールド(例:'lastUpdated'、'totalOrders')の可能性あり // Getter、Setter、コンストラクタ }
# Pythonの例(Pydanticを使用するProduct概要読み取りモデル) class ProductSummary(BaseModel): product_id: str name: str current_price: float quantity_on_hand: int last_updated: datetime = datetime.utcnow() # 計算フィールド
イベントコンシューマー/プロジェクター:
// Javaの例(Product読み取りモデルプロジェクター) @Service public class ProductReadModelProjector { private final ProductSummaryRepository productSummaryRepository; // ProductSummaryを永続化 public ProductReadModelProjector(ProductSummaryRepository productSummaryRepository) { this.productSummaryRepository = productSummaryRepository; } @EventListener // アプリケーションイベントをリッスンするSpringの方法 public void handle(ProductCreatedEvent event) { ProductSummary summary = new ProductSummary(event.getProductId(), event.getName(), event.getPrice(), event.getQuantity()); productSummaryRepository.save(summary); } @EventListener public void handle(ProductPriceUpdatedEvent event) { ProductSummary summary = productSummaryRepository.findByProductId(event.getProductId()) .orElseThrow(() -> new RuntimeException("Product summary not found")); summary.setCurrentPrice(event.getNewPrice()); summary.setLastUpdated(event.getTimestamp()); // 最終更新時刻を更新 productSummaryRepository.save(summary); } }
# Pythonの例(Product読み取りモデルプロジェクター) from .infrastructure.read_model_db import ProductSummaryRepository from .query.model import ProductSummary class ProductReadModelProjector: def __init__(self, product_summary_repo: ProductSummaryRepository): self.product_summary_repo = product_summary_repo def handle_product_created_event(self, event: ProductCreatedEvent): summary = ProductSummary( product_id=event.product_id, name=event.name, current_price=event.price, quantity_on_hand=event.quantity, last_updated=event.timestamp ) self.product_summary_repo.save(summary) print(f"プロジェクター:製品 {summary.product_id} の概要を作成しました") def handle_product_price_updated_event(self, event: ProductPriceUpdatedEvent): summary = self.product_summary_repo.find_by_product_id(event.product_id) if not summary: raise ValueError(f"製品 {event.product_id} の概要が見つかりません。") summary.current_price = event.new_price summary.last_updated = event.timestamp self.product_summary_repo.save(summary) print(f"プロジェクター:製品 {summary.product_id} の価格を {summary.current_price} に更新しました") # 例 読み取りモデルリポジトリ(簡略化、PostgreSQLのような別のデータベースでも可) class ProductSummaryRepository: def __init__(self): self._store: Dict[str, ProductSummary] = {} def save(self, summary: ProductSummary): self._store[summary.product_id] = summary def find_by_product_id(self, product_id: str) -> Optional[ProductSummary]: return self._store.get(product_id) def find_all(self) -> List[ProductSummary]: return list(self._store.values())
クエリハンドラーとAPIエンドポイント:
// Javaの例(ProductクエリハンドラーとRESTエンドポイント) @Service public class ProductQueryHandler { private final ProductSummaryRepository productSummaryRepository; public ProductQueryHandler(ProductSummaryRepository productSummaryRepository) { this.productSummaryRepository = productSummaryRepository; } public ProductSummary getProductSummary(String productId) { return productSummaryRepository.findByProductId(productId) .orElseThrow(() -> new ProductNotFoundException(productId)); } public List<ProductSummary> getAllProductSummaries() { return productSummaryRepository.findAll(); } } @RestController @RequestMapping("/api/products") public class ProductQueryController { private final ProductQueryHandler queryHandler; public ProductQueryController(ProductQueryHandler queryHandler) { this.queryHandler = queryHandler; } @GetMapping("/{productId}") public ResponseEntity<ProductSummary> getProduct(@PathVariable String productId) { return ResponseEntity.ok(queryHandler.getProductSummary(productId)); } @GetMapping public ResponseEntity<List<ProductSummary>> getAllProducts() { return ResponseEntity.ok(queryHandler.getAllProductSummaries()); } }
# Pythonの例(ProductクエリハンドラーとFastAPIエンドポイント) from fastapi import APIRouter, HTTPException from typing import List product_query_router = APIRouter() class ProductQueryHandler: def __init__(self, product_summary_repo: ProductSummaryRepository): self.product_summary_repo = product_summary_repo def get_product_summary(self, product_id: str) -> ProductSummary: summary = self.product_summary_repo.find_by_product_id(product_id) if not summary: raise HTTPException(status_code=404, detail=f"ID {product_id} の製品が見つかりません。") return summary def get_all_product_summaries(self) -> List[ProductSummary]: return self.product_summary_repo.find_all() # FastAPI エンドポイント設定(ハンドラーのアプリコンテキスト/依存性注入を想定) @product_query_router.get("/{product_id}", response_model=ProductSummary) async def get_product( product_id: str, query_handler: ProductQueryHandler # FastAPIのDepends経由で注入 ): return query_handler.get_product_summary(product_id) @product_query_router.get("/", response_model=List[ProductSummary]) async def get_all_products( query_handler: ProductQueryHandler ): return query_handler.get_all_product_summaries() # メインアプリにて: # app = FastAPI() # app.include_router(product_query_router, prefix="/api/products")
アプリケーションシナリオ
CQRSとEvent Sourcingは、特に以下のような場面に適しています:
- 複雑なドメインモデル: ビジネスルールが複雑で、状態変更の監査が必要な場合。
- 高パフォーマンスの読み書きシステム: 読み取りと書き込みのパターンが著しく異なるシステムで、独立したスケーリングが可能。
- 監査性とコンプライアンス: すべての変更がイベントとして記録され、完全な履歴が監査目的で提供される。
- 履歴分析とビジネスインテリジェンス: イベントを再生したり、さまざまな分析モデルに変換したりできる。
- マイクロサービスアーキテクチャ: イベントは、サービス間の通信と最終的な整合性を自然に促進します。
- リアルタイムダッシュボードとプロジェクション: イベントは、リアルタイムビューとレポートを更新できます。
結論
CQRSとEvent Sourcingは強力なアーキテクチャパターンであり、慎重に適用すると、高いスケーラビリティ、回復力、保守性を備えたバックエンドシステムを構築できます。コマンド処理とクエリ処理の関心を明確に分離し、変更を不変イベントのシーケンスとして永続化することにより、開発者は優れたパフォーマンス、包括的な監査証跡、および進化するビジネス要件に対する比類なき柔軟性を提供するアプリケーションを構築できます。複雑さを導入するものの、特定のドメインにおける長期的なメリットが、初期の学習曲線よりもしばしば上回ります。これにより、チームは真に堅牢で洞察に富んだソフトウェアを構築できるようになります。これらのパターンは、現在の状態を保存するという考え方から、それを生み出した変更の因果シーケンスを理解するという考え方へと、私たちの視点を根本的にシフトさせます。