업무상 비동기 메시지 처리가 필요한 비즈니스 시나리오를 자주 접하게 됩니다. 메시지의 성격에 따라 완전히 다른 처리 방법이 있습니다.
1. 메시지는 독립적이지 않습니다.
독립적이지 않은 메시지는 일반적으로 순차적 종속성을 갖습니다. 이때 메시지 처리 메커니즘은 선형 대기열 처리 모드로 변질되며 단일 스레드에서는 한 명의 소비자만 메시지를 처리할 수 있습니다.
2. 메시지는 완전히 독립적입니다.
완전히 독립적인 메시지는 동시에 여러 소비자(스레드)에 의해 동시에 처리될 수 있어 최대 동시 처리 기능을 달성할 수 있습니다.
3. 메시지는 완전히 독립적이지 않습니다.
일반적으로 이러한 상황은 (동일한 생산자의) 동종 메시지를 순서대로 정렬해야 하며, 이종 메시지의 순서는 이와 관련이 없습니다.
이 시나리오에서 메시지 처리는 상대적으로 복잡합니다. 동일한 소스의 메시지가 순서대로 유지되도록 하려면 고정된 소비자 스레드를 동일한 소스의 메시지에 바인딩하는 것을 생각하기 쉽습니다. 문제.
생산자 수가 많으면 바인딩된 스레드 수가 충분하지 않을 수 있습니다. 물론 스레드 리소스를 재사용할 수 있으며 동일한 스레드를 여러 메시지 소스에 바인딩하여 처리할 수 있습니다. 이로 인해 상호 문제가 발생합니다. 메시지 소스 간의 영향.
다음 시나리오를 고려해보세요.
생산자 P1은 대량의 메시지를 생성하고 대기열에 들어가 처리를 위해 소비자 스레드 C1에 할당됩니다(C1은 처리하는 데 시간이 오래 걸릴 수 있음). 이때 생산자 P2는 메시지를 생성합니다. 불행하게도 이 또한 할당됩니다. 소비자 스레드 C1에 처리
가 주어지면 생산자 P2의 메시지 처리가 P1의 많은 메시지로 인해 차단되어 P1과 P2, 그리고 다른 소비자 간에 상호 영향을 미치게 됩니다. 스레드를 완전히 활용할 수 없어 불균형이 발생합니다.
그래서 우리는 그러한 문제를 피하는 것을 고려해야 합니다. 소비 처리(가능한 한 빨리), 격리(상호 간섭 방지) 및 균형(동시 처리 최대화)의 적시성을 달성합니다.
구현에는 두 가지 모드가 있을 것입니다. 생각하기 쉬운 것은 스레드 디스패치 모델(PUSH)입니다. 모드) ), 구체적인 방법은 일반적으로 다음과 같습니다.
1 메시지를 검색하기 위해 대기열을 폴링하는 전역 메시지 디스패처가 있습니다.
2. 메시지 소스에 따라 처리를 위해 적절한 소비자 스레드로 전달합니다.
분배 알고리즘 메커니즘은 메시지 소스를 기반으로 하는 해시처럼 간단할 수도 있고, 각 소비자 스레드의 현재 로드, 대기 대기열의 길이, 메시지의 복잡성만큼 복잡할 수도 있으며 배포를 위해 선택할 수 있습니다. 종합적인 분석을 바탕으로 합니다.
간단한 해시는 위 시나리오에 설명된 문제에 분명히 직면하게 되지만, 복잡한 분포 계산은 분명히 매우 번거롭고 구현하기 복잡하며 효율성이 반드시 좋은 것은 아니며 균형 측면에서 완벽한 균형을 달성하기가 어렵습니다.
두 번째 모드는 PULL 방식을 사용하며, 구체적인 방식은 다음과 같습니다.
1 메시지 소스는 생성된 메시지를 소스에 해당하는 임시 대기열에 직접 넣습니다(아래 그림 참조). 세션은 메시지의 다른 소스를 나타냅니다. 그런 다음 세션을 차단 대기열에 넣어 스레드에 처리를 알립니다.
2. 여러 소비자 스레드가 동시에 대기열을 폴링하고 메시지에 대해 경쟁합니다(단 하나만 보장하기 위해). 스레드가 가져옵니다
3. 큐 표시기가 처리되고 있는지 확인합니다. 다른 스레드에 의해 처리 중입니다(구현에는 스레드 수준에서 동일한 출처 메시지를 기반으로 한 감지 동기화가 필요함)
4. 동기화 영역 설정 처리를 하고, 동기화 영역을 빠져나온 후 임시 큐에 있는 메시지를 처리합니다. 소비자 스레드 처리 프로세스를 설명하는 코드:
public void run() { try { for (AbstractSession s = squeue.take(); s != null; s = squeue.take()) { // first check any worker is processing this session? // if any other worker thread is processing this event with same session, just ignore it. synchronized (s) { if (!s.isEventProcessing()) { s.setEventProcessing(true); } else { continue; } } // fire events with same session fire(s); // last reset processing flag and quit current thread processing s.setEventProcessing(false); // if remaining events, so re-insert to session queue if (s.getEventQueue().size() > 0 && !s.isEventProcessing()) { squeue.offer(s); } } } catch (InterruptedException e) { LOG.warn(e.getMessage(), e); } }
위 내용은 Springboot 비동기 메시지 처리 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!