Pub, Sub, Stream, Kafka와 같이 분산 시스템을 구축하면서 각 서비스에서 다른 서비스를 호출할 때 실패하거나 지연이 되어 해당 영향도가 다른 서비스에 확산되지 않도록 하기 위해 서킷 브레이커를 학습하고 적용하였습니다.
이벤트 요청이 누락되면 안되기 때문에 Kafka Pub 실패 시 아래 AS-IS 플로우처럼 Pub 애플리케이션에서 지속적으로 재시도 호출을 하고 있습니다. Kafka 장애가 복구되기 전까지 같은 오류가 지속적으로 발생하여 Pub 애플리케이션에도 문제가 생길 수 있고 각 외부 서비스에 장애가 전파되기 때문에 TO-BE와 같이 서킷 브레이커를 적용해 봤습니다.
AS-IS
사용자 → Pub 애플리케이션 → Kafka 장애 → 호출 실패 → 오류 → retry …. → Kafka 복구 → 정상
TO-BE
사용자 → Pub 애플리케이션 → Kafka 장애 → 호출 실패 → 서킷 (Open, Half Open, Close) → Kafka 복구 → 정상
Circuit Break 서킷 브레이커
개념
- 서킷브레이커는 전기 회로에서 과전류가 흐를 때 회로를 차단하여 더 큰 문제를 방지하는 장치.
- 외부 시스템 호출 시 발생할 수 있는 장애를 탐지하고 자동으로 차단하여 시스템 전체에 미치는 영향을 최소화하는 패턴.
장점
- 장애 전파 방지
- 특정 서비스에 장애가 발생했을 때, 이 장애가 다른 서비스로 전파되는 것을 방지.
- 시스템 안정성 향상
- 시스템 전체의 안정성을 확보하고, 서비스 간 의존성을 줄여줌.
- 빠른 복구
- 장애가 해결된 후에는 자동으로 서비스를 복구하여 시스템의 가용성을 높임.
서킷 브레이커 상태
- CLOSED
- 닫힌 상태 (정상)
- 모든 요청이 정상적으로 외부 서비스로 전달함.
- OPEN
- 오류 발생 횟수, 지연 등이 일정 임계치에 도달하면 OPEN 상태가 되며, 모든 요청을 실패로 처리함.
- 외부 서비스로의 호출도 중단함.
- HALF_OPEN
- OPEN에서 일정 시간이 경과하면 HALF_OPEN 상태가 됨.
- 제한된 횟수의 요청을 보내서 외부 서비스의 상태를 확인하여 성공하면 CLOSED 상태로 돌아가고, 실패하면 다시 OPEN 상태로 돌아감.
Resilience4j 라이브러리 서킷 브레이커 사용
- 라이브러리 의존성 추가 후
- 적용하고자 하는 메서드에 @CircuitBreaker 애노테이션을 추가.
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
@Service
public class Test {
@CircuitBreaker(name = "sendMessage", fallbackMethod = "fallback")
public String sendMessage() {
// 외부 서비스 호출 로직
}
public String fallback(Throwable t) {
// 실패 처리
}
}
application.yml에 서킷 브레이커 설정을 추가
- sliding-window-type
- 설명: 서킷 브레이커가 호출을 계산하는 방식
- 옵션 값:
- count_based: 호출의 개수를 기준으로 슬라이딩 윈도를 설정
- time_based: 호출이 발생한 시간을 기준으로 슬라이딩 윈도우를 설정
- failure-rate-threshold
- 설명: 서킷 브레이커가 열리기 위한 실패 비율의 임계값(%).
- minimum-number-of-calls
- 설명: 서킷 브레이커가 열리기 전에 수집해야 하는 최소 호출 수.
- slow-call-rate-threshold
- 설명: 느린 호출의 비율이 이 임계값(%)을 초과하면 서킷 브레이커가 Open.
- slow-call-duration-threshold
- 설명: 호출이 느리다고 간주되는 임계 시간(밀리초).
- permitted-number-of-calls-in-half-open-state
- 설명: 서킷 브레이커가 Half-Open 상태에서 허용할 최대 호출 수.
- max-wait-duration-in-half-open-state
- 설명: 서킷 브레이커가 Half-Open 상태로 유지될 최대 시간.
- wait-duration-in-open-state
- 설명: 서킷 브레이커가 Open 상태에서 Half-Open 상태로 전환될 때까지 기다리는 시간.
- automatic-transition-from-open-to-half-open-enabled
- 설명: 서킷 브레이커가 열림 상태에서 자동으로 Half-Open 상태로 전환될지 여부를 설정.
- record-exceptions
- 설명: 서킷 브레이커가 실패로 기록할 예외 유형을 정의.
resilience4j.circuitbreaker:
instances:
sendMessage: //서킷 브레이커 인스턴스 이름.
baseConfig: default //default 기본 구성
configs:
default: //default 구성
#슬라이딩 윈도우 방식 집계
#개수로 체크 count_base, 시간으로 체크 time_base
sliding-window-type: count_based
sliding-window-size: 10
#fail
failure-rate-threshold: 10
minimum-number-of-calls: 5
#delay
slow-call-rate-threshold: 10
slow-call-duration-threshold: 3000ms
#half open 상태에서 다른 상태로 전환하기 위한 판단 수
permitted-number-of-calls-in-half-open-state: 10
#half open 상태 유지 시간
max-wait-duration-in-half-open-state: 5000ms
#open 상태에서 half open으로 전환까지 기다리는 시간
wait-duration-in-open-state: 6000ms
#open 상태에서 half open 으로 자동 전환 (true시 일정 시간이 지난 후 자동 전환)
automatic-transition-from-open-to-half-open-enabled: true
#실패 예외
record-exceptions:
- java.io.IOException
적용 방안
위와 같이 Resilience4j 라이브러리를 사용하여 적용하려 했으나, 현재 구조에서 사용하기에는 복잡한 구조가 있어 서킷 브레이커 개념을 가진 클래스를 구현하여 적용했습니다.
- CircuitBreakerCheck
@Component
public class CircuitBreakerCheck {
private static final int FAIL_ALLOW_COUNT = 5; // 실패 허용
private static final long RESET_OPEN_MS = 30000; // 서킷 오픈 후, 닫하기 전 대기 시간
private final AtomicInteger failCount = new AtomicInteger(0); // 실패 횟수
private final AtomicBoolean isOpen = new AtomicBoolean(false); // 서킷 오픈 여부
private long lastFailTime;
public boolean allowRequest() {
if (isOpen.get()) {
if (System.currentTimeMillis() - lastFailTime > RESET_OPEN_MS) {
isOpen.set(false);
failCount.set(0);
return true;
}
return false;
}
return true;
}
public void recordSuccess() {
failCount.set(0);
isOpen.set(false);
}
public void recordFailure() {
lastFailTime = System.currentTimeMillis();
if (failCount.incrementAndGet() >= FAIL_ALLOW_COUNT) {
isOpen.set(true);
}
}
}
Kafka Pub 로직
private void processQueue() {
log.info("{}", Thread.currentThread().getName());
while (!Thread.currentThread().isInterrupted()) {
try {
// 서킷 오픈 체크
if (!circuitBreakerCheck.allowRequest()) {
log.warn("Circuit Breaker Open");
Thread.sleep(1000);
continue;
}
// messageQueue 비어있으면 스레드 대기
KafkaMessageDto kafkaMessageDto = messageQueue.take();
log.info("{}, {}, {}", Thread.currentThread().getName(), kafkaMessageDto.toString(), messageQueue.size());
if (kafkaMessageDto.getTopic() != null && kafkaMessageDto.getMessage() != null) {
kafkaTemplate.send(kafkaMessageDto.getTopic(), kafkaMessageDto.getMessage())
.addCallback(
result -> {
circuitBreakerCheck.recordSuccess();
},
ex -> {
log.error("Fail Message {}, {}: {}", kafkaMessageDto.getTopic(), kafkaMessageDto.getMessage(), ex.getMessage());
circuitBreakerCheck.recordFailure();
sendMessage(kafkaMessageDto.getTopic(), kafkaMessageDto.getMessage());
}
);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error(e.getMessage());
circuitBreakerCheck.recordFailure();
}
}
}
'MQ' 카테고리의 다른 글
MSA 환경 트랜잭션 (1) | 2024.11.28 |
---|---|
Kafka 공통 메시지 포맷을 위한 제네릭 적용하기 (0) | 2024.09.12 |
Kafka Pub 동적 Batch 관리 (0) | 2024.08.18 |
Kafka Pub 호출 방안 (0) | 2024.08.18 |
Kafka 학습하기_컨슈머 (0) | 2024.06.05 |