본문으로 바로가기

RabbitMQ Prefetch Count 란? (closing AMQP connection)

category MQ 2023. 2. 10. 15:17
반응형

 

운영 중인 RabbitMQ와 Consumer 쪽 모듈과 연결이 끊기는 문제가 발생하여 트러블 슈팅한 내용입니다.


어떤 원인으로 MQ와 연결이 끊기면서 모듈쪽 재 연결 프로세스를 통해 Consumer가 다시 생성되면 MQ 자체 복구 프로세스를 통해 수신 처리가 되지 않은 Unacked(Unacknowledgement) 메시지가 Ready 상태로 이동되어 처리된다.
위와 같은 내용이 반복되면서 Queue에 데이터가 쌓이고 지연이 발생함.

 

모니터링 도구 (그라파나)

위와 같이 문제가 발생하여 그래프에서 메시지 개수가 늘었다 줄었다 반복.

 

1. RabbitMQ 에러 로그

=ERROR REPORT==== 26-Jan-2023::04:06:26 ===
closing AMQP connection .....
{writer,send_failed,{error,timeout}}

 

2. Consumer 모듈 에러

2023-01-25 07:00:15.852 [WARN][ForgivingExceptionHandler.java-115] Consumer method handleDelivery for channel AMQChannel threw an exception for channel AMQChannel(Exception message: Connection reset)
2023-01-25 07:00:15.853 [ERROR][ForgivingExceptionHandler.java-119] Failure during close of channel AMQChannel after java.net.SocketException: Connection reset
java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        .....
2023-01-25 07:00:15.855 [ERROR][ForgivingExceptionHandler.java-119] An unexpected connection driver error occurred
java.io.IOException: Stream closed.
        at java.net.AbstractPlainSocketImpl.available(AbstractPlainSocketImpl.java:470)
        .....
2023-01-25 07:00:15.856 [ERROR][AbstractConnectionFactory.java-748] Shutdown Signal: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=Closed due to exception from Consumer (amq.ctag-M-3iDrBApDmk0HB-rnoJXg) method handleDelivery for channel AMQChannel

 

구글링 중 해당 내용을 확인 하였고 Prefetch Count를 확인.

https://stackoverflow.com/questions/35438843/rabbitmq-error-timeout

 

RabbitMQ error timeout

I've set up RabbitMQ in order to parse some 20.000 requests from an external API but it keeps timing out after a few minutes. It does get to correctly parse about 2000 out of the total 20.000 reque...

stackoverflow.com

 

Prefetch Count란 ?
Queue의 메시지를 Consumer의 메모리에 쌓아놓을 수 있는 최대 메시지의 양.

 

해당 큐 Consumer Prefetch Count

Prefetch Count 0 값은 값 무제한으로 처리.

A value of 0 is treated as infinite, allowing any number of unacknowledged messages.
https://www.rabbitmq.com/consumer-prefetch.html

 

 

 

Rabbit에 대기열이 있습니다. 해당 대기열에서 소비하는 일부 클라이언트가 있습니다. 
QoS 설정을 전혀 설정하지 않으면( basic.qos) Rabbit은 네트워크와 클라이언트가 허용하는 속도로 모든 대기열의 메시지를 클라이언트에 푸시합니다. 소비자는 자체 RAM에 모든 메시지를 버퍼링 하므로 메모리가 팽창합니다.

You have a queue in Rabbit. You have some clients consuming from that queue. 
If you don’t set a QoS setting at all (basic.qos), then Rabbit will push all the queue’s messages to the clients as fast as the network and the clients will allow. 
The consumers will balloon in memory as they buffer all the messages in their own RAM.

https://blog.rabbitmq.com/posts/2012/05/some-queuing-theory-throughput-latency-and-bandwidth

 

Some queuing theory: throughput, latency and bandwidth | RabbitMQ - Blog

Some queuing theory: throughput, latency and bandwidth Tweet Follow @RabbitMQ May 11, 2012 You have a queue in Rabbit. You have some clients consuming from that queue. If you don’t set a QoS setting at all (basic.qos), then Rabbit will push all the queue

blog.rabbitmq.com

 

예상 원인

모듈의 Consumer의 Prefetch Count는 0이므로 Prefetch Buf에 무제한으로 쌓아 올리고 Consume 버퍼가 가득 차서 작성할 수 없으면 RabbitMQ에서 해당 소켓을 차단하고 모듈과 연결을 닫음.

 

예상 원인이 맞는지 확인 하기 위해 로컬에서 시현

  1. 모듈에서 Channel Prefetch Count 0으로 설정
  2. RabbitMQ에 dump 메시지 쌓음
  3. 메시지 처리 로직 sleep으로 지연 주기
  4. 모듈 실행

 

Case1 - Prefetch Count 값이 0이므로 Buf에 무제한으로 쌓음

위와 같이 Consumer와 연결이 끊기고 MQ 오류 확인

closing AMQP connection <0.4179.0> ([::1]:38250 -> [::1]:5672 - rabbitConnectionFactory#191c68ec:8):
{writer,send_failed,{error,timeout}}

Unacked에서 Ready 상태로 변경

 

Case2 - Consumer 개수 증가

 

Consumer 개수를 5개로 증가
Prefetch Buf 마다 메시지를 분할적으로 받아서인지 끊기지는 않음.

 

실제 운영에 Consumer 개수를 증가시켜 적용해서 모니터링 진행.
빈도가 거의 발생하지 않지만 간헐적으로 발생.

 

Case3 - Prefetch Count 설정

 

Consume 개수는 1개, PreFetch Count를 250으로 설정 후에 실행
Case1번과 다르게 Ready 상태의 메시지를 전부 가져오지 않고 순차적으로 처리하여 MQ와 연결이 끊기지 않음.

 

 

JAVA Prefetch Count 설정 방법
https://www.rabbitmq.com/consumer-prefetch.html

 

Consumer Prefetch — RabbitMQ

Consumer Prefetch Consumer prefetch is an extension to the channel prefetch mechanism. AMQP 0-9-1 specifies the basic.qos method to make it possible to limit the number of unacknowledged messages on a channel (or connection) when consuming (aka "prefetch c

www.rabbitmq.com

 

운영에 Prefetch Count 적용.
더 이상 연결이 끊기지는 않지만 처리 속도가 느려진 느낌이 있어 모니터링 진행

 

적용 전

해소 시간 : 07:08 ~ 07:13
대기 메시지 수 : 15,798건
약 5분에 15,798건 처리

 

적용 후
해소 시간 : 07:16 ~ 07:23
대기 메시지 수 : 15,041건
약 7분에 15,041건 처리

 

Prefetch Count 최적화

https://velog.io/@sdb016/RabbitMQ-Prefetch%EC%99%80-%EC%84%B1%EB%8A%A5

https://www.cloudamqp.com/blog/how-to-optimize-the-rabbitmq-prefetch-count.html

 

FAQ: How to Optimize the RabbitMQ Prefetch Count - CloudAMQP

The RabbitMQ prefetch value is used to specify how many messages are being sent at the same time. Understanding how to optimize the RabbitMQ prefetch count maximizes the speed of the system.

www.cloudamqp.com

  • 메시지를 처리하는 처리 속도가 빠르고, 하나의 큐에 적은 Consumer가 있으면 high
  • 메시지를 처리하는 처리 속도가 빠르고, 하나의 큐에 많은 Consumer가 있으면 middle
  • 메시지를 처리하는 처리 속도가 느리면 1

메시지 처리 속도에 따라 Prefetch Count를 늘려서 최적화된 Count를 찾아야 할 것 같음.

 

 

참조 - https://medium.com/@joor.loohuis/about-the-prefetch-count-in-rabbitmq-5f2f5611063b

 

About the prefetch count in RabbitMQ…

Tweaking the RabbitMQ consumer prefetch count to suit your needs is a topic that deserves more than just casual attention.

medium.com

 

반응형

'MQ' 카테고리의 다른 글

Kafka 학습하기_토픽, 파티션, 레코드  (0) 2024.06.02
Kafka 학습하기_브로커와 클러스터  (0) 2024.06.01
Kafka 학습하기_장단점  (0) 2024.06.01
Rabbitmq 이해하기  (0) 2023.05.11
CentOS 6.x에 RabbitMQ 설치  (0) 2020.07.27