2023. 4. 17. 20:13ㆍMessage Queue/RabbitMQ
회사에서 RabbitMQ를 사용해서 프로젝트를 하고 있는데, 이슈가 있어서 해당 이슈를 분석하고 해결하는 방법을 정리하려고 합니다.
현재 상황
RabbitMQ가 있고 Replicas가 되어 있는 Worker가 메시지를 받게 되면 처리하고 다음 Queue로 메시지를 전송하는 로직입니다.
Worker는 메시지를 예약하고 처리 후 Ask로 메시지가 Queue에서 삭제하도록 되어 있습니다.
만약 Exception이 발생해도 Ask를 날리고 Qeueue에서 메시지를 삭제하고 다음 메시지를 받도록 되어 있습니다.
이슈
평소에는 메시지 1개당 처리 시간이 5분에서 10분 정도 였는데, 어느 날 1시간이 넘는 처리 시간을 가진 메시지가 들어와 이슈가 발생하였습니다.
Worker 내부적으로 30분이 넘으면 Timeout Exception을 발생시키고 해당 메시지를 삭제하고 다음 메시지를 받도록 만들었습니다.
그런데 이상하게 1시간이 넘는 메시지는 계속 Timeout이 발생하며 Worker01, Worker02, Worker03에서 떠돌고 있었습니다.
분석
먼저 결론부터 말씀드리면, consumer_timeout이 발생하였고 TTL을 지정하지 않아 메시지가 계속 Ready -> 할당 -> consumer_timeout -> Ready -> 할당 ... 을 반복하고 있었습니다.
1. 초기 상태
위와 같이 M1~M4까지 메시지가 있고 아래에 처리 되는 시간이 있습니다.
Worker들이 Queue에서 하나씩 메시지를 가져가서 처리하게 됩니다.
2. 10분 뒤
M2는 Worker02가 잘 처리했습니다. 그리고 Worker02는 M4 메시지를 할당 받아 처리하게 됩니다.
3. 20분 뒤
M3는 Worker03에 의해 잘 끝나고 더 이상 메시지가 없어 Worker03이 대기 하고 있습니다.
4. 30분 뒤
여기서 문제가 발생하게 되는데, RabbitMQ에서는 consumer_timeout 시간이 Default로 30분으로 지정되어 있습니다.
RabbitMQ는 Worker01이 30분 동안 Ask를 날리고 있지 않기 때문에 연결을 끊어버리고 M1을 Ready 상태로 만들어 버립니다.
그럼 M1은 대기 중인 Worker03으로 할당 되어 버리고 Worker01은 연결이 끊어진 것도 모르고 계속 처리하게 됩니다.
이런식으로 M1은 여기 저기 Worker들을 떠돌아 다니며 죽지 않고 계속 Timeout Exception을 발생하게 됩니다.
해결방법
1. consumer_timeout 늘리기
https://www.rabbitmq.com/consumers.html
가장 쉬운 방법은 consumer_timeout을 늘려버리면 됩니다.
그럼 RabbitMQ는 consumer가 지정한 시간 동안 Ask를 하지 않아도 연결을 끊지 않고 기다립니다.
rabbitmq.conf 설정
# one hour in milliseconds
consumer_timeout = 3600000
위와 같이 rabbitmq.conf를 설정 할 수 있습니다.
docker-compose.yml에서 설정
environment:
- RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit consumer_timeout 3600000
위와 같이 docker-compose.yml에서 환경 변수로 지정 할 수 있습니다.
consumer_timeout 확인 방법
$ rabbitmqctl eval 'application:get_env(rabbit, consumer_timeout).'
{ok,3600000}
확인 방법은 rabbitmqctl로 확인 가능합니다.
consumer_timeout 비활성화 시키기
consumer_timeout을 비활성화 시키는 방법이 있는데, RabbitMQ 쪽에서는 권장하지 않는 방법입니다.
advanced.config에서 설정 할 수 있습니다.
[
{rabbit, [
{consumer_timeout, undefined}
]}
].
2. 메시지 TTL 늘리기
그리고 consumer_timeout을 지정해도 그 시간을 넘게 되면 또 똑같은 현상이 일어나게 됩니다.
그것을 방지하기 위해 Queue에서 메시지가 머물 수 있는 시간을 지정하면 됩니다.
var queueArgs = new Dictionary<string, object>
{
{ "x-message-ttl", 10000 }, // 10초
{ "x-dead-letter-exchange", "my_exchange" }
};
channel.QueueDeclare(queue: "my_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: queueArgs);
위 소스는 C# 소스입니다.
Queue의 Argument에서 x-message-ttl 을 지정하면 여기 저기 떠돌아 다녀도 queue에서 최대 10초 까지만 머물 수 있게 됩니다.
그리고 아래의 x-dead-letter-exchange 는 메시지가 ttl을 넘기면 다른 queue로 메시지를 이동 시킵니다.
메시지를 다른 queue로 옮겨 놓으면 나중에 확인해서 재 처리하거나 활용 할 수 있습니다.
x-message-ttl 로 queue 자체에 지정할 수 도 있고, 메시지 개별개별로 지정도 가능합니다.
메시지를 Producer가 Publish 할 때 메시지 속성 중 expiration에 시간을 지정 하면 특정 메시지에 TTL을 지정 할 수 있습니다.