Home>Article>Java> Java handwriting an RPC framework

Java handwriting an RPC framework

Guanhui
Guanhui forward
2020-06-17 17:30:41 3233browse

Java handwriting an RPC framework

The RPC framework is called the remote calling framework. The core principle of its implementation is that the consumer uses a dynamic proxy to proxy an interface (Dynamic proxy based on JDK, of course if Using CGLib, you can directly use methods without interface classes). By adding network transmission programming, the transmission call interface method name and method parameters are obtained by the provider, and then through reflection, the method of the interface is executed, and then the reflection is executed. The results are sent back to the consumer through network programming.

Now let’s implement these concepts in turn. Here we do the simplest implementation. Network programming uses BIO. You can use Netty in Reactor mode to rewrite it in a way with better performance. The serialization and deserialization used in network transmission are also native to Java. Of course, such transmission bytes are relatively large and can be processed using Google's protoBuffer or kryo.This is just for convenience to explain the principle.

pom

  4.0.0 com.guanjian rpc-framework 1.0-SNAPSHOT    org.apache.maven.plugins maven-compiler-plugin 3.7.0  1.8 1.8 UTF-8     

First of all, of course, the interface and the interface method we want to call remotely.

public interface HelloService { String sayHello(String content);}

Interface implementation class

public class HelloServiceImpl implements HelloService { public String sayHello(String content) { return "hello," + content; } }

Dynamic proxy on the consumer side,If you write the provider and consumer in two projects, the provider side needs the above interface and implementation classes, while the consumer side only needs the above interface.

public class ConsumerProxy { /** * 消费者端的动态代理 * @param interfaceClass 代理的接口类 * @param host 远程主机IP * @param port 远程主机端口 * @param  * @return */ @SuppressWarnings("unchecked") public static  T consume(final Class interfaceClass,final String host,final int port) { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, (proxy,method,args) -> { //创建一个客户端套接字 Socket socket = new Socket(host, port); try { //创建一个对外传输的对象流,绑定套接字 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { //将动态代理的方法名写入对外传输的对象流中 output.writeUTF(method.getName()); //将动态代理的方法的参数写入对外传输的对象流中 output.writeObject(args); //创建一个对内传输的对象流,绑定套接字 //这里是为了获取提供者端传回的结果 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { //从对内传输的对象流中获取结果 Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } ); } }

For information about JDK dynamic proxy, please refer to AOP principles and self-implementation. For BIO, please refer to the comparison between traditional IO and NIO.

Provider-side network transmission and remote calling services

public class ProviderReflect { private static final ExecutorService executorService = Executors.newCachedThreadPool(); /** * RPC监听和远程方法调用 * @param service RPC远程方法调用的接口实例 * @param port 监听的端口 * @throws Exception */ public static void provider(final Object service,int port) throws Exception { //创建服务端的套接字,绑定端口port ServerSocket serverSocket = new ServerSocket(port); while (true) { //开始接收客户端的消息,并以此创建套接字 final Socket socket = serverSocket.accept(); //多线程执行,这里的问题是连接数过大,线程池的线程数会耗尽 executorService.execute(() -> { try { //创建呢一个对内传输的对象流,并绑定套接字 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { try { //从对象流中读取接口方法的方法名 String methodName = input.readUTF(); //从对象流中读取接口方法的所有参数 Object[] args = (Object[]) input.readObject(); Class[] argsTypes = new Class[args.length]; for (int i = 0;i < args.length;i++) { argsTypes[i] = args[i].getClass(); } //创建一个对外传输的对象流,并绑定套接字 //这里是为了将反射执行结果传递回消费者端 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { Class[] interfaces = service.getClass().getInterfaces(); Method method = null; for (int i = 0;i < interfaces.length;i++) { method = interfaces[i].getDeclaredMethod(methodName,argsTypes); if (method != null) { break; } } Object result = method.invoke(service, args); //将反射执行结果写入对外传输的对象流中 output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { output.close(); } } catch (Exception e) { e.printStackTrace(); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { e.printStackTrace(); } }); } } }

Start the network listening and remote calling on the provider side

public class RPCProviderMain { public static void main(String[] args) throws Exception { HelloService service = new HelloServiceImpl(); ProviderReflect.provider(service,8083); } }

Start the dynamic proxy call of the consumer

public class RPCConsumerMain { public static void main(String[] args) throws InterruptedException { HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083); for (int i = 0;i < 1000;i++) { String hello = service.sayHello("你好_" + i); System.out.println(hello); Thread.sleep(1000); } } }

Running result

hello, hello _0
hello, hello_1
hello, hello_2
hello, hello_3
hello, hello_4
hello, hello_5

.....

If you want to extend it into a high-performance RPC framework with Netty ProtoBuffer, you can refer to the related writing methods of Netty integrating Protobuffer.

Recommended tutorial: "PHP"

The above is the detailed content of Java handwriting an RPC framework. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:oschina.net. If there is any infringement, please contact admin@php.cn delete