変更データキャプチャと論理デコーディングでイベント駆動型アーキテクチャを推進するリアルタイムデータストリーム
Min-jun Kim
Dev Intern · Leapcell

今日のペースの速いデジタルの状況では、アプリケーションは分散システム全体で応答性とデータの一貫性をますます要求しています。従来のバッチ処理やポーリングメカニズムは、しばしば不十分であり、レイテンシとデータベースへの不要な負荷をもたらします。ここで、イベント駆動型アーキテクチャの概念が輝きを放ち、アプリケーションが基盤となるデータストア内で発生する変更にリアルタイムで対応できるようにします。しかし、課題は、これらの変更をデータベースから効率的かつ確実にキャプチャすることにあります。この記事では、変更データキャプチャ (CDC)、特にDebeziumや論理デコーディングなどのツールを活用して、この問題に対するエレガントで強力なソリューションを提供し、イベント駆動型アーキテクチャのリアルタイム機能を真に推進する方法を掘り下げます。このパターンを理解することは、重要なビジネスイベントに即座に対応できる、スケーラブルで回復力があり、非常に応答性の高い最新アプリケーションを構築するために不可欠です。
変更データキャプチャとそのドライバーの理解
メカニズムに飛び込む前に、関係するコアコンセプトを明確に理解しましょう。
主要な用語
- 変更データキャプチャ (CDC): データベース内で変更されたデータを特定および追跡するために使用される一連のソフトウェア設計パターン。CDCは、テーブル全体を反復的にクエリするのではなく、変更(挿入、更新、削除)のみをキャプチャすることに焦点を当てています。
- 論理デコーディング: PostgreSQL固有の機能であり、外部システムがデータベースの書き込み先ログ(WAL)からデコードされた人間が読める変更ストリームをストリーミングできるようにします。これは、低レベルのWALエントリを論理操作(例:「INSERT INTO users VALUES (1, 'Alice')」、「UPDATE products SET price = 20 WHERE id = 10'」)に基本的に変換します。他のデータベースには、MySQLのバイナリログ(binlog)やSQL Serverの変更追跡/変更データキャプチャのような同様のメカニズムがあります。
- Debezium: CDCのためのオープンソース分散プラットフォーム。Debeziumは、行レベルの変更を監視するために特定のデータベース管理システム(PostgreSQL、MySQL、MongoDB、SQL Server、Oracleなど)を監視するコネクタのセットを提供します。Debeziumはこれらの変更をイベントとしてメッセージブローカー(通常はApache Kafka)にストリーミングし、他のアプリケーションが利用できるようにします。
- イベント駆動型アーキテクチャ (EDA): イベントの生成、検出、消費、およびそれらへの応答を中心に据えたソフトウェアアーキテクチャパラダイム。イベントは、システム内での重要な発生(例:「UserRegistered」、「OrderPlaced」、「ProductPriceUpdated」)を表します。
- 書き込み先ログ (WAL) / トランザクションログ / リドゥログ: リレーショナルデータベースの不可欠なコンポーネントであり、ディスクへの永続的な書き込み前にデータベースに対して行われたすべての変更を記録します。主にクラッシュリカバリとレプリケーションに使用されます。CDCメカニズムは、これらのログに基づいて構築されることがよくあります。
論理デコーディングによるCDCの原則
Debeziumと論理デコーディングをCDCに使用する基本的な原則は、データベース自体のトランザクションログを活用することです。イベントを生成するためにアプリケーションコードを変更したり、トリガー(オーバーヘッドや複雑さをもたらす可能性がある)を使用したりする代わりに、Debeziumは最適化され信頼性の高いトランザクションログに「接続」します。
プロセスを分解すると次のようになります。
- データベースの変更: データベース(例:PostgreSQL)に対して行われた
INSERT
、UPDATE
、またはDELETE
操作は、まず書き込み先ログ(WAL)に記録されます。 - 論理デコーディングプラグイン: 論理デコーディング機能を持つPostgreSQLは、設定されたプラグイン(
pgoutput
やwal2json
など)がバイナリWALエントリを解釈して構造化された論理形式にデコードするメカニズムを提供します。 - Debeziumコネクタ: Debeziumコネクタ(例:
PostgreSQLConnector
)はデータベースに接続し、論理デコーディングストリームのクライアントとして機能します。デコードされた変更を継続的に読み取ります。 - イベント変換: Debeziumは、これらの生のデータベース変更イベントを標準化されたイベント形式(通常はJSONまたはAvroで、
before
とafter
の状態、操作タイプ、タイムスタンプなどを含む構造化スキーマに従います)に変換します。 - イベントストリーミング: Debeziumは、これらの構造化された変更イベントをメッセージブローカー、最も一般的にはApache Kafkaに公開します。各テーブルの変更は、特定のKafkaトピック(例:
dbserver.public.users
)に対応することがあります。 - イベント消費: ダウンストリームのマイクロサービスまたはアプリケーションは、関連するKafkaトピックをサブスクライブします。新しいイベントが到着すると、ソースデータベースを直接クエリすることなく、データ変更にリアルタイムで対応できます。
DebeziumとKafkaを使用した実装例
PostgreSQLデータベース、Kafkaブローカー(Zookeeper付き)、およびDebeziumコネクタを起動するためのDocker Composeを使用した簡単な例でこれを説明しましょう。
1. docker-compose.yml
:
version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:7.4.0 container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka: image: confluentinc/cp-kafka:7.4.0 container_name: kafka ports: - "9092:9092" - "9093:9093" depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 postgres: image: debezium/postgres:16 container_name: postgres ports: - "5432:5432" environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: mydatabase healthcheck: test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 5s timeout: 5s retries: 5 # Configure WAL level for logical decoding command: ["-c", "wal_level=logical", "-c", "max_wal_senders=10", "-c", "max_replication_slots=10"] connect: image: debezium/connect:2.4 container_name: connect ports: - "8083:8083" depends_on: - kafka - postgres environment: BOOTSTRAP_SERVERS: kafka:9092 GROUP_ID: 1 CONFIG_STORAGE_TOPIC: connect_configs OFFSET_STORAGE_TOPIC: connect_offsets STATUS_STORAGE_TOPIC: connect_status
2. スタックのデプロイ:
docker-compose up -d
3. Debezium PostgreSQLコネクタの設定:
すべてのサービスが起動したら(1~2分待つ)、Debezium PostgreSQLコネクタをデプロイします。
connector-config.json
という名前のファイルを作成します。
{ "name": "postgres-cdc-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname": "mydatabase", "database.server.name": "dbserver", "plugin.name": "pgoutput", "table.include.list": "public.users", "topic.prefix": "dbserver_cdc", "heartbeat.interval.ms": "5000", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes-dbserver" } }
次に、このコネクタ設定をDebezium Connectに送信します。
curl -X POST -H "Content-Type: application/json" --data @connector-config.json http://localhost:8083/connectors
コネクタが作成されたことを示す応答が表示されるはずです。
4. データベースとの対話:
PostgreSQLデータベースに接続します。
docker exec -it postgres psql -U postgres -d mydatabase
テーブルを作成してデータを挿入/更新します。
CREATE TABLE users ( id SERIAL PRIMARY KEY, name VARCHAR(255) NOT NULL, email VARCHAR(255) UNIQUE ); INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com'); UPDATE users SET email = 'alice.smith@example.com' WHERE name = 'Alice'; INSERT INTO users (name, email) VALUES ('Bob', 'bob@example.com'); DELETE FROM users WHERE name = 'Bob';
psqlを終了します。
5. Kafkaイベントの監視:
これで、Kafkaコンソールコンシューマーを使用してCDCイベントを確認できます。トピックは、topic.prefix
とtable.include.list
に基づいてdbserver_cdc.public.users
になります。
docker exec -it kafka kafka-console-consumer --bootstrap-server kafka:9092 --topic dbserver_cdc.public.users --from-beginning --property print.key=true
行のbefore
とafter
の状態を含む、挿入、更新、削除を表すJSONメッセージが表示されます。この変更のリアルタイムストリームは、任意のダウンストリームサービスで利用できるようになりました。
イベント駆動型アーキテクチャでのアプリケーション
このCDCストリームが確立されると、イベント駆動型アーキテクチャの可能性は広大です。
- リアルタイム分析: データウェアハウスまたはデータレイクをリアルタイムで投入します。
- キャッシュ無効化: ソースデータが変更されたら、キャッシュを即座に無効化または更新します。
- 検索インデックス: プライマリデータベースと検索インデックス(例:Elasticsearch)を同期させ続けます。
- マイクロサービス統合: サービス間で直接的なデータベースの結合なしに、他のサービスからのデータ変更に反応できるようにします。たとえば、「注文フルフィルメント」サービスは、「注文管理」サービスからの
OrderPlaced
イベント(orders
テーブルの挿入から派生)に反応できます。 - 監査とコンプライアンス: すべてのデータベース変更の不変の監査証跡を構築します。
- データ同期: 異なるデータベースタイプまたはクラウドプロバイダー間でデータを同期します。
結論
データベースの変更を効果的にキャプチャすることは、最新の、リアクティブでスケーラブルなイベント駆動型アーキテクチャを構築するための基盤です。Debeziumのような堅牢なツールとPostgreSQLの論理デコーディングのようなデータベースの組み込み機能を利用することで、組織はデータパイプラインをバッチ指向からリアルタイムストリームに変換できます。このアプローチは、レイテンシを最小限に抑え、データベースの負荷を軽減し、サービスを疎結合化し、アプリケーションがデータ変更に即座に対応できるようにすることで、応答性とデータの一貫性の新たなレベルを解き放ちます。最終的に、Debeziumと論理デコーディングは、受動的なデータベース変更を能動的で実行可能なビジネスイベントに変換し、真のイベント駆動型システムを推進するための重要な橋渡しを提供します。