最近看了Redis 的程式碼,感覺還是挺簡單的.有衝動想用其它語言實現(抄)一個.原來想用Python 實現來著.後來想想試試Netty.原因有二
第一:Java的NIO 和Netty 的EventLoop 配合起來和Redis 的網路模型很接近.都是Ractor 模型.甚至 Redis的模型更簡單--只有一個EventLoop 線程.寫(抄)起來更方便
第二:Netty 架構挺不錯.藉這個機會學習一下.
如果我們從一個很抽象(簡單)的角度看Redis Server.就是一個監聽在6379的程序, 本質上是一個處理單線線請求的Hashtable. 而Redis 的協議也是非常非常的簡單.比http 協定可簡單多了.
以下是這個協定的一般形式:
*<参数数量> CR LF $<参数 1 的字节数量> CR LF<参数 1 的数据> CR LF ... $<参数 N 的字节数量> CR LF<参数 N 的数据> CR LF
這基本上就是一個很簡單的有限狀態機.
所以我給我們的命令解析器設定333個狀態.
public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA }
我們將初始狀態設定NUMBER_OF_ARGS 也就是開始那個綠色的狀態.當有資料到達時.我們不停的判斷程式的狀態.是哪個狀態,我們做啥.
while(true){ switch (state()){ case NUMBER_OF_ARGS: //从当前数据中读取参数个数 break; case NUMBER_BYTE_OF_ARGS: //从数据中读取参数长度 break; case ARGS_DATA: //按参数长度读取参数 //判断参数个数.如果到了最后一个.则跳出,否则状态转回NUMBER_BYTE_OF_ARGS break; } }
下面我們按著我們上面思路實現一下.
package me.yunanw.redisinjava; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.ReplayingDecoder;import java.util.List; /** * Created by yunanw on 2016/10/15. */ public class CommandDecoder extends ReplayingDecoder { public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA } static final char CR = '\r'; static final char LF = '\n'; public CommandDecoder(){ state(State.NUMBER_OF_ARGS); } protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { RedisFrame frame = doDecode(channelHandlerContext,byteBuf,list); if (frame != null){ list.add(frame); } } private RedisFrame doDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { RedisFrame frame = null; int currentArgsLen = 0; int argsCount = 0; while(true){ switch (state()){ case NUMBER_OF_ARGS: if (byteBuf.readByte() != '*'){ throw new DecoderException("can not found *"); } argsCount = parseRedisNumber(byteBuf); frame = new RedisFrame(argsCount); checkpoint(State.NUMBER_BYTE_OF_ARGS); break; case NUMBER_BYTE_OF_ARGS: if (byteBuf.readByte() != '$'){ throw new DecoderException("can not found $"); } currentArgsLen = parseRedisNumber(byteBuf); checkpoint(State.ARGS_DATA);; break; case ARGS_DATA: frame.AppendArgs(byteBuf.readBytes(currentArgsLen).array()); if (byteBuf.readByte() != CR || byteBuf.readByte() != LF) throw new DecoderException("can not found CR OR LF"); if ((--argsCount) = 0 && digit < 10) { result = (result * 10) + digit; } else { throw new DecoderException("Invalid character in integer"); } } while ((readByte = byteBuf.readByte()) != CR); if ((readByte = byteBuf.readByte()) != LF) { throw new DecoderException("can not found LF"); } return (negative? -result:result); } }
寫到這裡有一個小問題,如果你上面代碼看懂了,你就會發現一個小問題.如果由於網絡原因,有時數據可以並沒有接收完全.而我們的代碼完全沒有做這方面的考慮? 而Checkpoint 這是又什麼鬼?
第一個問題:
事實上我們有考慮這個問題.所以我們繼承了一個相對比較特別Decoder--ReplayingDecoder.我們看一下ReplayingDecoder的CallDecode 方法.(這個名字起的非常的直白.你一定明白他是乾啥的)
</p><pre class="brush:java;toolbar:false"> try { decode(ctx, replayable, out); //省略} catch (Signal replay) { replay.expect(REPLAY); //省略 // Return to the checkpoint (or oldPosition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. } break; }
Signal replay 是Netty 中定義的一個錯誤.當我們讀取錯誤時,Netty 會再等到下次有資料到達時,再試一次Decode 方法.看看能再解析成功.所以我們就可以假設定我們要的資料都已經讀取了.
但是要注意: replaydecoder 的decode 方法會被反覆調用..所以我們的程式碼中要做好這樣的準備.
二: CheckPoint 就是為了防止如果每次反复調用Decode 時從頭執行,而設置的一個狀態.讓我們這個decode 方法有狀態.
好了.現在我們建立監部分的程式碼.這都是套數,直接抄下來就行了
</p><pre class="brush:java;toolbar:false"> ServerBootstrap bootstrap = new ServerBootstrap(); final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1); try { bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .localAddress(port) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new CommandDecoder()); p.addLast(new RedisServerHandler()); } }); // Start the server. ChannelFuture f = bootstrap.bind().sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. group.shutdownGracefully(); }
我們把Redis 的協定解析為RedisFrame 類別
</p><pre class="brush:java;toolbar:false"> package me.yunanw.redisinjava;import java.util.ArrayList;import java.util.List; /** * Created by yunanw on 2016/10/17. */ public class RedisFrame { private int argsCount = 0; List ArgsData = null; public RedisFrame(int argsCount){ this.argsCount = argsCount; this.ArgsData = new ArrayList(argsCount); } public void AppendArgs(byte[] args){ this.ArgsData.add(new String(args)); } public int getCommandCount(){ return ArgsData.size(); } public String GetFristCommand(){ if (ArgsData.size() > 0){ return ArgsData.get(0); } return null; } public String GetCommand(int index){ if (ArgsData.size() > index){ return ArgsData.get(index); } return null; } }
好了.這時你打開Redis-cli 試試看是不是可以連上我們的"假Redis" Server.有意的是---你打開Redis-cli.他會自動發一個"Command" 命令.而你不管回復什麼,它都認為連上了