Kafka를 활용한 Pub 방식을 찾아보던 중, 많은 테크 기업들이 CDC를 통해 이벤트를 Pub 하는 방식을 사용한다는 것을 알게 되었고, CDC란 무엇인지와 현재는 비즈니스에서 이벤트를 직접 Pub 하는 방식으로 구현되어 있는데 CDC를 통한 Pub 방식의 차이점은 무엇인지 간단하게 로컬에서 구현해보려 합니다.
사례
https://tech.kakaopay.com/post/kakaopaysec-mongodb-cdc/
https://techblog.woowahan.com/10000/
https://toss.tech/article/cdc_pipeline
https://deview.kr/2021/sessions/495
CDC(Change Data Capture)란
데이터베이스에서 발생하는 데이터 변경 사항을 캡처하여 실시간으로 외부 시스템에 전달하는 기술입니다.
활용 방안.
- 데이터 동기화, 마이그레이션, 데이터 파이프라인, 이벤트 소싱, 캐시 갱신 등
CDC를 구현하는 방법 3가지
- Pull 기반
- 데이터베이스를 주기적으로 조회하여 변경 사항을 확인하는 방법.
- 실시간 CDC가 아님.
- 모든 테이블에 변경 표시를 추가해야 함.
- DB에 많은 쿼리를 실행해야 하므로 성능에 영향을 줄 수 있음.
- 삭제 이벤트의 경우 발생이 어려움.
- Push 기반
- DB에서 트리거를 사용하여 변경 사항이 발생할 때 외부 시스템으로 직접 푸시하는 방법.
- 실시간 데이터 동기화가 가능하며, 변경 사항을 즉각적으로 처리할 수 있음.
- DB에 추가적인 로직이 필요하며, 데이터베이스 높은 부하가 발생할 수 있음.
- Log 기반
- 데이터베이스의 트랜잭션 로그를 읽어 변경 사항을 확인하는 방법.
- 트랜잭션 로그를 실시간으로 모니터링하고, 변경 사항을 즉시 반영해 실시간 데이터 동기화 가능.
- DB에 최소한의 영향을 줌.
- CDC 설정이 복잡함. (Debezium과 같은 CDC 도구가 이 방식을 사용함.)
직접 Pub 방식(비즈니스 로직에서 Kafka에 Pub 하는 방식)
장점
- 이벤트 생성과 Pub을 애플리케이션에서 직접 제어할 수 있어, Pub 시점과 핸들링이 쉬움.
- 비즈니스 요구사항에 따라 이벤트 데이터의 구조를 동적으로 생성하거나, 이벤트 발생 조건을 세부적으로 정의할 수 있음.
단점
- Pub 애플리케이션 장애나 문제 발생 시 이벤트 중복 발생 가능성
- 트랜잭션 일관성 문제.
- 비즈니스 로직이 Pub 호출 로직과 결합되어 있어 시스템 간 결합도가 증가하고 코드에 유지보수도 어려움.
CDC 사용 방식.
장점
- 데이터베이스 트랜잭션 로그를 기반으로 이벤트를 생성하여 트랜잭션 일관성이 보장됨.
- 비즈니스 로직과 분리되어 있어 시스템 간 결합도가 감소.
- 코드 복잡성과 유지보수가 쉬움.
단점
- CDC 시스템을 설정하고 관리하는데 추가적인 리소스와 시간이 필요함. (Debezium)
- 로그를 읽는 방식은 실시간 처리가 필요한 부분에서 약간의 지연이 발생할 수 있음.
- 변경 감지가 필요 없는 불필요한 이벤트도 생성됨.
- 추가 인프라 비용 발생.
직접 Pub 방식과 CDC 방식의 비교
직관성 | 이벤트 발생 시점을 명확히 제어 가능 | 데이터베이스 변경을 기반으로 자동 이벤트 생성 |
트랜잭션 일관성 | 트랜잭션과 이벤트 간 불일치 가능성 존재 | 데이터베이스 트랜잭션과 일치 |
유지보수 | 비즈니스 로직 변경 시 Pub 로직도 수정 필요 | Pub 로직이 분리되어 독립적 관리 가능 |
복잡도 | 비교적 단순하지만 코드 중복 가능성 있음 | 설정 및 관리 복잡하지만 구조적으로 깔끔 |
성능 | 실시간 이벤트 Pub 가능 | 로그 기반으로 약간의 지연 발생 가능 |
적용 범위 | 비즈니스 로직에서 정의된 이벤트만 Pub | 모든 데이터 변경 사항을 Pub |
CDC 구현해 보기
https://debezium.io/documentation/reference/stable/operations/debezium-ui.html
설치
PostgreSQL database
version: '3.8'
services:
postgres:
image: postgres:latest
container_name: postgres
environment:
POSTGRES_USER: admin
POSTGRES_PASSWORD: admin
POSTGRES_DB: cdc_db
ports:
- "5432:5432"
volumes:
- /home/lim_gwangmin/postgresql/data:/var/lib/postgresql/data
command:
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=1"
- "-c"
- "max_wal_senders=1"
volumes:
postgres_data:
Kafka Cluster
version: '3'
services:
zookeeper:
image: zookeeper:3.7
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
volumes:
- ./data/zookeeper/data:/data
- ./data/zookeeper/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:7.0.0
container_name: kafka1
hostname: kafka1
ports:
- "9091:9091"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://127.0.0.1:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./data/kafka1/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafka2:
image: confluentinc/cp-kafka:7.0.0
container_name: kafka2
hostname: kafka2
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://127.0.0.1:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./data/kafka2/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafka3:
image: confluentinc/cp-kafka:7.0.0
container_name: kafka3
hostname: kafka3
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://127.0.0.1:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 3
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./data/kafka3/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
restart: always
ports:
- "8089:8080"
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:19091,kafka2:19092,kafka3:19093
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-1:22181
depends_on:
- kafka1
- kafka2
- kafka3
Debezium(Kafka Connector) + Debezium UI
version: '3'
networks:
kafka_default:
external: true
services:
debezium:
image: debezium/connect:2.0
container_name: debezium
environment:
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=debezium_config
- OFFSET_STORAGE_TOPIC=debezium_offset
- BOOTSTRAP_SERVERS=kafka1:19091,kafka2:19092,kafka3:19093
- CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_REST_PORT=8083
ports:
- "8083:8083"
networks:
- kafka_default
debezium-ui:
image: debezium/debezium-ui:latest
container_name: debezium-ui
environment:
- KAFKA_CONNECT_URIS=http://debezium:8083
ports:
- "8080:8080"
depends_on:
- debezium
networks:
- kafka_default
Debezium (Kafka Connector) 연결
옵션
- Filter definition: 정규식을 사용해 CDC 대상 항목을 포함/제외하는 필터를 설정. 필터를 입력하면 포함된 항목이 표시됨.
- Transformations : 메시지를 하나씩 수정하는 기능(SMT). Kafka로 전송되기 전에 레코드를 변환.
- Topic creation: 자동 토픽 생성을 위한 기본 속성과 그룹 설정.
- Data options: 스냅샷 및 매핑 설정(선택 사항). 기본값을 확인하고 변경 가능.
- Runtime options: 엔진 및 하트비트 설정(선택 사항). 기본값을 확인하고 변경 가능.
- Custom properties: database. 또는 database.history.와 같은 추가 속성 직접 입력 가능.
CDC - DB 연결
DB 쿼리 실행
INSERT INTO cdc_test.test_table (name, description) VALUES
('Item 1', 'Description for item 1'),
('Item 2', 'Description for item 2');
Kafka 메시지 Pub 확인
UPDATE cdc_test.test_table SET
description = 'Updated description for item 1'
WHERE id = 1;
위와 같이 CDC로 구축 시에 단일 테이블 데이터뿐만 아니라 여러 테이블의 데이터를 조합해서 이벤트 처리를 해야 하는 경우에는 어떻게 적용할 수 있을까 찾아보았는데 아래와 같은 방법으로 처리할 수 있습니다.
A 테이블 → topic-a
B 테이블 → topic-b
해당 이벤트를 buffer에 PK, FK로 묶어서 buffer에 담아 topic-c로 Pub 하는 방식으로 사용 가능.
@Service
public class CDCEventProcessor {
private final Map<String, String> buffer = new ConcurrentHashMap<>();
@KafkaListener(topics = "topic-a", groupId = "cdc-group")
public void handleATableEvent(String message) {
// A 테이블 이벤트 수신
Event eventA = parseMessage(message);
String bValue = buffer.get(eventA.getKey()); // B 상태 조회
if (bValue != null) {
publishToCTable(eventA.getValue(), bValue); // C 이벤트 생성 및 퍼블리시
} else {
buffer.put(eventA.getKey(), eventA.getValue());
}
}
@KafkaListener(topics = "topic-b", groupId = "cdc-group")
public void handleBTableEvent(String message) {
// B 테이블 이벤트 수신
Event eventB = parseMessage(message);
String aValue = buffer.get(eventB.getKey()); // A 상태 조회
if (aValue != null) {
publishToCTable(aValue, eventB.getValue()); // C 이벤트 생성 및 퍼블리시
} else {
buffer.put(eventB.getKey(), eventB.getValue());
}
}
private void publishToCTable(String aValue, String bValue) {
// C 테이블 이벤트 생성 및 Kafka 퍼블리싱
CEvent cEvent = new CEvent(aValue, bValue);
kafkaTemplate.send("topic-c", cEvent.toJson());
}
}
이번에 자사에서 Kafka를 적용하는 프로젝트를 진행했었습니다. 저는 직접 비즈니스 로직에서 이벤트를 발행하는 방식으로 구현했는데 이 방식은 애플리케이션 레벨에서 이벤트의 흐름을 명확히 제어할 수 있다는 장점과 비즈니스 요구사항에 따라 세부적인 조정이 가능하고, 이벤트 발행의 타이밍과 내용을 원하는 대로 설계할 수 있었습니다.
그러나 구현 과정에서 이벤트 발행과 데이터 저장 간의 트랜잭션 일관성을 보장하기 어려웠고, 일관성을 맞춰주기 위해 코드 복잡성이 증가하는 단점이 있었습니다.
반면 CDC 방식은 데이터베이스 변경 사항을 직접 Pub 해주기 때문에 비즈니스 로직과는 분리된 방식으로 동작합니다.
이를 통해 애플리케이션 코드의 변경 없이도 데이터 변경 이벤트를 자동으로 처리할 수 있고, 일관성과 실시간 처리를 더 쉽게 구현할 수 있다는 걸 알게 되었습니다.
만약 제가 이번 프로젝트에서 CDC 방식을 알았다면 더 적은 노력으로 구현을 완료할 수 있었을 것 같습니다.
하지만 CDC는 설정과 운영에서 추가적인 관리가 필요하기 때문에, 사용 전 학습과 테스트가 많이 필요할 것 같습니다.
이번 프로젝트를 진행하면서 사전 조사를 통해 다양한 접근 방식을 학습하고, 프로젝트 요구사항과 사용 가능한 리소스를 기반으로 적절한 방식을 선택하는 것이 중요하다는 것을 깨달았습니다.
'MQ' 카테고리의 다른 글
Consumer Lag 이란 (1) | 2024.12.12 |
---|---|
MSA 환경 트랜잭션 (1) | 2024.11.28 |
Kafka 공통 메시지 포맷을 위한 제네릭 적용하기 (0) | 2024.09.12 |
Kafka Pub CircuitBreaker 적용하기 (0) | 2024.09.07 |
Kafka Pub 동적 Batch 관리 (0) | 2024.08.18 |