본문으로 바로가기

Kafka Pub CircuitBreaker 적용하기

category MQ 2024. 9. 7. 14:31
반응형


Pub, Sub, Stream, Kafka와 같이 분산 시스템을 구축하면서 각 서비스에서 다른 서비스를 호출할 때 실패하거나 지연이 되어 해당 영향도가 다른 서비스에 확산되지 않도록 하기 위해 서킷 브레이커를 학습하고 적용하였습니다.

 

이벤트 요청이 누락되면 안되기 때문에 Kafka Pub 실패 시 아래 AS-IS 플로우처럼 Pub 애플리케이션에서 지속적으로 재시도 호출을 하고 있습니다. Kafka 장애가 복구되기 전까지 같은 오류가 지속적으로 발생하여 Pub 애플리케이션에도 문제가 생길 수 있고 각 외부 서비스에 장애가 전파되기 때문에 TO-BE와 같이 서킷 브레이커를 적용해 봤습니다.

 

AS-IS

사용자 → Pub 애플리케이션 → Kafka 장애 → 호출 실패 → 오류 → retry …. → Kafka 복구 → 정상

 

TO-BE

사용자 → Pub 애플리케이션 → Kafka 장애 → 호출 실패 → 서킷 (Open, Half Open, Close) → Kafka 복구 → 정상

 

 

Circuit Break 서킷 브레이커

개념

  • 서킷브레이커는 전기 회로에서 과전류가 흐를 때 회로를 차단하여 더 큰 문제를 방지하는 장치.
  • 외부 시스템 호출 시 발생할 수 있는 장애를 탐지하고 자동으로 차단하여 시스템 전체에 미치는 영향을 최소화하는 패턴.

장점

  • 장애 전파 방지
    • 특정 서비스에 장애가 발생했을 때, 이 장애가 다른 서비스로 전파되는 것을 방지.
  • 시스템 안정성 향상
    • 시스템 전체의 안정성을 확보하고, 서비스 간 의존성을 줄여줌.
  • 빠른 복구
    • 장애가 해결된 후에는 자동으로 서비스를 복구하여 시스템의 가용성을 높임.

서킷 브레이커 상태

  • CLOSED
    • 닫힌 상태 (정상)
    • 모든 요청이 정상적으로 외부 서비스로 전달함.
  • OPEN
    • 오류 발생 횟수, 지연 등이 일정 임계치에 도달하면 OPEN 상태가 되며, 모든 요청을 실패로 처리함.
    • 외부 서비스로의 호출도 중단함.
  • HALF_OPEN
    • OPEN에서 일정 시간이 경과하면 HALF_OPEN 상태가 됨.
    • 제한된 횟수의 요청을 보내서 외부 서비스의 상태를 확인하여 성공하면 CLOSED 상태로 돌아가고, 실패하면 다시 OPEN 상태로 돌아감.

 

Resilience4j 라이브러리 서킷 브레이커 사용

 

- 라이브러리 의존성 추가 후

- 적용하고자 하는 메서드에 @CircuitBreaker 애노테이션을 추가.

import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;

@Service
public class Test {
    
    @CircuitBreaker(name = "sendMessage", fallbackMethod = "fallback")
    public String sendMessage() {
        // 외부 서비스 호출 로직
    }

    public String fallback(Throwable t) {
        // 실패 처리
    }
}

 

application.yml에 서킷 브레이커 설정을 추가

  1. sliding-window-type
    • 설명: 서킷 브레이커가 호출을 계산하는 방식
    • 옵션 값:
      • count_based: 호출의 개수를 기준으로 슬라이딩 윈도를 설정
      • time_based: 호출이 발생한 시간을 기준으로 슬라이딩 윈도우를 설정
  2. failure-rate-threshold
    • 설명: 서킷 브레이커가 열리기 위한 실패 비율의 임계값(%).
  3. minimum-number-of-calls
    • 설명: 서킷 브레이커가 열리기 전에 수집해야 하는 최소 호출 수.
  4. slow-call-rate-threshold
    • 설명: 느린 호출의 비율이 이 임계값(%)을 초과하면 서킷 브레이커가 Open.
  5. slow-call-duration-threshold
    • 설명: 호출이 느리다고 간주되는 임계 시간(밀리초).
  6. permitted-number-of-calls-in-half-open-state
    • 설명: 서킷 브레이커가 Half-Open 상태에서 허용할 최대 호출 수.
  7. max-wait-duration-in-half-open-state
    • 설명: 서킷 브레이커가 Half-Open 상태로 유지될 최대 시간.
  8. wait-duration-in-open-state
    • 설명: 서킷 브레이커가 Open 상태에서 Half-Open 상태로 전환될 때까지 기다리는 시간.
  9. automatic-transition-from-open-to-half-open-enabled
    • 설명: 서킷 브레이커가 열림 상태에서 자동으로 Half-Open 상태로 전환될지 여부를 설정.
  10. record-exceptions
    • 설명: 서킷 브레이커가 실패로 기록할 예외 유형을 정의.
resilience4j.circuitbreaker:
  instances:
    sendMessage: //서킷 브레이커 인스턴스 이름.
      baseConfig: default //default 기본 구성

  configs:
    default: //default 구성
      #슬라이딩 윈도우 방식 집계
      #개수로 체크 count_base, 시간으로 체크 time_base
      sliding-window-type: count_based
      sliding-window-size: 10

      #fail
      failure-rate-threshold: 10
      minimum-number-of-calls: 5

      #delay
      slow-call-rate-threshold: 10
      slow-call-duration-threshold: 3000ms

      #half open 상태에서 다른 상태로 전환하기 위한 판단 수
      permitted-number-of-calls-in-half-open-state: 10
      #half open 상태 유지 시간
      max-wait-duration-in-half-open-state: 5000ms

      #open 상태에서 half open으로 전환까지 기다리는 시간
      wait-duration-in-open-state: 6000ms
      #open 상태에서 half open 으로 자동 전환 (true시 일정 시간이 지난 후 자동 전환)
      automatic-transition-from-open-to-half-open-enabled: true

      #실패 예외
      record-exceptions:
        - java.io.IOException

 

적용 방안

위와 같이 Resilience4j 라이브러리를 사용하여 적용하려 했으나, 현재 구조에서 사용하기에는 복잡한 구조가 있어 서킷 브레이커 개념을 가진 클래스를 구현하여 적용했습니다.

  • CircuitBreakerCheck
@Component
public class CircuitBreakerCheck {
    private static final int FAIL_ALLOW_COUNT = 5; // 실패 허용
    private static final long RESET_OPEN_MS = 30000; // 서킷 오픈 후, 닫하기 전 대기 시간
    private final AtomicInteger failCount = new AtomicInteger(0); // 실패 횟수
    private final AtomicBoolean isOpen = new AtomicBoolean(false); // 서킷 오픈 여부
    private long lastFailTime;

    public boolean allowRequest() {
        if (isOpen.get()) {
            if (System.currentTimeMillis() - lastFailTime > RESET_OPEN_MS) {
                isOpen.set(false);
                failCount.set(0);
                return true;
            }
            return false;
        }
        return true;
    }

    public void recordSuccess() {
        failCount.set(0);
        isOpen.set(false);
    }

    public void recordFailure() {
        lastFailTime = System.currentTimeMillis();
        if (failCount.incrementAndGet() >= FAIL_ALLOW_COUNT) {
            isOpen.set(true);
        }
    }
}

 

Kafka Pub 로직

 

private void processQueue() {
	log.info("{}", Thread.currentThread().getName());

	while (!Thread.currentThread().isInterrupted()) {
		try {
			// 서킷 오픈 체크
			if (!circuitBreakerCheck.allowRequest()) {
				log.warn("Circuit Breaker Open");
				Thread.sleep(1000);
				continue;
			}

			// messageQueue 비어있으면 스레드 대기
			KafkaMessageDto kafkaMessageDto = messageQueue.take();

			log.info("{}, {}, {}", Thread.currentThread().getName(), kafkaMessageDto.toString(), messageQueue.size());

			if (kafkaMessageDto.getTopic() != null && kafkaMessageDto.getMessage() != null) {
				kafkaTemplate.send(kafkaMessageDto.getTopic(), kafkaMessageDto.getMessage())
					.addCallback(
						result -> {
							circuitBreakerCheck.recordSuccess();
						},
						ex -> {
							log.error("Fail Message {}, {}: {}", kafkaMessageDto.getTopic(), kafkaMessageDto.getMessage(), ex.getMessage());
							circuitBreakerCheck.recordFailure();
							sendMessage(kafkaMessageDto.getTopic(), kafkaMessageDto.getMessage());
						}
					);
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} catch (Exception e) {
			log.error(e.getMessage());
			circuitBreakerCheck.recordFailure();
		}
	}
}
반응형

'MQ' 카테고리의 다른 글

Kafka 공통 메시지 포맷을 위한 제네릭 적용하기  (0) 2024.09.12
Kafka Pub 동적 Batch 관리  (0) 2024.08.18
Kafka Pub 호출 방안  (0) 2024.08.18
Kafka 학습하기_컨슈머  (0) 2024.06.05
Kafka 학습하기_프로듀서  (0) 2024.06.04