-
Rabbit MQ 분배 패턴과 멀티 컨슈머Message Queue 2021. 8. 14. 01:01
공부용 포스팅입니다
틀린 부분이 있을 수 있습니다.
RabbitMQ 분배 패턴
Round-Robin Dispatching
- 모든 Consumer에게 공정하게 메시지를 분배하는 패턴
- 같은 Queue의 메시지를 소비하는 Consumer들에게 Queue의 메시지를 하나씩 분배한다
- 1번 메시지는 Consumer 1에게, 2번 메시지는 Consumer 2에게 ...
RabbitListner 하나의 Queue를 2개의 Consumer가 소비
@RabbitListener(queues = "fanout-test") public void fanoutTest(String testStr) throws IOException{ log.info("Consumer 1 : "+testStr); } @RabbitListener(queues = "fanout-test") public void fanoutTest2(String testStr) throws IOException{ log.info("Consumer 2 : " +testStr); }
실행 로그
2021-08-13 23:55:59.938 INFO 1696 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer : Consumer 1 : Round-Robin 테스트 2021-08-13 23:56:01.007 INFO 1696 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer : Consumer 2 : Round-Robin 테스트 2021-08-13 23:56:01.769 INFO 1696 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer : Consumer 1 : Round-Robin 테스트 2021-08-13 23:56:02.453 INFO 1696 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer : Consumer 2 : Round-Robin 테스트 2021-08-13 23:56:03.145 INFO 1696 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer : Consumer 1 : Round-Robin 테스트 2021-08-13 23:56:03.803 INFO 1696 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer : Consumer 2 : Round-Robin 테스트
Fair Dispatching
- 하나의 Consumer만 일을 많이 하는 것을 방지해 공평하게 메시지를 분배하는 패턴
- 2개의 Consumer가 존재할 때 매번 홀수번 째 메시지만 무거운 처리가 필요하면 1번 Consumer는 계속해 일을 처리하고 2번 Consumer는 가벼운 처리만 하는 것을 방지하기 위한 패턴
- Prefetchcount = 메모리에 쌓아놓을 수 있는 메시지의 양을 의미
- Prefetchcount를 1로 정함으로 이미 메시지를 처리하고 있는 Consumer에게 다시 메시지를 발행하지 말라는 뜻
- Consumer가 메시지를 처리하고 있을 때 다른 Consumer가 처리하고 있지 않다면 해당 Consumer에게 메시지를 전달해 공평하게 분배하는 패턴
- Spring - rabbit 2.3.6 버전 기준으로 public static final int DEFAULT_PREFETCH_COUNT = 250;로 설정되어있음
테스트 방법
1. SimpleMessageListenerContainer에 등록된 prefetchcount를 1로 변경
2. RabbitListener에 해당 변경 사항을 적용
3. Consumer1에 Thread.sleep(3000)을 통해 3초의 지연시간 부여
PrefetchCount 1로 설정
@Bean public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory factory) { SimpleRabbitListenerContainerFactory simpleFactory = new SimpleRabbitListenerContainerFactory(); configurer.configure(simpleFactory, factory); simpleFactory.setPrefetchCount(1); return simpleFactory; }
RabbitListner에 컨테이너 설정 부여 후 스레드 슬립 적용
@RabbitListener(queues = "fanout-test", containerFactory = "prefetchOneContainerFactory") public void fanoutTest(String testStr) throws IOException{ try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } log.info("Consumer 1 : "+testStr); } @RabbitListener(queues = "fanout-test", containerFactory = "prefetchOneContainerFactory") public void fanoutTest2(String testStr) throws IOException{ log.info("Consumer 2 : " +testStr); }
실행 로그
2021-08-14 00:05:44.417 INFO 20504 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer : Consumer 2 : Round-Robin 테스트 2021-08-14 00:05:44.939 INFO 20504 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer : Consumer 2 : Round-Robin 테스트 2021-08-14 00:05:45.440 INFO 20504 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer : Consumer 2 : Round-Robin 테스트 2021-08-14 00:05:46.025 INFO 20504 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer : Consumer 2 : Round-Robin 테스트 2021-08-14 00:05:46.972 INFO 20504 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer : Consumer 1 : Round-Robin 테스트
로그를 살펴보면
빠르게 처리되는 Consumer 2만 4번 호출되고
3초의 지연시간을 가진 Consumer 1은 1번만 호출된 것을 확인할 수 있었습니다.
Rabbit MQ 멀티 컨슈머 설정
- Consumer를 멀티 스레드를 통해 사용하는 것
- 메시지 소비 속도가 빠름
- Producer의 메시지 발행이 Consumer의 소비보다 빠를 때 사용하면 성능이 좋음, 메시지 하나하나 처리가 오래 걸리는 경우에도 좋을 듯?
- 기본적으로 메모리 영역에 메시지를 저장해놓는 크기인 prefetch count는 250으로 설정되어 있음
250개가 넘어가면 메시지를 처리하지 못하는 것으로 알고 있는데
이를 해결하기 위해 사용하는 것이 Multi Consumer라고 알고 있음
SimpleMessageListenerContainer 들어가 확인한 코드
protected int initializeConsumers() { int count = 0; synchronized(this.consumersMonitor) { if (this.consumers == null) { this.cancellationLock.reset(); this.consumers = new HashSet(this.concurrentConsumers); for(int i = 1; i <= this.concurrentConsumers; ++i) { BlockingQueueConsumer consumer = this.createBlockingQueueConsumer(); if (this.getConsumeDelay() > 0L) { consumer.setConsumeDelay(this.getConsumeDelay() * (long)i); } this.consumers.add(consumer); ++count; } } return count; } }
Rabbit MQ가 시작되면 설정된 concurrentConsumer의 수 만큼 BlockingQueueConsumer ( 컨슈머 대기열 )가 생성됩니다
AbstractMessageListenerContainer 들어가 확인한 코드
default prefetch count가 250임을 확인할 수 있었음
public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean, ApplicationEventPublisherAware { private static final int EXIT_99 = 99; private static final String UNCHECKED = "unchecked"; static final int DEFAULT_FAILED_DECLARATION_RETRY_INTERVAL = 5000; public static final boolean DEFAULT_DEBATCHING_ENABLED = true; public static final int DEFAULT_PREFETCH_COUNT = 250; public static final long DEFAULT_RECOVERY_INTERVAL = 5000L; public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000L; ..... .. . }
BlokingQueueConsumer 들어가 확인한 코드
블로킹 큐를 prefetchcount를 통해 생성하는 것을 확인할 수 있었음
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) { ... .. . this.queue = new LinkedBlockingQueue(prefetchCount); }
! prefetch count와 크기가 같은 Queue를 가진 concurrency 개수만큼의 Consumer를 생성해 사용하는 것 !이라고 생각이 드네요 .. 자세한 건 더 알아봐야 할 것 같습니다
default prefetch count인 250개의 메시지가 이미 꽉 차있어 다음 메시지를 저장할 수 없는 상태라면 공급되는 메시지를 처리할 수 없겠죠
이를 해결하기 위해 ConcurrentConsumer의 설정 값을 증가시켜 여러 Consumer를 생성해 동시에 메시지를 소비할 수 있는 Multi Consumer를 사용하게 됩니다.
SimpleMessageListenerContainer 들어가 확인한 코드
public void setConcurrency(String concurrency) { try { int separatorIndex = concurrency.indexOf(45); if (separatorIndex != -1) { int consumersToSet = Integer.parseInt(concurrency.substring(0, separatorIndex)); int maxConsumersToSet = Integer.parseInt(concurrency.substring(separatorIndex + 1)); Assert.isTrue(maxConsumersToSet >= consumersToSet, "'maxConcurrentConsumers' value must be at least 'concurrentConsumers'"); this.concurrentConsumers = 1; this.maxConcurrentConsumers = null; this.setConcurrentConsumers(consumersToSet); this.setMaxConcurrentConsumers(maxConsumersToSet); } else { this.setConcurrentConsumers(Integer.parseInt(concurrency)); } } catch (NumberFormatException var5) { throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only single fixed integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.", var5); } }
사용자가 직접 concurrency를 설정해
concurrency consumer와 max conccurency consumer를 설정합니다.
RabbitListner - multi consumer 적용
@RabbitListener(queues = "fanout-test", concurrency = "3") public void fanoutTest(String testStr) throws IOException{ log.info("Multi Consumer : "+testStr); log.info(Thread.currentThread().getName()); }
적용 자체는 간단한게 concurrency 속성을 추가함으로 몇 개를 사용할 것인지 정할 수 있습니다.
예제에선 3개의 컨슈머를 설정했고 실행 결과는 다음과 같습니다
실행 로그
2021-08-14 00:23:28.254 INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer : Multi Consumer 테스트 2021-08-14 00:23:28.254 INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-3 2021-08-14 00:23:28.686 INFO 16684 --- [ntContainer#0-2] k.s.daangn.service.push.RabbitConsumer : Multi Consumer 테스트 2021-08-14 00:23:28.686 INFO 16684 --- [ntContainer#0-2] k.s.daangn.service.push.RabbitConsumer : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2 2021-08-14 00:23:28.954 INFO 16684 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer : Multi Consumer 테스트 2021-08-14 00:23:28.954 INFO 16684 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 2021-08-14 00:23:29.312 INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer : Multi Consumer 테스트 2021-08-14 00:23:29.312 INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-3 2021-08-14 00:23:29.616 INFO 16684 --- [ntContainer#0-2] k.s.daangn.service.push.RabbitConsumer : Multi Consumer 테스트 2021-08-14 00:23:29.616 INFO 16684 --- [ntContainer#0-2] k.s.daangn.service.push.RabbitConsumer : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2 2021-08-14 00:23:29.942 INFO 16684 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer : Multi Consumer 테스트 2021-08-14 00:23:29.942 INFO 16684 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 2021-08-14 00:23:30.228 INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer : Multi Consumer 테스트 2021-08-14 00:23:30.228 INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-3
컨슈머가 3개의 스레드로 동작해 메시지를 소비하고 있음을 확인할 수 있었습니다.
ps. 생각보다 어렵네요!
'Message Queue' 카테고리의 다른 글
Rabbit MQ에 대해 (0) 2021.08.14