CDC(Change Data Capture)는 데이터베이스에서 발생하는 삽입, 수정, 삭제 변경 사항을 실시간으로 캡처하여 다른 시스템에 전파하는 기술이다.
CDC 방식 비교
| 방식 | 설명 | 장점 | 단점 |
|---|
| 타임스탬프 기반 | updated_at 컬럼 폴링 | 단순 | 삭제 감지 불가, 지연 |
| 트리거 기반 | DB 트리거로 변경 기록 | 정확 | DB 부하 |
| 로그 기반 | WAL/binlog 읽기 | 부하 없음, 완전한 변경 감지 | 구현 복잡 |
Debezium: 로그 기반 CDC
yaml
# Debezium PostgreSQL Connector
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "replicator",
"database.password": "password",
"database.dbname": "analytics",
"database.server.name": "myserver",
"table.include.list": "public.orders,public.customers",
"plugin.name": "pgoutput",
"publication.autocreate.mode": "filtered",
"snapshot.mode": "initial"
}
}
sql
-- postgresql.conf
-- wal_level = logical
-- max_replication_slots = 4
-- 복제 사용자 생성
CREATE USER replicator REPLICATION LOGIN PASSWORD 'password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;
-- 복제 슬롯 생성
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
Debezium 변경 이벤트
json
{
"before": null,
"after": {
"order_id": "ORD-001",
"customer_id": 12345,
"amount": 99.99,
"status": "placed"
},
"op": "c",
"ts_ms": 1704067200000,
"source": {
"db": "analytics",
"table": "orders",
"lsn": 12345678
}
}
CDC → Kafka → Sink 파이프라인
PostgreSQL WAL
↓
Debezium Connector
↓
Kafka Topic (myserver.public.orders)
↓
Kafka Connect Sink
↓
Elasticsearch / Snowflake / S3
Flink CDC
sql
-- Flink SQL로 CDC 직접 읽기
CREATE TABLE orders_cdc (
order_id STRING,
amount DECIMAL(12,2),
status STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgres',
'database-name' = 'analytics',
'table-name' = 'orders'
);