ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Rabbit MQ 분배 패턴과 멀티 컨슈머
    Message Queue 2021. 8. 14. 01:01

    공부용 포스팅입니다 

    틀린 부분이 있을 수 있습니다.


    RabbitMQ 분배 패턴 

     

    Round-Robin Dispatching

    - 모든 Consumer에게 공정하게 메시지를 분배하는 패턴 

    - 같은 Queue의 메시지를 소비하는 Consumer들에게 Queue의 메시지를 하나씩 분배한다 

    - 1번 메시지는 Consumer 1에게, 2번 메시지는 Consumer 2에게 ... 

     

    RabbitListner 하나의 Queue를 2개의 Consumer가 소비

        @RabbitListener(queues = "fanout-test")
        public void fanoutTest(String testStr) throws IOException{
            log.info("Consumer 1 : "+testStr);
        }
        @RabbitListener(queues = "fanout-test")
        public void fanoutTest2(String testStr) throws IOException{
            log.info("Consumer 2 : " +testStr);
        }

    실행 로그

    2021-08-13 23:55:59.938  INFO 1696 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 1 : Round-Robin 테스트
    2021-08-13 23:56:01.007  INFO 1696 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 2 : Round-Robin 테스트
    2021-08-13 23:56:01.769  INFO 1696 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 1 : Round-Robin 테스트
    2021-08-13 23:56:02.453  INFO 1696 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 2 : Round-Robin 테스트
    2021-08-13 23:56:03.145  INFO 1696 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 1 : Round-Robin 테스트
    2021-08-13 23:56:03.803  INFO 1696 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 2 : Round-Robin 테스트

     

     

    Fair Dispatching

    - 하나의 Consumer만 일을 많이 하는 것을 방지해 공평하게 메시지를 분배하는 패턴

    - 2개의 Consumer가 존재할 때 매번 홀수번 째 메시지만 무거운 처리가 필요하면 1번 Consumer는 계속해 일을 처리하고 2번 Consumer는 가벼운 처리만 하는 것을 방지하기 위한 패턴

    - Prefetchcount = 메모리에 쌓아놓을 수 있는 메시지의 양을 의미

    - Prefetchcount를 1로 정함으로 이미 메시지를 처리하고 있는 Consumer에게 다시 메시지를 발행하지 말라는 뜻

    - Consumer가 메시지를 처리하고 있을 때 다른 Consumer가 처리하고 있지 않다면 해당 Consumer에게 메시지를 전달해 공평하게 분배하는 패턴

    - Spring - rabbit 2.3.6 버전 기준으로 public static final int DEFAULT_PREFETCH_COUNT = 250;로 설정되어있음

     

    테스트 방법

    1. SimpleMessageListenerContainer에 등록된 prefetchcount를 1로 변경 

    2. RabbitListener에 해당 변경 사항을 적용

    3. Consumer1에 Thread.sleep(3000)을 통해 3초의 지연시간 부여

     

    PrefetchCount 1로 설정

        @Bean
        public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneContainerFactory(
                SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory factory)
        {
            SimpleRabbitListenerContainerFactory simpleFactory = new SimpleRabbitListenerContainerFactory();
            configurer.configure(simpleFactory, factory);
            simpleFactory.setPrefetchCount(1);
    
            return simpleFactory;
        }

    RabbitListner에 컨테이너 설정 부여 후 스레드 슬립 적용 

        @RabbitListener(queues = "fanout-test", containerFactory = "prefetchOneContainerFactory")
        public void fanoutTest(String testStr) throws IOException{
            try{
                Thread.sleep(3000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            log.info("Consumer 1 : "+testStr);
        }
        @RabbitListener(queues = "fanout-test", containerFactory = "prefetchOneContainerFactory")
        public void fanoutTest2(String testStr) throws IOException{
            log.info("Consumer 2 : " +testStr);
        }

    실행 로그

    2021-08-14 00:05:44.417  INFO 20504 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 2 : Round-Robin 테스트
    2021-08-14 00:05:44.939  INFO 20504 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 2 : Round-Robin 테스트
    2021-08-14 00:05:45.440  INFO 20504 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 2 : Round-Robin 테스트
    2021-08-14 00:05:46.025  INFO 20504 --- [ntContainer#1-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 2 : Round-Robin 테스트
    2021-08-14 00:05:46.972  INFO 20504 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer   : Consumer 1 : Round-Robin 테스트

    로그를 살펴보면

    빠르게 처리되는 Consumer 2만 4번 호출되고 

    3초의 지연시간을 가진 Consumer 1은 1번만 호출된 것을 확인할 수 있었습니다. 


    Rabbit MQ 멀티 컨슈머 설정

    - Consumer를 멀티 스레드를 통해 사용하는 것 

    - 메시지 소비 속도가 빠름

    - Producer의 메시지 발행이 Consumer의 소비보다 빠를 때 사용하면 성능이 좋음, 메시지 하나하나 처리가 오래 걸리는 경우에도 좋을 듯? 

    - 기본적으로 메모리 영역에 메시지를 저장해놓는 크기인 prefetch count는 250으로 설정되어 있음

    250개가 넘어가면 메시지를 처리하지 못하는 것으로 알고 있는데

    이를 해결하기 위해 사용하는 것이 Multi Consumer라고 알고 있음

     

    SimpleMessageListenerContainer 들어가 확인한 코드

        protected int initializeConsumers() {
            int count = 0;
            synchronized(this.consumersMonitor) {
                if (this.consumers == null) {
                    this.cancellationLock.reset();
                    this.consumers = new HashSet(this.concurrentConsumers);
    
                    for(int i = 1; i <= this.concurrentConsumers; ++i) {
                        BlockingQueueConsumer consumer = this.createBlockingQueueConsumer();
                        if (this.getConsumeDelay() > 0L) {
                            consumer.setConsumeDelay(this.getConsumeDelay() * (long)i);
                        }
    
                        this.consumers.add(consumer);
                        ++count;
                    }
                }
    
                return count;
            }
        }

    Rabbit MQ가 시작되면 설정된 concurrentConsumer의 수 만큼 BlockingQueueConsumer ( 컨슈머 대기열 )가 생성됩니다 

     

    AbstractMessageListenerContainer 들어가 확인한 코드

    default prefetch count가 250임을 확인할 수 있었음 

    public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean, ApplicationEventPublisherAware {
        private static final int EXIT_99 = 99;
        private static final String UNCHECKED = "unchecked";
        static final int DEFAULT_FAILED_DECLARATION_RETRY_INTERVAL = 5000;
        public static final boolean DEFAULT_DEBATCHING_ENABLED = true;
        public static final int DEFAULT_PREFETCH_COUNT = 250;
        public static final long DEFAULT_RECOVERY_INTERVAL = 5000L;
        public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000L;
        .....
        ..
        .
        }

    BlokingQueueConsumer 들어가 확인한 코드 

    블로킹 큐를 prefetchcount를 통해 생성하는 것을 확인할 수 있었음 

    public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {
    		...
            ..
            .
            this.queue = new LinkedBlockingQueue(prefetchCount);
        }

     

    ! prefetch count와 크기가 같은 Queue를 가진 concurrency 개수만큼의 Consumer를 생성해 사용하는 것 !이라고 생각이 드네요 .. 자세한 건 더 알아봐야 할 것 같습니다 

     

    default prefetch count인 250개의 메시지가 이미 꽉 차있어 다음 메시지를 저장할 수 없는 상태라면 공급되는 메시지를 처리할 수 없겠죠

    이를 해결하기 위해 ConcurrentConsumer의 설정 값을 증가시켜 여러 Consumer를 생성해 동시에 메시지를 소비할 수 있는 Multi Consumer를 사용하게 됩니다.

     

    SimpleMessageListenerContainer 들어가 확인한 코드 

        public void setConcurrency(String concurrency) {
            try {
                int separatorIndex = concurrency.indexOf(45);
                if (separatorIndex != -1) {
                    int consumersToSet = Integer.parseInt(concurrency.substring(0, separatorIndex));
                    int maxConsumersToSet = Integer.parseInt(concurrency.substring(separatorIndex + 1));
                    Assert.isTrue(maxConsumersToSet >= consumersToSet, "'maxConcurrentConsumers' value must be at least 'concurrentConsumers'");
                    this.concurrentConsumers = 1;
                    this.maxConcurrentConsumers = null;
                    this.setConcurrentConsumers(consumersToSet);
                    this.setMaxConcurrentConsumers(maxConsumersToSet);
                } else {
                    this.setConcurrentConsumers(Integer.parseInt(concurrency));
                }
    
            } catch (NumberFormatException var5) {
                throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only single fixed integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.", var5);
            }
        }

    사용자가 직접 concurrency를 설정해

    concurrency consumer와 max conccurency consumer를 설정합니다.

     

    RabbitListner - multi consumer 적용 

        @RabbitListener(queues = "fanout-test", concurrency = "3")
        public void fanoutTest(String testStr) throws IOException{
            log.info("Multi Consumer  : "+testStr);
            log.info(Thread.currentThread().getName());
        }

    적용 자체는 간단한게 concurrency 속성을 추가함으로 몇 개를 사용할 것인지 정할 수 있습니다. 

    예제에선 3개의 컨슈머를 설정했고 실행 결과는 다음과 같습니다 

     

    실행 로그 

    2021-08-14 00:23:28.254  INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer   : Multi Consumer 테스트
    2021-08-14 00:23:28.254  INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer   : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-3
    2021-08-14 00:23:28.686  INFO 16684 --- [ntContainer#0-2] k.s.daangn.service.push.RabbitConsumer   : Multi Consumer 테스트
    2021-08-14 00:23:28.686  INFO 16684 --- [ntContainer#0-2] k.s.daangn.service.push.RabbitConsumer   : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2
    2021-08-14 00:23:28.954  INFO 16684 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer   : Multi Consumer 테스트
    2021-08-14 00:23:28.954  INFO 16684 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer   : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1
    2021-08-14 00:23:29.312  INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer   : Multi Consumer 테스트
    2021-08-14 00:23:29.312  INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer   : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-3
    2021-08-14 00:23:29.616  INFO 16684 --- [ntContainer#0-2] k.s.daangn.service.push.RabbitConsumer   : Multi Consumer 테스트
    2021-08-14 00:23:29.616  INFO 16684 --- [ntContainer#0-2] k.s.daangn.service.push.RabbitConsumer   : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2
    2021-08-14 00:23:29.942  INFO 16684 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer   : Multi Consumer 테스트
    2021-08-14 00:23:29.942  INFO 16684 --- [ntContainer#0-1] k.s.daangn.service.push.RabbitConsumer   : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1
    2021-08-14 00:23:30.228  INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer   : Multi Consumer 테스트
    2021-08-14 00:23:30.228  INFO 16684 --- [ntContainer#0-3] k.s.daangn.service.push.RabbitConsumer   : org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-3

    컨슈머가 3개의 스레드로 동작해 메시지를 소비하고 있음을 확인할 수 있었습니다.

     

     

     

     

     

    ps. 생각보다 어렵네요!  

    'Message Queue' 카테고리의 다른 글

    Rabbit MQ에 대해  (0) 2021.08.14

    댓글

Designed by Tistory.