RocketMQ를 SpringBoot와 통합할 때 직면하는 함정을 해결하는 방법

WBOY
풀어 주다: 2023-05-19 11:25:23
앞으로
1700명이 탐색했습니다.

애플리케이션 시나리오

RocketMQ 소비를 구현할 때 @RocketMQMessageListener 주석은 일반적으로 Group, Topic 및 selectorExpression(데이터 필터링 및 선택 규칙)을 정의하는 데 사용됩니다. 데이터의 동적 필터링을 지원하기 위해 일반적으로 표현식이 사용되며 Apollo를 통해 사용됩니다. 또는 동적 전환을 위한 클라우드 구성.

종속성 소개

  org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 
로그인 후 복사

소비자 코드

@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}") public class Consumer implements RocketMQListener { @Override public void onMessage(String s) { System.out.println("消费到的数据为:"+s); } }
로그인 후 복사

Troubleshooting

RocketMQMessageListener 전체 주석의 기본 selectorExpression은 *입니다. 이는 현재 주제 아래의 모든 데이터를 수신한다는 의미입니다. 태그를 동적으로 구성하려면 ${rocketmq.selectorExpression을 사용하세요. } 표현식을 사용하면 모든 데이터가 필터링된 것을 확인할 수 있습니다. 소스 코드(ListenerContainerConfiguration.java)를 추적하면 리스너 생성 시 환경 변수에서 해당 데이터를 가져온 후 selectorExpression의 데이터가 덮어쓰기되어 전체 필터링이 발생하는 것으로 나타났습니다. 조건이 바뀌는 표현입니다.

@Override public void afterSingletonsInstantiated() { // 获取所有所有使用了RocketMQMessageListener注解的bean Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(beans)) { // 循环注册容器 beans.forEach(this::registerContainer); } } private void registerContainer(String beanName, Object bean) { Class clazz = AopProxyUtils.ultimateTargetClass(bean); // 校验当前bean是否实现了RocketMQListener接口 if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } // 获取bean上的annotation RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); // 解析group及topic,可支持表达式 String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this.environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled = (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); return; } validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext; // 注册bean的,调用createRocketMQListenerContainer genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); // 此处已经根据表达式将数据取出 String tags = environment.resolvePlaceholders(annotation.selectorExpression()); if (!StringUtils.isEmpty(tags)) { container.setSelectorExpression(tags); } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); // 此处将SelectorExpression的数据覆盖成了表达式 container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener)bean); container.setObjectMapper(objectMapper); container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); container.setName(name); // REVIEW ME, use the same clientId or multiple? return container; }
로그인 후 복사

문제 해결

ListenerContainerConfiguration 클래스는 SmartInitializingSingleton 인터페이스의 afterSingletonsInstantiated 메서드를 구현하기 때문에 ListenerContainerConfiguration을 초기화하기 전에 리플렉션을 통해 selectorExpression 데이터를 구문 분석하고 다시 할당할 수 있습니다.

/** * 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据 **/ @Configuration public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean { @Autowired private ApplicationContext applicationContext; @Autowired private StandardEnvironment environment; @Override public void afterPropertiesSet() throws Exception { Map beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); for (Object bean : beans.values()){ Class clazz = AopProxyUtils.ultimateTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { continue; } RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation); Field field = invocationHandler.getClass().getDeclaredField("memberValues"); field.setAccessible(true); Map memberValues = (Map) field.get(invocationHandler); for (Map.Entry entry: memberValues.entrySet()) { if(Objects.nonNull(entry)){ memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue()))); } } } } }
로그인 후 복사

처음 제외하고는 종속성 패키지 2.1.0 버전에서 이 버그가 수정되었습니다. 종속성 충돌을 일으키지 않는다는 전제하에 버전 2.1.0 이상을 사용하는 것이 좋습니다.

위 내용은 RocketMQ를 SpringBoot와 통합할 때 직면하는 함정을 해결하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:yisu.com
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿
회사 소개 부인 성명 Sitemap
PHP 중국어 웹사이트:공공복지 온라인 PHP 교육,PHP 학습자의 빠른 성장을 도와주세요!