본문으로 바로가기

CDC (Change Data Capture) 구현해보기

category MQ 2024. 12. 20. 07:38
반응형

https://rockset.com/blog/change-data-capture-what-it-is-and-how-to-use-it


Kafka를 활용한 Pub 방식을 찾아보던 중, 많은 테크 기업들이 CDC를 통해 이벤트를 Pub 하는 방식을 사용한다는 것을 알게 되었고, CDC란 무엇인지와 현재는 비즈니스에서 이벤트를 직접 Pub 하는 방식으로 구현되어 있는데 CDC를 통한 Pub 방식의 차이점은 무엇인지 간단하게 로컬에서 구현해보려 합니다.

 

사례

https://tech.kakaopay.com/post/kakaopaysec-mongodb-cdc/

 

Oracle에서 MongoDB로의 CDC Pipeline 구축 | 카카오페이 기술 블로그

Oracle에서 MongoDB로의 초기 데이터 이관 및 CDC Pipeline 구축 경험을 공유합니다.

tech.kakaopay.com

https://techblog.woowahan.com/10000/

 

CDC 너두 할 수 있어(feat. B2B 알림 서비스에 Kafka CDC 적용하기) | 우아한형제들 기술블로그

"어 이거 CDC 적용하면 딱이겠는데요? 한번 CDC로 해보면 어때요?" B2B 알림서비스 기획 리뷰 도중 제안받은 의견입니다. 저는 이때까지만 해도 CDC가 무엇인지 잘 모르는 상태였지만, 저 의견 덕분

techblog.woowahan.com

https://toss.tech/article/cdc_pipeline

 

대규모 CDC Pipeline 운영을 위한 Debezium 개선 여정

문제 없이 여러 데이터들을 CDC를 통해 제공하던 어느 날, 근본적인 질문이 떠오릅니다. 우리의 CDC는 얼마나 잘 운영되고 있는가? CDC가 잘 운영되고 있다는걸 우리는 어떻게 믿을 수 있을까?

toss.tech

https://deview.kr/2021/sessions/495

 

DEVIEW 2023

DEVIEW는 국내외 개발자들이 서로의 지식을 나누고, 탁월함을 추구하며, 함께 성장하는 컨퍼런스 입니다.

deview.kr

 

CDC(Change Data Capture)란

데이터베이스에서 발생하는 데이터 변경 사항을 캡처하여 실시간으로 외부 시스템에 전달하는 기술입니다.

활용 방안.

  • 데이터 동기화, 마이그레이션, 데이터 파이프라인, 이벤트 소싱, 캐시 갱신 등

 

CDC를 구현하는 방법 3가지

  1. Pull 기반
    • 데이터베이스를 주기적으로 조회하여 변경 사항을 확인하는 방법.
    • 실시간 CDC가 아님.
    • 모든 테이블에 변경 표시를 추가해야 함.
    • DB에 많은 쿼리를 실행해야 하므로 성능에 영향을 줄 수 있음.
    • 삭제 이벤트의 경우 발생이 어려움.
  2. Push 기반
    • DB에서 트리거를 사용하여 변경 사항이 발생할 때 외부 시스템으로 직접 푸시하는 방법.
    • 실시간 데이터 동기화가 가능하며, 변경 사항을 즉각적으로 처리할 수 있음.
    • DB에 추가적인 로직이 필요하며, 데이터베이스 높은 부하가 발생할 수 있음.
  3. Log 기반
    • 데이터베이스의 트랜잭션 로그를 읽어 변경 사항을 확인하는 방법.
    • 트랜잭션 로그를 실시간으로 모니터링하고, 변경 사항을 즉시 반영해 실시간 데이터 동기화 가능.
    • DB에 최소한의 영향을 줌.
    • CDC 설정이 복잡함. (Debezium과 같은 CDC 도구가 이 방식을 사용함.)

 

직접 Pub 방식(비즈니스 로직에서 Kafka에 Pub 하는 방식)

장점

  • 이벤트 생성과 Pub을 애플리케이션에서 직접 제어할 수 있어, Pub 시점과 핸들링이 쉬움.
  • 비즈니스 요구사항에 따라 이벤트 데이터의 구조를 동적으로 생성하거나, 이벤트 발생 조건을 세부적으로 정의할 수 있음.

단점

  • Pub 애플리케이션 장애나 문제 발생 시 이벤트 중복 발생 가능성
  • 트랜잭션 일관성 문제.
  • 비즈니스 로직이 Pub 호출 로직과 결합되어 있어 시스템 간 결합도가 증가하고 코드에 유지보수도 어려움.

CDC 사용 방식.

장점

  • 데이터베이스 트랜잭션 로그를 기반으로 이벤트를 생성하여 트랜잭션 일관성이 보장됨.
  • 비즈니스 로직과 분리되어 있어 시스템 간 결합도가 감소.
  • 코드 복잡성과 유지보수가 쉬움.

단점

  • CDC 시스템을 설정하고 관리하는데 추가적인 리소스와 시간이 필요함. (Debezium)
  • 로그를 읽는 방식은 실시간 처리가 필요한 부분에서 약간의 지연이 발생할 수 있음.
  • 변경 감지가 필요 없는 불필요한 이벤트도 생성됨.
  • 추가 인프라 비용 발생.

직접 Pub 방식과 CDC 방식의 비교

직관성 이벤트 발생 시점을 명확히 제어 가능 데이터베이스 변경을 기반으로 자동 이벤트 생성
트랜잭션 일관성 트랜잭션과 이벤트 간 불일치 가능성 존재 데이터베이스 트랜잭션과 일치
유지보수 비즈니스 로직 변경 시 Pub 로직도 수정 필요 Pub 로직이 분리되어 독립적 관리 가능
복잡도 비교적 단순하지만 코드 중복 가능성 있음 설정 및 관리 복잡하지만 구조적으로 깔끔
성능 실시간 이벤트 Pub 가능 로그 기반으로 약간의 지연 발생 가능
적용 범위 비즈니스 로직에서 정의된 이벤트만 Pub 모든 데이터 변경 사항을 Pub

CDC 구현해 보기

https://debezium.io/documentation/reference/stable/operations/debezium-ui.html

빨간색 부분만 구현

설치

 

PostgreSQL database

version: '3.8'
services:
  postgres:
    image: postgres:latest
    container_name: postgres
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: admin
      POSTGRES_DB: cdc_db
    ports:
      - "5432:5432"
    volumes:
      - /home/lim_gwangmin/postgresql/data:/var/lib/postgresql/data
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_replication_slots=1"
      - "-c"
      - "max_wal_senders=1"

volumes:
  postgres_data:

 

Kafka Cluster

version: '3'
services:
  zookeeper:
    image: zookeeper:3.7
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
  kafka1:
    image: confluentinc/cp-kafka:7.0.0
    container_name: kafka1
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://127.0.0.1:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka2:
    image: confluentinc/cp-kafka:7.0.0
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://127.0.0.1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka3:
    image: confluentinc/cp-kafka:7.0.0
    container_name: kafka3
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://127.0.0.1:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    restart: always
    ports:
      - "8089:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:19091,kafka2:19092,kafka3:19093
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-1:22181
    depends_on:
      - kafka1
      - kafka2
      - kafka3

 

Debezium(Kafka Connector)  + Debezium UI

version: '3'
networks:
  kafka_default:
    external: true

services:
  debezium:
    image: debezium/connect:2.0
    container_name: debezium
    environment:
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=debezium_config
      - OFFSET_STORAGE_TOPIC=debezium_offset
      - BOOTSTRAP_SERVERS=kafka1:19091,kafka2:19092,kafka3:19093
      - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
      - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
      - CONNECT_REST_PORT=8083
    ports:
      - "8083:8083"
    networks:
      - kafka_default

  debezium-ui:
    image: debezium/debezium-ui:latest
    container_name: debezium-ui
    environment:
      - KAFKA_CONNECT_URIS=http://debezium:8083
    ports:
      - "8080:8080"
    depends_on:
      - debezium
    networks:
      - kafka_default

 

Debezium (Kafka Connector) 연결

옵션

  • Filter definition: 정규식을 사용해 CDC 대상 항목을 포함/제외하는 필터를 설정. 필터를 입력하면 포함된 항목이 표시됨.
  • Transformations : 메시지를 하나씩 수정하는 기능(SMT). Kafka로 전송되기 전에 레코드를 변환.
  • Topic creation: 자동 토픽 생성을 위한 기본 속성과 그룹 설정.
  • Data options: 스냅샷 및 매핑 설정(선택 사항). 기본값을 확인하고 변경 가능.
  • Runtime options: 엔진 및 하트비트 설정(선택 사항). 기본값을 확인하고 변경 가능.
  • Custom properties: database. 또는 database.history.와 같은 추가 속성 직접 입력 가능.

 

CDC - DB 연결

 

 

 

 

 


DB 쿼리 실행

INSERT INTO cdc_test.test_table (name, description) VALUES
('Item 1', 'Description for item 1'),
('Item 2', 'Description for item 2');

 

Kafka 메시지 Pub 확인

 

UPDATE cdc_test.test_table SET 
	description = 'Updated description for item 1' 
WHERE id = 1;

 

 


 

위와 같이 CDC로 구축 시에 단일 테이블 데이터뿐만 아니라 여러 테이블의 데이터를 조합해서 이벤트 처리를 해야 하는 경우에는 어떻게 적용할 수 있을까 찾아보았는데 아래와 같은 방법으로 처리할 수 있습니다.

A 테이블 → topic-a

B 테이블 → topic-b

해당 이벤트를 buffer에 PK, FK로 묶어서 buffer에 담아 topic-c로 Pub 하는 방식으로 사용 가능.

 

@Service
public class CDCEventProcessor {

    private final Map<String, String> buffer = new ConcurrentHashMap<>();

    @KafkaListener(topics = "topic-a", groupId = "cdc-group")
    public void handleATableEvent(String message) {
        // A 테이블 이벤트 수신
        Event eventA = parseMessage(message);
        String bValue = buffer.get(eventA.getKey()); // B 상태 조회
        if (bValue != null) {
            publishToCTable(eventA.getValue(), bValue); // C 이벤트 생성 및 퍼블리시
        } else {
            buffer.put(eventA.getKey(), eventA.getValue());
        }
    }

    @KafkaListener(topics = "topic-b", groupId = "cdc-group")
    public void handleBTableEvent(String message) {
        // B 테이블 이벤트 수신
        Event eventB = parseMessage(message);
        String aValue = buffer.get(eventB.getKey()); // A 상태 조회
        if (aValue != null) {
            publishToCTable(aValue, eventB.getValue()); // C 이벤트 생성 및 퍼블리시
        } else {
            buffer.put(eventB.getKey(), eventB.getValue());
        }
    }

    private void publishToCTable(String aValue, String bValue) {
        // C 테이블 이벤트 생성 및 Kafka 퍼블리싱
        CEvent cEvent = new CEvent(aValue, bValue);
        kafkaTemplate.send("topic-c", cEvent.toJson());
    }
}

 


이번에 자사에서 Kafka를 적용하는 프로젝트를 진행했었습니다. 저는 직접 비즈니스 로직에서 이벤트를 발행하는 방식으로 구현했는데 이 방식은 애플리케이션 레벨에서 이벤트의 흐름을 명확히 제어할 수 있다는 장점과 비즈니스 요구사항에 따라 세부적인 조정이 가능하고, 이벤트 발행의 타이밍과 내용을 원하는 대로 설계할 수 있었습니다.

그러나 구현 과정에서 이벤트 발행과 데이터 저장 간의 트랜잭션 일관성을 보장하기 어려웠고, 일관성을 맞춰주기 위해 코드 복잡성이 증가하는 단점이 있었습니다.

 

반면 CDC 방식은 데이터베이스 변경 사항을 직접 Pub 해주기 때문에 비즈니스 로직과는 분리된 방식으로 동작합니다.

이를 통해 애플리케이션 코드의 변경 없이도 데이터 변경 이벤트를 자동으로 처리할 수 있고, 일관성과 실시간 처리를 더 쉽게 구현할 수 있다는 걸 알게 되었습니다. 

 

만약 제가 이번 프로젝트에서 CDC 방식을 알았다면 더 적은 노력으로 구현을 완료할 수 있었을 것 같습니다.

하지만 CDC는 설정과 운영에서 추가적인 관리가 필요하기 때문에, 사용 전 학습과 테스트가 많이 필요할 것 같습니다.

 

이번 프로젝트를 진행하면서 사전 조사를 통해 다양한 접근 방식을 학습하고, 프로젝트 요구사항과 사용 가능한 리소스를 기반으로 적절한 방식을 선택하는 것이 중요하다는 것을 깨달았습니다.

 

반응형

'MQ' 카테고리의 다른 글

Consumer Lag 이란  (1) 2024.12.12
MSA 환경 트랜잭션  (1) 2024.11.28
Kafka 공통 메시지 포맷을 위한 제네릭 적용하기  (0) 2024.09.12
Kafka Pub CircuitBreaker 적용하기  (0) 2024.09.07
Kafka Pub 동적 Batch 관리  (0) 2024.08.18