• 技术文章 >Java >java教程

    RPC框架的实例详解

    零下一度零下一度2017-07-17 14:07:56原创1301
    1,背景

    随着互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,亟需一个治理系统确保架构有条不紊的演进

    单一应用架构

    当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本

    此时,用于简化增删改查工作量的 数据访问框架(ORM) 是关键

    垂直应用架构

    当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆成互不相干的几个应用,以提升效率

    此时,用于加速前端页面开发的 Web框架(MVC) 是关键

    分布式服务架构

    按业务线拆分

    停止RPC滥用,垂直业务内优先通过本地jar调用,跨业务才采用RPC调用

    正确的识别业务逻辑的归属,让各个模块最大化内聚,从性能,可用性和维护性上减少耦合

    每次发布只部署部分服务器

    每个节点可根据不同需求伸缩扩展

    每个应用之间更新,部署,运行不影响

    部署分离


    团队分离

    数据分离

    当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求

    此时,用于提高业务复用及整合的 分布式服务框架(RPC) 是关键

    分布式服务RPC框架


    流动计算架构


    当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问压力实时管理集群容量,提高集群利用率

    此时,用于提高机器利用率的资源调度和治理中心(SOA)是关键

    Netty 线程模型

    Netty的线程模型主要是基于React,因为考虑到应用场景的不同所以演化出多种版本。

    单线程模式

    即接收服务请求以及执行IO操作都由一个线程来完成,由于采用的是IO多路复用这类无阻塞IO操作,所以在请求量不大的情况下单线程模式也是可以解决一部分场景问题的。

    单接收多工作线程模式

    当请求量增大后,原有的一个线程处理所有IO操作变得越来越无法支撑相应的性能指标,所以提到了一个工作线程池的概念,此时接收服务请求还是一个线程,接收请求的线程收到请求后会委托给后面的工作线程池,从线程池中取得一个线程去执行用户请求。

    多接收多工作线程模式

    当请求量进一步增大后,单一的接收服务请求的线程无法处理所有客户端的连接,所以将接收服务请求的也扩展成线程池,由多个线程同时负责接收客户端的连接。

    RPC 业务线程

    上面提到的都是Netty自身的线程模型,伴随着请求量的增长而不断发展出来的优化策略。而RPC请求对应用系统来讲最主要还是业务逻辑的处理,而这类业务有可能是计算密集型的也有可以是IO密集型,像大多数应用都伴随着数据库操作,redis或者是连接其它的网络服务等。如果业务请求中有这类耗时的IO操作,推荐将处理业务请求的任务分配给独立的线程池,否则可能会阻塞netty自身的线程。

    接收请求线程与工作线程分工

    方案实现

    目前我实现的RPC是采用多接收多工作线程模式,在服务端是这样绑定端口的:

    public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(this.rpcServerInitializer)
                        .childOption(ChannelOption.SO_KEEPALIVE,true)
                ;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync();
    
    
                } catch (InterruptedException e) {throw new RpcException(e);
                }
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

    boosGroup就是一组用来接收服务请求的
    workerGroup就是一组具体负责IO操作的

    增加业务线程只需要将handle的操作进一步委派给线程池即可,这里为了扩展所以需要定义接口:

    定义线程池接口

    public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues);
    }

    实现固定大小线程池

    参考了dubbo线程池

    @Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) {
                        executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
                                queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                                   //...}
                                });
                    }
                }
            }return executor;
        }
    }

    小插曲:
    记的有一次一朋友突然问java 线程池中的那个coreSize是什么意思?我顿时短路了,因平时也不怎么写多线程,想到平时用的比较多的数据库线程池,里面的参数倒是印象比较深,但就是想不起来有个coreSize。后来才又仔细看了下线程池的一些参数。现在借这个机会又可以多多再看看,以免再次短路。

    线程池工厂

    当有多个线程池实现时,通过线程池名称来动态选择线程池。

    @Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName);
        }
    }

    修改ChannelHandle的channelRead0方法

    将方法体包装成Task交给线程池去执行。

    @Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest));
                channelHandlerContext.writeAndFlush(response);
            }
        });
    
    }

    问题

    目前缺乏压测,所以暂时没有明确的数据对比。

    以上就是RPC框架的实例详解的详细内容,更多请关注php中文网其它相关文章!

    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    专题推荐:线程 业务 框架
    上一篇:分享用ajax实现带百分比进度条的实例 下一篇:java中Comparator和 Comparable的介绍及区别实例
    VIP课程(WEB全栈开发)

    相关文章推荐

    • 【腾讯云】年中优惠,「专享618元」优惠券!• 实例详解Java基础的控制语句• 一起来聊聊与Java中性能相关的设计模式• Java集合框架之PriorityQueue优先级队列• JAVA外观模式详解• 详细整理java枚举的使用总结
    1/1

    PHP中文网