본문으로 바로가기

Kafka Pub 호출 방안

category MQ 2024. 8. 18. 13:31
반응형


현재 사내에서 상품 정보를 변경 시에 처리하는 로직은 세 가지 방식이 있습니다.
1. 변경이 일어나 상품정보에 수정이 필요한 데이터들을 수정 처리.
2. 스케줄러를 통해 변경된 데이터를 주기적으로 감지 후 처리.

3. 전체 색인, 데이터 전체를 동기화 시켜줌.

 

위 처리방식의 문제점을 해결하기 위해 카프카를 도입하고 아래와 같이 Pub, Sub, Stream 3개의 프로젝트를 구성하려 합니다.

 

현재 사내에서는 이벤트가 발생하는곳은 업체에 제공해주는 내부/외부 관리자 페이지, 배치 프로세스에서 발생하고 있습니다.

 

위 2가지에서 발생하는 이벤트를 Kafka에 Pub 해주는 방법은 2가지가 있었습니다.

1. 관리자 페이지, 배치 프로세스에서 직접 Kafka에 Pub 하기

2, 별도 애플리케이션을 통한 Kafka에 Pub 하기

 

관리자 페이지에서 직접 Pub하는것 보다 아래 요건을 위해 Pub 애플리케이션을 만들기로 작업하였습니다.

- Kafka에서 발생하는 지연이나 장애, 에러를 사용자가 인지하지 않도록 처리하고 해당 문제를 Pub 애플리케이션 자체에서 처리.

- Pub 메세지 통합 관리

 

위 요건으로 개발하는 과정에서 아래와 같은 이슈가 발생했습니다.

 

처음 생각한 방식은 아래와 같았습니다.

관리자 페이지 → Pub 애플리케이션 → Kafka
Java Kafka API를 사용하여 비동기적으로 메시지를 전송하고, 콜백을 통해 실패 이벤트를 관리하려 했으나 아래와 같은 이슈가 있었습니다.

테스트 시나리오
1. Pub 애플리케이션 실행, Kafka 실행 → Kafka 종료
2. Pub 애플리케이션 실행, Kafka 종료

 

결과
- 1번 테스트 → 즉시 결과를 사용자에게 반환하고 비동기로 콜백을 호출함.

- 2번 테스트 → 결과 반환이 지연되고, Producer 설정 “MAX_BLOCK_MS_CONFIG”에 따라 Default 값 60초를 대기하고 결과 값 반환.

  • 원인(추측): Kafka 통신하기 전 Metadata를 받아오는데 해당 데이터는 동기 방식으로 호출하기 때문.

에러 내용

Exception thrown when sending a message with key='null' and payload='{
    "message": "1"
}' to topic exam-topc1:
org.apache.kafka.common.errors.TimeoutException: Topic exam-topc1 not present in metadata after 60000 ms.

 

해결 방법

1. “MAX_BLOCK_MS_CONFIG” 해당 값을 줄이면 대기 시간 단축 가능.

  • 문제는 Producer 애플리케이션에서 Kafka에 Pub이 되었음에도 실패로 판단할 수 있으므로 시간 단축은 검토가 필요함.

2. 메모리 큐 사용.   
    1. 사용자 (관리자, BATCH) → Pub 애플리케이션 → 메모리 큐
    2. 메모리 큐 → Kafka   

 

메모리 큐 사용시에 위와 같이 두 단계로 처리
    - 사용자 이벤트를 메모리 큐에 저장하고 즉시 정상 응답 반환
    - 메모리 큐를 Kafka에 동기적으로 호출해 예외에 대한 처리 진행

 

 

BlockingQueue 설명

특징
    - Java의 스레드 안전 큐 구조, 멀티스레드 환경에서 안전하게 사용 가능.
    - 큐가 가득 차거나 비어있을 경우 스레드를 블로킹.
    - null 값은 허용하지 않음.

구현체 특징
    - LinkedBlockingQueue: 링크드 리스트 기반, FIFO 방식, 크기 제한 가능.
    - ArrayBlockingQueue: 고정 크기 배열 기반, FIFO 방식.
    - PriorityBlockingQueue: 우선 순위에 따라 요소 처리.
    - SynchronousQueue: 내부 저장 공간 없음, 생산자와 소비자가 즉시 연결.

주요 메서드
    - 추가:
        - `add()`: 요소 추가, 공간 부족 시 예외 발생.
        - `put()`: 요소 추가, 큐가 가득 찬 경우 블로킹.
        - `offer()`: 요소 추가, 즉시 불가 시 false 반환.
    - 제거:
        - `take()`: 헤드에서 요소 제거 및 반환, 큐가 비어있으면 블로킹.

 

 

적용 방안

public class KafkaProducerService {
    //messageQueue에 메모리가 허용하는만큼 넣을 수 있지만 메모리 용량을 보고 thread 카운트를 수정필요.
    @Value("${spring.kafka.producer.thread-cnt}")
    private int threadCount;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final BlockingQueue<KafkaMessageDto> messageQueue = new LinkedBlockingQueue<>();
    private ExecutorService executorService;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostConstruct
    public void init() {
        this.executorService = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < threadCount; i++) {
            startProcessingThread();
        }
    }

    private void startProcessingThread() {
        executorService.submit(this::processQueue);
    }

    @PreDestroy
    public void shutdown() {
        try {
            // 정상적인 종료시점에서 메모리에 올려놓은거 완료 처리 기다려야함..
            while (!messageQueue.isEmpty()) {
                log.info("messageQueue {}", messageQueue.size());
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            log.error(e.getMessage());
        }

        executorService.shutdown();
    }

    private void processQueue() {
        log.info("{}", Thread.currentThread().getName());

        while (!Thread.currentThread().isInterrupted()) {
            try {
                // messageQueue 비어있으면 스레드 대기
                KafkaMessageDto kafkaMessageDto = messageQueue.take();

                log.info("{}, {}, {}", Thread.currentThread().getName(), kafkaMessageDto.toString(), messageQueue.size());

                if (kafkaMessageDto.getTopic() != null && kafkaMessageDto.getMessage() != null) {
                    //TODO 해당 부분에만 서킷 설정 예외가 발생하면 계속 넣어주지만 재시도 설정 필요.
                    kafkaTemplate.send(kafkaMessageDto.getTopic(), kafkaMessageDto.getMessage())
                        .addCallback(
                            result -> {},
                            ex -> {
                                log.error("Fail Message {}, {}: {}", kafkaMessageDto.getTopic(), kafkaMessageDto.getMessage(), ex.getMessage());
                                sendMessage(kafkaMessageDto.getTopic(), kafkaMessageDto.getMessage());
                            }
                        );
                }

                //messageQueue.put(new KafkaMessageDto("exam-topic1", "test"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                log.error(e.getMessage());
            }
        }
    }

    public void sendMessage(String topic, String message) {
        KafkaMessageDto kafkaMessageDto = new KafkaMessageDto();
        kafkaMessageDto.setTopic(topic);
        kafkaMessageDto.setMessage(message);
        // add 큐가 가득 찬 경우 IllegalStateException 예외 발생
        // offer 큐가 가득 찬 경우 false 반환
        // 이벤트 요청량에 따라 수정필요.
        messageQueue.add(kafkaMessageDto);
    }
}
반응형

'MQ' 카테고리의 다른 글

Kafka Pub CircuitBreaker 적용하기  (0) 2024.09.07
Kafka Pub 동적 Batch 관리  (0) 2024.08.18
Kafka 학습하기_컨슈머  (0) 2024.06.05
Kafka 학습하기_프로듀서  (0) 2024.06.04
Kafka 학습하기_토픽, 파티션, 레코드  (0) 2024.06.02