본문으로 바로가기

Kafka Pub 동적 Batch 관리

category MQ 2024. 8. 18. 14:55
반응형


현재 사내에서 상품 정보를 변경 시에 처리하는 로직은 세 가지 방식이 있습니다.
1. 변경이 일어나 상품정보에 수정이 필요한 데이터들을 수정 처리.
2. 스케줄러를 통해 변경된 데이터를 주기적으로 감지 후 처리.

3. 전체 색인, 데이터 전체를 동기화 시켜줌.

 

위 처리방식의 문제점을 해결하기 위해 카프카를 도입하고 3가지 방식을 없애거나 줄이는 방법을 생각했습니다.

하지만 3번 방식으로 처리했던 코드를 분석시에 2번과 같은 방법이 여전히 필요했습니다.

 

예를 들어 아래 전시 상품 프로시저는 3번 방식으로 처리하는 로직이였습니다. (전체 색인, 데이터 전체를 동기화 시켜줌)

분석을 해보면 이벤트로 처리 하지 못하는 부분이 생겼습니다.

1. 전시 상품 프로시저 분석

  • 판매 종료 대상 업체 → 이벤트
  • 전시 여부, 판매 상태 → 이벤트
  • 예약 상품 → 배치
  • .....

예약 상품, 판매 예약 시작일/종료일 등의 요건은 2번 방식으로 데이터를 주기적으로 감지하고 처리해야합니다.

 


 

아래와 같은 방법 처리를 Kafka를 통해 변경할 예정입니다.

 

 

AS-IS

변경 감지 후 변경분 적용

 

TO-BE (장점)

변경 감지 → Pub 애플리케이션 → Kafka → Sub 애플리케이션

  • 변경 감지 배치 시간 간격 감소
  • 애플리케이션 간 결합도 감소
  • 데이터 재처리 용이

 

TO-BE 방식에서 변경 감지를 아래와 같은 정적 방법으로 개발을 하였고 한계가 있었고 동적 배치로 변경하였습니다.

@Scheduled(cron = "0 */2 * * * * ")
public void sellPeriod() {
    .....
}

 

적 배치 관리 한계

  • 새로운 배치 작업 추가 시 애플리케이션 재배포 필요
  • 실행 중인 배치 작업의 스케줄 변경이 어려움
  • 배치 작업의 동적인 활성화/비활성화가 불가능

 

 

동적 배치 관리 장점

  1. 유연성: 런타임에 배치 작업을 추가하거나 제거할 수 있습니다.
  2. 확장성: 새로운 유형의 배치 프로세서를 쉽게 추가할 수 있습니다.
  3. 관리 용이성: 각 배치 작업의 상태를 개별적으로 관리할 수 있습니다.
  4. 리소스 효율성: ThreadPoolTaskScheduler를 통해 효율적인 스레드 관리가 가능합니다.

구현 예시

  • SchedulingConfigurer 인터페이스를 구현하여 커스텀 스케줄러 구성.
  • ConcurrentHashMap 멀티 스레드 환경에서 사용할 수 있도록 나온 클래스.
  • ThreadPoolTaskScheduler를 사용하여 멀티스레딩 지원
  • CronTask와 ScheduledFuture를 사용하여 각 배치 작업 관리
public class BatchManage implements SchedulingConfigurer {
    private final BatchService batchService;
    private final ApplicationContext applicationContext;
    private ThreadPoolTaskScheduler taskScheduler;
    private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
    private final Map<String, CronTask> tasksMap = new ConcurrentHashMap<>();

    public BatchManage(BatchService batchService, ApplicationContext applicationContext) {
        this.batchService = batchService;
        this.applicationContext = applicationContext;
    }

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskScheduler = new ThreadPoolTaskScheduler();
        //taskScheduler.setPoolSize(5);
        //taskScheduler.setThreadNamePrefix("Batch-");
        taskScheduler.initialize();
        taskRegistrar.setScheduler(taskScheduler);

        List<MqBatchMDto> batchList = batchService.getBatchList();
        initTasks(batchList);
    }

    public void initTasks(List<MqBatchMDto> batchList) {
        for (MqBatchMDto batch : batchList) {
            if ("Y".equals(batch.getBatchYn())) {
                addTask(batch);
            }
        }
    }

    public void addTask(MqBatchMDto mqBatchMDto) {
        Runnable task = () -> {
            BatchProcessor processor = (BatchProcessor) applicationContext.getBean(mqBatchMDto.getProcessType());
            processor.process(mqBatchMDto);
        };

        CronTask cronTask = new CronTask(task, new CronTrigger(mqBatchMDto.getCronExpress()));
        tasksMap.put(mqBatchMDto.getBatchId(), cronTask);
        ScheduledFuture<?> future = taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        scheduledTasks.put(mqBatchMDto.getBatchId(), future);
    }

    public void deleteTask(String batchId) {
        tasksMap.remove(batchId);
        ScheduledFuture<?> future = scheduledTasks.get(batchId);
        if (future != null) {
            // 스케줄 등록 삭제, 실행중인 스케줄 종료
            future.cancel(false);
        }
    }
}

 

 

 

반응형

'MQ' 카테고리의 다른 글

Kafka 공통 메시지 포맷을 위한 제네릭 적용하기  (0) 2024.09.12
Kafka Pub CircuitBreaker 적용하기  (0) 2024.09.07
Kafka Pub 호출 방안  (0) 2024.08.18
Kafka 학습하기_컨슈머  (0) 2024.06.05
Kafka 학습하기_프로듀서  (0) 2024.06.04