최근에 오래된 시스템을 개조하다가 RabbitMq가 필요한 시나리오를 접하게 되었습니다. 이전 프로세스에서 전송 및 소비 측에 다양한 구성이 필요했는데, 그게 번거로웠던 것 같습니다. 그러다 문득 @Reference 주석 형식이 생각났습니다. dubbo. 가능할까요? MQ 호출이 동기 인터페이스 호출만큼 편리하고 간단하도록 유사한 쉘프를 만드는 방법은 무엇입니까? 그래서 관련 정보를 확인하고 dubbo의 소스 코드를 읽어보니 아이디어가 생겼습니다. 일반적으로 달성하려는 목표는 dubbo와 마찬가지로 소비자 측에서 인터페이스를 노출하고(dubbo 서비스에서 정의한 인터페이스를 재사용할 수도 있으므로 dubbo 서비스 작성이 동기식 또는 MQ 비동기식이 될 수 있음) 송신 측에서 주입합니다. 사용자 정의 주석을 통해 메소드를 호출하고 이를 프레임워크 내에서 내부적으로 처리합니다. 그런 다음 비동기식 mq 형식으로 변환되어 소비자에게 전송됩니다.
예를 들어 서버에는 인터페이스가 있습니다.
public interface MqDemoService { void dealById(Long id); }
구현:
@Slf4j @Component("mqDemoServiceImpl") @Service(version = "1.0.0") public class MqDemoServiceImpl implements MqDemoService { @Override public void dealById(Long id) { log.info("执行findById方法"); } }
그 중:
@Slf4j是lombok注解 @Service是dubbo服务端注解
관심 있는 학생이 직접 확인할 수 있습니다
그런 다음 전송 끝
에 사용자 정의가 있습니다. 참고:
@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface AsyncInvoker { }
호출 컨트롤러에서:
@Slf4j @Controller public class MqDemoController { @AsyncInvoker private MqDemoService mqDemoService; @RequestMapping(value = "/deal", method = RequestMethod.POST) public void deal() { mqDemoService.dealById(1L); } }
에서 @AsyncInvoker로 주석이 달린 mqDemoService 속성에 주의하세요. 이 Annotation을 통해 주입된 객체가 메소드를 호출하면 mq를 통해 전송되어 비동기 호출이 됩니다.
자, 달성하려는 목표는 매우 명확합니다. 그러면 해결해야 할 문제는 다음과 같습니다. :
1,如何确定发送消息的格式,使消费端可以确定调用的方法 2,发送端中如何为注解@AsyncInvoker注释的对象注入实例 3,接收端中如何在接收到消息后调用对应接口的实现方法 4,多个消费服务如何区分mq队列.
1. 수신측에서 호출 방법을 결정할 수 있도록 메시지 전송 형식을 결정하는 방법
@Data public class MqMethodMeta { //调用的接口名称(包括包名,用于反射) private String interfaceName; //调用的方法名 private String methodName; //调用的方法的参数 private Object[] args; //调用的方法的参数类型 private String[] paramTypeNames; }
2. 보낸 사람의 @AsyncInvoker 주석이 달린 개체에 인스턴스를 삽입합니다
그렇다면 @AsyncInvoker에 의해 주석이 달린 객체가 동적 프록시에 주입되어야 한다는 것을 Spring에게 어떻게 알릴 수 있을까요?
그러면 코드는 다음과 같습니다.
@Slf4j @Component public class AsyncInvokerBeanProcessor implements BeanPostProcessor { //缓存生成的动态代理对象,用于多个Controller注入同一类型对象时使用. private final ConcurrentMap<String, Object> proxyMap = new ConcurrentHashMap<>(); //注入spring amqp处理mq的对象 @Autowired private RabbitTemplate rabbitTemplate; //BeanPostProcessor接口方法,在spring创建每个实例前插入的用户自定义逻辑.这里我们需要的是在每个Controller对象创建的时候为其中的@AsyncInvoker注解对象注入动态代理. @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { //获取该实例中的有@AsyncInvoker注解的field Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { try { if (!field.isAccessible()) { field.setAccessible(true); } AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class); if (asyncInvoker != null) { //创建代理对象,赋值给该feild Object value = createProxy(field.getType()); if (value != null) { field.set(bean, value); } } } catch (Throwable e) { log.error("Failed to init remote mq service at filed " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e); } } return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } private Object createProxy(Class clz) { String interfaceName; if (clz.isInterface()) { interfaceName = clz.getName(); } else { throw new IllegalStateException("The @MqInvoker property type " + clz.getName() + " is not a interface."); } Object proxy = proxyMap.get(interfaceName); if (proxy == null) { Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { log.debug("执行动态代理! method:{} ,args: {}", method, args); if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) { throw new IllegalAccessException("MQ Service 目前仅支持单参数Long类型方法"); } //动态代理中创建mq传输对象并发送. MqMethodMeta mqMethodMeta = new MqMethodMeta(); mqMethodMeta.setInterfaceName(clz.getName()); mqMethodMeta.setMethodName(method.getName()); mqMethodMeta.setArgs(args); String[] paramTypeNames = new String[args.length]; for (int i = 0; i < args.length; i++) { paramTypeNames[i] = args[i].getClass().getName(); } mqMethodMeta.setParamTypeNames(paramTypeNames); RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory()); Exchange exchange = new TopicExchange("exchange.demo.web.adaptor"); admin.declareExchange(exchange); //关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta); return null; } }); proxyMap.putIfAbsent(interfaceName, newProxy); proxy = proxyMap.get(interfaceName); } return proxy; } }
3. , 해당 인터페이스의 구현 메소드를 호출하세요
@Slf4j public class AsyncMethodListener implements ApplicationContextAware { private ApplicationContext applicationContext; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" )) public void messageHandle(@Payload MqMethodMeta message) { try { log.info("收到message: {}", message); Class clz = Class.forName(message.getInterfaceName()); String methodName = message.getMethodName(); Object[] args = message.getArgs(); Class[] paramTypes = new Class[message.getParamTypeNames().length]; for (int i = 0; i < message.getParamTypeNames().length; i++) { paramTypes[i] = Class.forName(message.getParamTypeNames()[i]); } //由于使用Object[]数组传送参数,所以Jackson2JsonMessageConverter会将id转换为Integer,反射调用时会报错,此处强转一下 for (int i = 0; i < args.length; i++) { Class c = paramTypes[i]; if (args[i] instanceof Integer && c.equals(Long.class)) { args[i] = ((Integer) args[i]).longValue(); } } //拿到spring管理的对应接口的实现 Object invoker = applicationContext.getBean(clz); Method method = clz.getMethod(methodName, paramTypes); method.invoke(invoker, args); } catch (Exception e) { e.printStackTrace(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
4. 다중 소비자 서비스는 어떻습니까?
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" ))
${demo.mq.method.queue} ${demo.mq.method.routekey}
를 읽습니다. 구성 파일:
예를 들어 시스템 1의 구성은 다음과 같습니다.
demo.mq.method.queue=com.demo.service.project1.# demo.mq.method.routekey=com.demo.service.project1.#
시스템 2의 구성은 다음과 같습니다.
demo.mq.method.queue=com.demo.service.project2.# demo.mq.method.routekey=com.demo.service.project2.#
발신자의 코드를 살펴보세요.
//关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
여기의 clz.getName() 우리 시스템에는 좋은 하도급 전략이 있으므로 시스템 1의 clz.getName()은 반드시 com.demo.service.project1로 시작해야 합니다. 예를 들어, 값은 다음과 같습니다. clz.getName()은 com.demo .service.project1.MqDemoService입니다(".#"은 다음 다중 식별자와 일치하며 이는 RabbitMQ의 주제 유형 교환 기능입니다).
이 시점에서 우리가 달성하고자 했던 목표는 앞으로는 비동기식 호출을 위해 mq를 사용해야 합니다. 동기식 방법처럼 사용할 수 있습니다.
스프링에서 mq의 사용은 여기에 자세히 나열되지 않습니다. 문서를 참조하세요. :
http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/htmlsingle/
녹음 및 참고용으로 데모 코드 세트가 제공됩니다
요약
1,因为目前业务场景,没有考虑异步回调的问题. 需要的话可以考虑和rabbitmq本身的异步回调方式结合. 目前还没有思考. 2,因为对消费端版本更新问题的考虑,目前仅仅支持单参数(整型)方法的调用.
첫 번째 질문은 해당 비즈니스를 사용한 후에 고려해야 합니다. 아니면 아이디어가 있으면 토론해도 됩니다.
두 번째 질문은 주로 소비자가 매개변수 유형이나 다른 상황을 변경하면 다시 게시한 후 남아 있을 수 있는 이전 메시지와 호환된다는 점을 고려합니다. mq. 현재로서는 이에 대한 좋은 아이디어가 없으며 단지 브레인스토밍용일 뿐입니다.
위 내용은 Rabbit mq를 사용하여 dubbo를 시뮬레이션하고 MQ 비동기 호출을 수행합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!