实现一个 Java 版的 Redis

高洛峰
Freigeben: 2018-05-30 13:56:00
Original
3427 Leute haben es durchsucht

最近看了 Redis 的代码,感觉还是挺简单的.有冲动想用其它语言实现(抄)一个.原来想用 Python 实现来着.后来想想试试 Netty.原因有二

第一:Java 的NIO 和Netty 的 EventLoop 配合起来和 Redis 的网络模型很接近.都是 Ractor 模型.甚至 Redis的模型更简单--只有一个 EventLoop 线程.写(抄)起来更方便

第二:Netty 架构挺不错.借这个机会学习一下.

如果我们从一个很抽象(简单)的角度看 Redis Server.就是一个监听在6379的程序, 本质上是一个处理单线线请求的 Hashtable. 而 Redis 的协议也是非常非常的简单.比 http 协议可简单多了.

以下是这个协议的一般形式:

*<参数数量> CR LF $<参数 1 的字节数量> CR LF<参数 1 的数据> CR LF ... $<参数 N 的字节数量> CR LF<参数 N 的数据> CR LF
Nach dem Login kopieren

这基本就是一个很简单的有限状态机.

1.png

所以我给我们的命令解析器设置3个状态.

public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA }
Nach dem Login kopieren

我们将初始状态设置NUMBER_OF_ARGS 也就是开始那个绿色的状态.当有数据到达时.我们不停的判断程序的状态.是哪个状态,我们做啥.

while(true){ switch (state()){ case NUMBER_OF_ARGS: //从当前数据中读取参数个数 break; case NUMBER_BYTE_OF_ARGS: //从数据中读取参数长度 break; case ARGS_DATA: //按参数长度读取参数 //判断参数个数.如果到了最后一个.则跳出,否则状态转回NUMBER_BYTE_OF_ARGS break; } }
Nach dem Login kopieren

下面我们按着我们上面思路实现一下.

package me.yunanw.redisinjava; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.ReplayingDecoder;import java.util.List; /** * Created by yunanw on 2016/10/15. */ public class CommandDecoder extends ReplayingDecoder { public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA } static final char CR = '\r'; static final char LF = '\n'; public CommandDecoder(){ state(State.NUMBER_OF_ARGS); } protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { RedisFrame frame = doDecode(channelHandlerContext,byteBuf,list); if (frame != null){ list.add(frame); } } private RedisFrame doDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { RedisFrame frame = null; int currentArgsLen = 0; int argsCount = 0; while(true){ switch (state()){ case NUMBER_OF_ARGS: if (byteBuf.readByte() != '*'){ throw new DecoderException("can not found *"); } argsCount = parseRedisNumber(byteBuf); frame = new RedisFrame(argsCount); checkpoint(State.NUMBER_BYTE_OF_ARGS); break; case NUMBER_BYTE_OF_ARGS: if (byteBuf.readByte() != '$'){ throw new DecoderException("can not found $"); } currentArgsLen = parseRedisNumber(byteBuf); checkpoint(State.ARGS_DATA);; break; case ARGS_DATA: frame.AppendArgs(byteBuf.readBytes(currentArgsLen).array()); if (byteBuf.readByte() != CR || byteBuf.readByte() != LF) throw new DecoderException("can not found CR OR LF"); if ((--argsCount) = 0 && digit < 10) { result = (result * 10) + digit; } else { throw new DecoderException("Invalid character in integer"); } } while ((readByte = byteBuf.readByte()) != CR); if ((readByte = byteBuf.readByte()) != LF) { throw new DecoderException("can not found LF"); } return (negative? -result:result); } }
Nach dem Login kopieren

写到这里有一个小问题,如果你上面代码看懂了,你就会发现一个小问题.如果由于网络原因,有时数据可以并没有接收完全.而我们的代码完全没有做这方面的考虑? 而 Checkpoint 这是又什么鬼?

第一个问题:

事实上我们有考虑这个问题.所以我们继承了一个相对比较特别Decoder--ReplayingDecoder.我们看一下ReplayingDecoder的 CallDecode 方法.(这个名字起的非常的直白.你一定明白他是干啥的)

 try { decode(ctx, replayable, out); //省略} catch (Signal replay) { replay.expect(REPLAY); //省略 // Return to the checkpoint (or oldPosition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. } break; }
Nach dem Login kopieren

Signal replay 是 Netty 中定义的一个错误.当我们读取错误时,Netty 会再等到下次有数据到达时,再试一次Decode 方法.看看能再解析成功.所以我们就可以假设置我们要的数据都已经读取了.

但是要注意: replaydecoder 的 decode 方法会被反复调用..所以我们的代码中要做好这样的准备.

二: CheckPoint 就是为了防止如果每次反复调用 Decode 时从头执行,而设置的一个状态.让我们这个 decode 方法有状态.

好了.现在我们创建监部分的代码.这都是套数,直接抄下来就行了

 ServerBootstrap bootstrap = new ServerBootstrap(); final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1); try { bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .localAddress(port) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new CommandDecoder()); p.addLast(new RedisServerHandler()); } }); // Start the server. ChannelFuture f = bootstrap.bind().sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. group.shutdownGracefully(); }
Nach dem Login kopieren

我们把 Redis 的协议解析为RedisFrame 类

 package me.yunanw.redisinjava;import java.util.ArrayList;import java.util.List; /** * Created by yunanw on 2016/10/17. */ public class RedisFrame { private int argsCount = 0; List ArgsData = null; public RedisFrame(int argsCount){ this.argsCount = argsCount; this.ArgsData = new ArrayList(argsCount); } public void AppendArgs(byte[] args){ this.ArgsData.add(new String(args)); } public int getCommandCount(){ return ArgsData.size(); } public String GetFristCommand(){ if (ArgsData.size() > 0){ return ArgsData.get(0); } return null; } public String GetCommand(int index){ if (ArgsData.size() > index){ return ArgsData.get(index); } return null; } }
Nach dem Login kopieren

好了.这时你打开 Redis-cli 试试是不是可以连上我们的 "假Redis" Server.有意的是---你打开 Redis-cli.他会自动发一个 "Command" 命令.而你不管回复什么,它都认为连上了

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage
Über uns Haftungsausschluss Sitemap
Chinesische PHP-Website:Online-PHP-Schulung für das Gemeinwohl,Helfen Sie PHP-Lernenden, sich schnell weiterzuentwickeln!