Maison > Java > javaDidacticiel > Java écriture manuscrite d'un framework RPC

Java écriture manuscrite d'un framework RPC

Guanhui
Libérer: 2020-06-17 17:31:03
avant
3353 Les gens l'ont consulté

Java écriture manuscrite d'un framework RPC

Le framework RPC est appelé framework d'appel à distance. Le principe de base de sa mise en œuvre est que le côté consommateur utilise un proxy dynamique pour proxy une interface (Proxy dynamique basé sur JDK, bien sûr. si vous utilisez CGLib, vous pouvez utiliser directement des méthodes sans classes d'interface ). En ajoutant la programmation de transmission réseau, le nom de la méthode de l'interface d'appel de transmission et les paramètres de la méthode sont obtenus par le fournisseur, puis par réflexion, la méthode de l'interface est exécutée. , puis la réflexion est exécutée. Les résultats sont renvoyés au consommateur via la programmation réseau.

Maintenant, implémentons ces concepts tour à tour. Ici, nous effectuons l'implémentation la plus simple. La programmation réseau utilise BIO. Vous pouvez utiliser Netty en mode Reactor pour le réécrire de manière plus performante. La sérialisation et la désérialisation utilisées dans la transmission réseau sont également natives de Java. Bien entendu, ces octets de transmission sont relativement volumineux et peuvent être traités à l'aide du protoBuffer ou du kryo de Google. C'est juste par commodité pour expliquer le principe.

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.guanjian</groupId>
    <artifactId>rpc-framework</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Copier après la connexion

La première chose est bien sûr l'interface que l'on souhaite appeler à distance et la méthode d'interface.

public interface HelloService {
    String sayHello(String content);}
Copier après la connexion

Classe d'implémentation d'interface

public class HelloServiceImpl implements HelloService {    public String sayHello(String content) {        return "hello," + content;    }
}
Copier après la connexion

Proxy dynamique côté consommateur, Si vous écrivez le fournisseur et le consommateur dans deux projets, le côté fournisseur a besoin de l'interface et des classes d'implémentation ci-dessus, tandis que le côté consommateur n’a besoin que de l’interface ci-dessus.

public class ConsumerProxy {
    /**
     * 消费者端的动态代理
     * @param interfaceClass 代理的接口类
     * @param host 远程主机IP
     * @param port 远程主机端口
     * @param <T>
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T consume(final Class<T> interfaceClass,final String host,final int port) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class[]{interfaceClass}, (proxy,method,args) -> {
                    //创建一个客户端套接字
                    Socket socket = new Socket(host, port);
                    try {
                        //创建一个对外传输的对象流,绑定套接字
                        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                        try {
                            //将动态代理的方法名写入对外传输的对象流中
                            output.writeUTF(method.getName());
                            //将动态代理的方法的参数写入对外传输的对象流中
                            output.writeObject(args);
                            //创建一个对内传输的对象流,绑定套接字
                            //这里是为了获取提供者端传回的结果
                            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                            try {
                                //从对内传输的对象流中获取结果
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable) result;
                                }
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
        );
    }
}
Copier après la connexion

Pour plus d'informations sur le proxy dynamique JDK, veuillez vous référer aux principes AOP et à l'auto-implémentation. Pour BIO, veuillez vous référer à la comparaison entre IO traditionnel et NIO

Côté fournisseur. services de transmission réseau et d'appels à distance

public class ProviderReflect {
    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * RPC监听和远程方法调用
     * @param service RPC远程方法调用的接口实例
     * @param port 监听的端口
     * @throws Exception
     */
    public static void provider(final Object service,int port) throws Exception {
        //创建服务端的套接字,绑定端口port
        ServerSocket serverSocket = new ServerSocket(port);
        while (true) {
            //开始接收客户端的消息,并以此创建套接字
            final Socket socket = serverSocket.accept();
            //多线程执行,这里的问题是连接数过大,线程池的线程数会耗尽
            executorService.execute(() -> {
                try {
                    //创建呢一个对内传输的对象流,并绑定套接字
                    ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                    try {
                        try {
                            //从对象流中读取接口方法的方法名
                            String methodName = input.readUTF();
                            //从对象流中读取接口方法的所有参数
                            Object[] args = (Object[]) input.readObject();
                            Class[] argsTypes = new Class[args.length];
                            for (int i = 0;i < args.length;i++) {
                                argsTypes[i] = args[i].getClass();

                            }
                            //创建一个对外传输的对象流,并绑定套接字
                            //这里是为了将反射执行结果传递回消费者端
                            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                            try {
                                Class<?>[] interfaces = service.getClass().getInterfaces();
                                Method method = null;
                                for (int i = 0;i < interfaces.length;i++) {
                                    method = interfaces[i].getDeclaredMethod(methodName,argsTypes);
                                    if (method != null) {
                                        break;
                                    }
                                }
                                Object result = method.invoke(service, args);
                                //将反射执行结果写入对外传输的对象流中
                                output.writeObject(result);
                            } catch (Throwable t) {
                                output.writeObject(t);
                            } finally {
                                output.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            input.close();
                        }
                    } finally {
                        socket.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
Copier après la connexion

Démarrer l'écoute du réseau et les appels à distance du côté du fournisseur

public class RPCProviderMain {
    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();
        ProviderReflect.provider(service,8083);
    }
}
Copier après la connexion

Démarrer les appels proxy dynamiques sur le consommateur

public class RPCConsumerMain {
    public static void main(String[] args) throws InterruptedException {
        HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083);
        for (int i = 0;i < 1000;i++) {
            String hello = service.sayHello("你好_" + i);
            System.out.println(hello);
            Thread.sleep(1000);
        }
    }
}
Copier après la connexion

Exécuter les résultats

Bonjour, bonjour _0
Bonjour, bonjour_1
Bonjour, bonjour_2
Bonjour, bonjour_3
Bonjour, bonjour_4
Bonjour, bonjour_5

.....

Si vous souhaitez l'étendre dans un framework RPC hautes performances avec Netty+ProtoBuffer, vous pouvez vous référer aux méthodes d'écriture associées de Netty intégrant Protobuffer.

Tutoriel recommandé : "PHP"

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:oschina.net
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal