
Spring Boot and Logstash Integration: Configuration and How It Works

Category: n년차 개발자 (developer in year n) · 2025. 4. 16. 07:39


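To send a Spring Boot application's logs to Logstash for central collection and visualize them in Kibana, you first need to configure the connection to Logstash.

This post covers the configuration needed to send logs from a Spring Boot application to Logstash, and then walks through how the appender works internally to show how logs are delivered and processed with that configuration alone.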

 

 

Configuration

 

1. Add the dependency

Add the Logstash encoder library to your build.gradle.kts or build.gradle file.

implementation("net.logstash.logback:logstash-logback-encoder:7.4")
 

2. Write logs

import org.slf4j.LoggerFactory

class UserService(private val userRepository: UserRepository) {
  private val logger = LoggerFactory.getLogger(UserService::class.java)

  fun createUser(request: CreateUserRequest): UserResponse {
    val user = userRepository.save(request.toEntity())
    // Log the id of the newly saved user
    logger.info("Create user ${user.id}")
    return user.toResponse()
  }

  fun getUsersAll(): List<UserResponse> {
    val users = userRepository.findAll()
    // Log how many users were loaded
    logger.info("All users ${users.size}")
    return users.map(User::toResponse)
  }
}
 

3. Configure Logback (logback-spring.xml)

<configuration>
  <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
  <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>

  <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
    <destination>localhost:5001</destination>
    <reconnectionDelay>5000</reconnectionDelay>
    <writeTimeout>3000</writeTimeout>
    <keepAliveDuration>60000</keepAliveDuration>
    <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
  </appender>

  <root level="INFO">
    <appender-ref ref="CONSOLE"/>
    <appender-ref ref="LOGSTASH"/>
  </root>
</configuration>
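
Before pointing the appender at a real Logstash instance, it can help to see what actually arrives on the port. LogstashEncoder writes each event as one JSON object followed by a newline, so a plain TCP listener that reads lines is enough for a quick check. The sketch below is only a stand-in for Logstash, not part of the setup above: the class name LineSink and its methods are hypothetical, and the two events sent in demo() are fake payloads (real events carry more fields).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for Logstash: accepts one TCP client and reads newline-delimited events. */
class LineSink {

    /** Accepts a single connection and passes each received line to the handler until the client closes. */
    static void forEachLine(ServerSocket server, Consumer<String> handler) {
        try (Socket client = server.accept();
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                handler.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Self-check over loopback: sends two fake events and returns what the sink received. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        try (ServerSocket server = new ServerSocket(0)) { // port 0 = any free port
            int port = server.getLocalPort();
            Thread sender = new Thread(() -> {
                try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
                     OutputStream out = socket.getOutputStream()) {
                    out.write("{\"message\":\"Create user 1\"}\n".getBytes(StandardCharsets.UTF_8));
                    out.write("{\"message\":\"All users 3\"}\n".getBytes(StandardCharsets.UTF_8));
                    out.flush();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.start();
            forEachLine(server, received::add);
            sender.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) throws IOException {
        // Listen on the port used in <destination> (5001 in this post) and print each event.
        try (ServerSocket server = new ServerSocket(5001)) {
            forEachLine(server, System.out::println);
        }
    }
}
```

Running main() while Logstash is stopped and then starting the Spring Boot application should print one JSON line per log event, which makes it easy to confirm that the appender connects and that the encoder output looks as expected.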
 
I then did a quick visualization of the collected logs in Kibana and Grafana.

 

 

How It Works

 

The LogstashTcpSocketAppender configured in logback-spring.xml processes log events asynchronously and sends them to Logstash over TCP. The overall flow is as follows.

 

1. Configuration initialization

  • When the Spring Boot application starts, the logback-spring.xml configuration is loaded.
  • LogstashTcpSocketAppender extends AbstractLogstashTcpSocketAppender, which in turn extends AsyncDisruptorAppender.
public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredProcessingAware, Listener extends TcpAppenderListener<Event>> extends AsyncDisruptorAppender<Event, Listener> {
 

 

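2. Appender initialization

  • When AbstractLogstashTcpSocketAppender.start() is called, it validates the encoder and destinations, starts the encoder, and creates an internal scheduled thread pool.
  • It then calls start() on its superclass, AsyncDisruptorAppender.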
public synchronized void start() {
  if (!this.isStarted()) {
    int errorCount = 0;
    if (this.encoder == null) {
      ++errorCount;
      this.addError("No encoder was configured. Use <encoder> to specify the fully qualified class name of the encoder to use");
    }

    if (this.destinations.isEmpty()) {
      ++errorCount;
      this.addError("No destination was configured. Use <destination> to add one or more destinations to the appender");
    }

    if (errorCount == 0 && this.socketFactory == null) {
      if (this.sslConfiguration == null) {
        this.socketFactory = SocketFactory.getDefault();
      } else {
        try {
          SSLContext sslContext = this.getSsl().createContext(this);
          SSLParametersConfiguration parameters = this.getSsl().getParameters();
          parameters.setContext(this.getContext());
          this.socketFactory = new UnconnectedConfigurableSSLSocketFactory(parameters, sslContext.getSocketFactory());
        } catch (Exception e) {
          this.addError("Unable to create ssl context", e);
          ++errorCount;
        }
      }
    }

    if (this.keepAliveMessage != null) {
      this.keepAliveBytes = this.keepAliveMessage.getBytes(this.keepAliveCharset);
    }

    if (errorCount == 0) {
      this.encoder.setContext(this.getContext());
      if (!this.encoder.isStarted()) {
        this.encoder.start();
      }

      int threadPoolCoreSize = 1;
      if (this.isKeepAliveEnabled()) {
        ++threadPoolCoreSize;
      }

      if (this.isWriteTimeoutEnabled()) {
        ++threadPoolCoreSize;
      }

      this.executorService = new ScheduledThreadPoolExecutor(threadPoolCoreSize, this.getThreadFactory());
      this.executorService.setRemoveOnCancelPolicy(true);
      this.shutdownLatch = new CountDownLatch(1);
      super.start();
    }
  }
}
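
The thread pool sizing in start() is easy to miss, so here is a minimal sketch that restates it in isolation. It assumes that a positive keepAliveDuration and writeTimeout are what enable the two optional features, which matches the configuration in this post. AppenderPoolSizing and coreSize are hypothetical names used only for illustration, not library classes.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;

/** Illustrative restatement of the pool sizing in start(); not the library's own class. */
class AppenderPoolSizing {

    /** One base thread, plus one each for keep-alive and write-timeout when they are enabled. */
    static int coreSize(boolean keepAliveEnabled, boolean writeTimeoutEnabled) {
        int size = 1;
        if (keepAliveEnabled) {
            size++;
        }
        if (writeTimeoutEnabled) {
            size++;
        }
        return size;
    }

    public static void main(String[] args) {
        // The logback-spring.xml in this post sets both keepAliveDuration and writeTimeout.
        int size = coreSize(true, true);
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(size);
        executor.setRemoveOnCancelPolicy(true); // cancelled tasks are removed from the queue right away
        System.out.println("core pool size = " + executor.getCorePoolSize()); // prints "core pool size = 3"
        executor.shutdown();
    }
}
```

With the configuration shown earlier, the appender would therefore end up with a pool of three threads. Removing writeTimeout or keepAliveDuration from the XML should reduce that by one each.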
 

 

 

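3. Disruptor structure and event handling

  • AsyncDisruptorAppender uses the LMAX Disruptor library to create a high-performance asynchronous ring buffer.
  • A TcpSendingEventHandler is registered with the Disruptor as the event handler. It is supplied by createEventHandler() and wrapped in an EventClearingEventHandler.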
public void start() {
  if (this.addDefaultStatusListener && this.getStatusManager() != null && this.getStatusManager().getCopyOfStatusListenerList().isEmpty()) {
    LevelFilteringStatusListener statusListener = new LevelFilteringStatusListener();
    statusListener.setLevelValue(1); // 1 == Status.WARN
    statusListener.setDelegate(new OnConsoleStatusListener());
    statusListener.setContext(this.getContext());
    statusListener.start();
    this.getStatusManager().add(statusListener);
  }

  this.disruptor = new Disruptor(this.eventFactory, this.ringBufferSize, this.threadFactory, this.producerType, this.waitStrategy);
  this.disruptor.setDefaultExceptionHandler(this.exceptionHandler);
  this.disruptor.handleEventsWith(new EventHandler[]{new EventClearingEventHandler(this.createEventHandler())});
  this.disruptor.start();
  super.start();
  this.fireAppenderStarted();
}
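
To make the producer/consumer split concrete, the sketch below shows the same hand-off pattern with a bounded queue and a single consumer thread. It deliberately uses java.util.concurrent.ArrayBlockingQueue rather than the LMAX Disruptor, so it illustrates the idea (the logging thread only enqueues, a dedicated thread does the sending) and not the library's actual data structure. AsyncHandOff and its methods are hypothetical names. Dropping an event when the buffer is full reflects my understanding of the appender's default behavior and should be treated as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Simplified stand-in for the async hand-off; uses a bounded queue, not the LMAX Disruptor. */
class AsyncHandOff {
    // Unique sentinel instance, compared by identity, that tells the consumer to stop.
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> buffer;
    private final Thread consumer;

    AsyncHandOff(int capacity, Consumer<String> handler) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.consumer = new Thread(() -> {
            try {
                // Single consumer: takes events in publish order and hands them to the handler.
                for (String event = buffer.take(); event != STOP; event = buffer.take()) {
                    handler.accept(event);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "hand-off-consumer");
        this.consumer.start();
    }

    /** Non-blocking publish; returns false (event dropped) when the buffer is full. */
    boolean publish(String event) {
        return buffer.offer(event);
    }

    /** Lets the consumer drain what is already queued, then stops it. */
    void stop() {
        try {
            buffer.put(STOP);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Publishes two events and returns them in the order the consumer handled them. */
    static List<String> demo() {
        List<String> handled = new ArrayList<>();
        AsyncHandOff handOff = new AsyncHandOff(8, handled::add);
        handOff.publish("Create user 1");
        handOff.publish("All users 3");
        handOff.stop();
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[Create user 1, All users 3]"
    }
}
```

The point of the design is that logger.info(...) returns as soon as the event is in the buffer, so a slow or unreachable Logstash does not block request threads.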
 

 

 

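4. Processing flow when a log is emitted

  1. When a log statement runs, Logback converts it into a logging event.
  2. The event is published to the Disruptor's ring buffer.
  3. TcpSendingEventHandler consumes the event from the ring buffer.
  4. Its onEvent() method is called, which uses writeEvent() to encode the event and write it to Logstash over TCP. The output stream is flushed at the end of each batch, and a failed write triggers a reconnect and a retry.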
TcpSendingEventHandler
public void onEvent(AsyncDisruptorAppender.LogEvent<Event> logEvent, long sequence, boolean endOfBatch) throws Exception {
  while(true) {
    Socket socket = this.socket;
    OutputStream outputStream = this.outputStream;
    if (socket == null && (!AbstractLogstashTcpSocketAppender.this.isStarted() || Thread.currentThread().isInterrupted())) {
      AbstractLogstashTcpSocketAppender.this.fireEventSendFailure((DeferredProcessingAware)logEvent.event, AbstractLogstashTcpSocketAppender.SHUTDOWN_IN_PROGRESS_EXCEPTION);
      break;
    }

    if (socket == null) {
      this.reopenSocket();
    } else {
      Future<?> readerFuture = this.readerFuture;
      if (readerFuture.isDone()) {
        String msg = "destination terminated the connection";

        try {
          readerFuture.get();
        } catch (ExecutionException exx) {
          msg = msg + " (cause: " + exx.getCause().getMessage() + ")";
        }

        AbstractLogstashTcpSocketAppender.this.addInfo(AbstractLogstashTcpSocketAppender.this.peerId + msg + ". Reconnecting.");
        this.reopenSocket();
      } else {
        try {
          this.writeEvent(socket, outputStream, logEvent, endOfBatch);
          return;
        } catch (EncoderException e) {
          AbstractLogstashTcpSocketAppender.this.addWarn(AbstractLogstashTcpSocketAppender.this.peerId + "Encoder failed to encode event. Dropping event.", e.getCause());
          AbstractLogstashTcpSocketAppender.this.fireEventSendFailure((DeferredProcessingAware)logEvent.event, e.getCause());
          break;
        } catch (Exception ex) {
          AbstractLogstashTcpSocketAppender.this.addWarn(AbstractLogstashTcpSocketAppender.this.peerId + "Unable to send event. Reconnecting.", ex);
          this.reopenSocket();
        }
      }
    }
  }
}

private void writeEvent(Socket socket, OutputStream outputStream, AsyncDisruptorAppender.LogEvent<Event> logEvent, boolean endOfBatch) throws IOException, EncoderException {
  long startWallTime = System.currentTimeMillis();
  long startNanoTime = System.nanoTime();
  this.lastSendStartNanoTime = startNanoTime;
  if (logEvent.event != null) {
    this.encode((DeferredProcessingAware)logEvent.event, outputStream);
  } else if (this.hasKeepAliveDurationElapsed(this.lastSendEndNanoTime, startNanoTime)) {
    outputStream.write(AbstractLogstashTcpSocketAppender.this.keepAliveBytes);
  }

  if (endOfBatch) {
    outputStream.flush();
  }

  long endNanoTime = System.nanoTime();
  this.lastSendEndNanoTime = endNanoTime;
  if (logEvent.event != null) {
    AbstractLogstashTcpSocketAppender.this.fireEventSent(socket, (DeferredProcessingAware)logEvent.event, endNanoTime - startNanoTime);
  }

  if (AbstractLogstashTcpSocketAppender.this.connectionStrategy.shouldReconnect(startWallTime, AbstractLogstashTcpSocketAppender.this.connectedDestinationIndex, AbstractLogstashTcpSocketAppender.this.destinations.size())) {
    AbstractLogstashTcpSocketAppender.this.addInfo(AbstractLogstashTcpSocketAppender.this.peerId + "reestablishing connection.");
    outputStream.flush();
    this.reopenSocket();
  }
}
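
The control flow of onEvent() is easier to follow once the library details are stripped away. The sketch below keeps only the core loop: open a connection if there is none, try to write, flush at the end of a batch, and on an I/O failure reconnect and retry the same event. Connection, Connector, and ReconnectingSender are hypothetical types introduced for illustration. The sketch leaves out the reconnection delay, the reader future check, keep-alive messages, and the shutdown check that stops the real loop.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Condensed illustration of the send loop: write, and on failure reconnect and retry. */
class ReconnectingSender {

    /** Hypothetical abstraction standing in for the Socket/OutputStream pair. */
    interface Connection {
        void write(String event) throws IOException;
        void flush() throws IOException;
    }

    /** Hypothetical factory standing in for reopenSocket(). */
    interface Connector {
        Connection open();
    }

    private final Connector connector;
    private Connection connection;
    int reconnects = 0;

    ReconnectingSender(Connector connector) {
        this.connector = connector;
    }

    /** Loops until the event has been written once, reconnecting after each failed attempt. */
    void send(String event, boolean endOfBatch) {
        while (true) {
            if (connection == null) {
                connection = connector.open(); // first use: no connection yet
                continue;
            }
            try {
                connection.write(event);
                if (endOfBatch) {
                    connection.flush(); // flush only once per batch
                }
                return;
            } catch (IOException e) {
                connection = connector.open(); // drop the broken connection and retry
                reconnects++;
            }
        }
    }

    /** The first connection fails on write, so the first event is delivered after one reconnect. */
    static List<String> demo() {
        List<String> delivered = new ArrayList<>();
        int[] opened = {0};
        Connector connector = () -> {
            boolean broken = opened[0]++ == 0; // only the first connection is broken
            return new Connection() {
                @Override
                public void write(String event) throws IOException {
                    if (broken) {
                        throw new IOException("connection reset");
                    }
                    delivered.add(event);
                }

                @Override
                public void flush() {
                    // nothing is buffered in this fake connection
                }
            };
        };
        ReconnectingSender sender = new ReconnectingSender(connector);
        sender.send("Create user 1", false);
        sender.send("All users 3", true);
        delivered.add("reconnects=" + sender.reconnects);
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[Create user 1, All users 3, reconnects=1]"
    }
}
```

Because the loop retries the same event after reconnecting, a temporary outage delays delivery instead of losing the event that was being written, as long as the appender itself is still running.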
 