
How to implement Redis protocol parser based on Golang

WBOY
Release: 2023-05-28 19:13:21

RESP protocol

RESP (REdis Serialization Protocol) is the protocol used for communication between a Redis client and server. It has five data types:

Simple string (status reply): starts with "+" and ends with "\r\n", e.g. +OK\r\n

Error reply: starts with "-" and ends with "\r\n", e.g. -ERR unknown command\r\n

Integer: starts with ":" and ends with "\r\n", e.g. :1\r\n

Bulk string (also called a multi-line string): starts with "$", followed by the length of the data in bytes and "\r\n", then the data itself and a final "\r\n". For example, the string abc is encoded as:

$3\r\nabc\r\n

Array: starts with "*", followed by the number of members and "\r\n", then each member encoded as one of the types above. Example:

SET key value
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

Commands or data sent by the client and server always use \r\n (CRLF) as the newline character.

When the client sends *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n, the server receives it as the following lines:
*3\r\n
$3\r\n
SET\r\n
$3\r\n
key\r\n
$5\r\n
value\r\n
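To make the framing concrete, here is a small sketch (not part of the original project) that encodes a command as a RESP array of bulk strings; `encodeCommand` is a hypothetical helper name. Note that the length after "$" counts bytes, not characters.

```go
package main

import (
	"strconv"
	"strings"
)

// encodeCommand encodes a command as a RESP array of bulk strings,
// e.g. SET key value -> *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n.
// Hypothetical helper for illustration only.
func encodeCommand(args ...string) string {
	var sb strings.Builder
	// Array header: '*', member count, CRLF.
	sb.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")
	for _, arg := range args {
		// Each member: '$', byte length (len counts bytes), CRLF, data, CRLF.
		sb.WriteString("$" + strconv.Itoa(len(arg)) + "\r\n")
		sb.WriteString(arg)
		sb.WriteString("\r\n")
	}
	return sb.String()
}
```

For example, `encodeCommand("SET", "key", "value")` produces exactly the byte sequence shown above.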

interface/resp/conn.go

type Connection interface {
   Write([]byte) error
   GetDBIndex() int
   SelectDB(int)
}

interface/resp/reply.go
type Reply interface {
	ToBytes() []byte
}
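  • Connection interface: represents a connection to a Redis client

  • Write: sends a reply to the client

  • GetDBIndex: returns the index of the DB the client has selected (Redis has 16 DBs by default)

  • SelectDB: switches the client to another DB

  • Reply interface: the interface implemented by every reply; ToBytes serializes it into RESP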
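Because the handler only depends on the Connection interface, it can be exercised without a real socket. The sketch below is a hypothetical in-memory implementation (`fakeConn` is our own name, not part of the project) that records everything written to it; the interface is redeclared so the snippet compiles on its own.

```go
package main

import "bytes"

// Connection mirrors the interface from interface/resp/conn.go.
type Connection interface {
	Write([]byte) error
	GetDBIndex() int
	SelectDB(int)
}

// fakeConn is a hypothetical in-memory Connection for unit tests:
// it buffers written replies instead of sending them over the network.
type fakeConn struct {
	buf bytes.Buffer
	db  int
}

func (c *fakeConn) Write(b []byte) error {
	_, err := c.buf.Write(b)
	return err
}

func (c *fakeConn) GetDBIndex() int { return c.db }

func (c *fakeConn) SelectDB(n int) { c.db = n }

// Compile-time check that *fakeConn satisfies Connection.
var _ Connection = (*fakeConn)(nil)
```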

resp/reply/consts.go

type PongReply struct{}

var pongBytes = []byte("+PONG\r\n")

func (r *PongReply) ToBytes() []byte {
    return pongBytes
}

var thePongReply = new(PongReply)

func MakePongReply() *PongReply {
    return thePongReply
}

type OkReply struct{}

var okBytes = []byte("+OK\r\n")

func (r *OkReply) ToBytes() []byte {
    return okBytes
}

var theOkReply = new(OkReply)

func MakeOkReply() *OkReply {
    return theOkReply
}

var nullBulkBytes = []byte("$-1\r\n")

type NullBulkReply struct{}

func (r *NullBulkReply) ToBytes() []byte {
    return nullBulkBytes
}

func MakeNullBulkReply() *NullBulkReply {
    return &NullBulkReply{}
}

var emptyMultiBulkBytes = []byte("*0\r\n")

type EmptyMultiBulkReply struct{}

func (r *EmptyMultiBulkReply) ToBytes() []byte {
    return emptyMultiBulkBytes
}

type NoReply struct{}

var noBytes = []byte("")

func (r *NoReply) ToBytes() []byte {
    return noBytes
}

consts.go defines five fixed replies: PONG, OK, a null bulk string ($-1), an empty array (*0), and an empty reply (NoReply), which writes nothing.
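The fixed replies return package-level byte slices, and PONG and OK also hand out one shared instance, so producing them allocates nothing. The flip side is that callers must treat the returned slice as read-only. A reduced, self-contained copy of the PONG pattern, for illustration:

```go
package main

// Reply mirrors the interface from interface/resp/reply.go.
type Reply interface {
	ToBytes() []byte
}

type PongReply struct{}

// Pre-encoded bytes shared by every call: do not modify the returned slice.
var pongBytes = []byte("+PONG\r\n")

func (r *PongReply) ToBytes() []byte { return pongBytes }

// A single shared instance; the empty struct carries no state.
var thePongReply = new(PongReply)

func MakePongReply() *PongReply { return thePongReply }
```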

resp/reply/reply.go

type ErrorReply interface {
   Error() string
   ToBytes() []byte
}

ErrorReply: the interface for error replies. An error reply is both a Reply (it has ToBytes) and a Go error (it has Error).

resp/reply/errors.go

type UnknownErrReply struct{}

var unknownErrBytes = []byte("-ERR unknown\r\n")

func (r *UnknownErrReply) ToBytes() []byte {
   return unknownErrBytes
}

func (r *UnknownErrReply) Error() string {
   return "ERR unknown"
}

type ArgNumErrReply struct {
   Cmd string
}

func (r *ArgNumErrReply) ToBytes() []byte {
   return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")
}

func (r *ArgNumErrReply) Error() string {
   return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}

func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
   return &ArgNumErrReply{
      Cmd: cmd,
   }
}

type SyntaxErrReply struct{}

var syntaxErrBytes = []byte("-ERR syntax error\r\n")
var theSyntaxErrReply = &SyntaxErrReply{}

func MakeSyntaxErrReply() *SyntaxErrReply {
   return theSyntaxErrReply
}

func (r *SyntaxErrReply) ToBytes() []byte {
   return syntaxErrBytes
}

func (r *SyntaxErrReply) Error() string {
   return "ERR syntax error"
}

type WrongTypeErrReply struct{}

var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")

func (r *WrongTypeErrReply) ToBytes() []byte {
   return wrongTypeErrBytes
}

func (r *WrongTypeErrReply) Error() string {
   return "WRONGTYPE Operation against a key holding the wrong kind of value"
}

type ProtocolErrReply struct {
   Msg string
}

func (r *ProtocolErrReply) ToBytes() []byte {
   return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n")
}

func (r *ProtocolErrReply) Error() string {
   return "ERR Protocol error: '" + r.Msg + "'"
}

errors.go defines five error types: UnknownErrReply (unknown error), ArgNumErrReply (wrong number of arguments), SyntaxErrReply (syntax error), WrongTypeErrReply (operation against the wrong data type), and ProtocolErrReply (protocol error).

resp/reply/reply.go

var (
   nullBulkReplyBytes = []byte("$-1\r\n")
   // CRLF terminates every line of the protocol
   CRLF = "\r\n"
)

type BulkReply struct {
   Arg []byte
}

func MakeBulkReply(arg []byte) *BulkReply {
   return &BulkReply{
      Arg: arg,
   }
}

func (r *BulkReply) ToBytes() []byte {
   // a nil Arg is a null bulk string; an empty, non-nil Arg encodes as "$0\r\n\r\n"
   if r.Arg == nil {
      return nullBulkReplyBytes
   }
   return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
}

type MultiBulkReply struct {
   Args [][]byte
}

func (r *MultiBulkReply) ToBytes() []byte {
   argLen := len(r.Args)
   var buf bytes.Buffer
   buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
   for _, arg := range r.Args {
      if arg == nil {
         buf.WriteString("$-1" + CRLF)
      } else {
         buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
      }
   }
   return buf.Bytes()
}

func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
   return &MultiBulkReply{
      Args: args,
   }
}

type StatusReply struct {
   Status string
}

func MakeStatusReply(status string) *StatusReply {
   return &StatusReply{
      Status: status,
   }
}

func (r *StatusReply) ToBytes() []byte {
   return []byte("+" + r.Status + CRLF)
}

type IntReply struct {
   Code int64
}

func MakeIntReply(code int64) *IntReply {
   return &IntReply{
      Code: code,
   }
}

func (r *IntReply) ToBytes() []byte {
   return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}

type StandardErrReply struct {
   Status string
}

func (r *StandardErrReply) ToBytes() []byte {
   return []byte("-" + r.Status + CRLF)
}

func (r *StandardErrReply) Error() string {
   return r.Status
}

func MakeErrReply(status string) *StandardErrReply {
   return &StandardErrReply{
      Status: status,
   }
}

func IsErrorReply(reply resp.Reply) bool {
   b := reply.ToBytes()
   // NoReply encodes to zero bytes, so check the length before indexing
   return len(b) > 0 && b[0] == '-'
}
  • BulkReply: a bulk string reply

  • MultiBulkReply: an array of bulk strings

  • StatusReply: a simple string (status) reply

  • IntReply: an integer reply

  • StandardErrReply: a generic error reply with a custom message

  • IsErrorReply: reports whether a reply is an error reply

  • ToBytes: serializes a reply into the RESP wire format
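RESP distinguishes a null bulk string ($-1\r\n) from an empty one ($0\r\n\r\n). The sketch below follows the nil-means-null convention that MultiBulkReply uses and encodes both cases; `encodeBulk` and `encodeMultiBulk` are hypothetical standalone helpers, not the project's code.

```go
package main

import (
	"bytes"
	"strconv"
)

const crlf = "\r\n"

// encodeBulk encodes one bulk string: nil -> null bulk, []byte{} -> empty bulk.
func encodeBulk(buf *bytes.Buffer, arg []byte) {
	if arg == nil {
		buf.WriteString("$-1" + crlf)
		return
	}
	buf.WriteString("$" + strconv.Itoa(len(arg)) + crlf)
	buf.Write(arg) // raw bytes, so binary data is preserved as-is
	buf.WriteString(crlf)
}

// encodeMultiBulk encodes an array of bulk strings.
func encodeMultiBulk(args [][]byte) []byte {
	var buf bytes.Buffer
	buf.WriteString("*" + strconv.Itoa(len(args)) + crlf)
	for _, arg := range args {
		encodeBulk(&buf, arg)
	}
	return buf.Bytes()
}
```

Writing into a single bytes.Buffer, as MultiBulkReply does, avoids building many intermediate strings.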

resp/parser/parser.go

type Payload struct {
   Data resp.Reply
   Err  error
}

type readState struct {
   readingMultiLine  bool     
   expectedArgsCount int     
   msgType           byte    
   args              [][]byte 
   bulkLen           int64    
}

func (s *readState) finished() bool {
   return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}

func ParseStream(reader io.Reader) <-chan *Payload {
   ch := make(chan *Payload)
   go parse0(reader, ch)
   return ch
}

func parse0(reader io.Reader, ch chan<- *Payload) {
	 ......
}

Payload structure: one message parsed from the client, carrying either the parsed data or an error

Reply: every piece of data exchanged between client and server is represented as a Reply

readState structure:

  • readingMultiLine: whether the parser is in the middle of a multi-line message (an array or a bulk string)

  • expectedArgsCount: the number of arguments expected

  • msgType: the message type ('*' or '$')

  • args: the arguments read so far

  • bulkLen: the byte length of the bulk string body to be read next

finished method: reports whether all expected arguments have been read

ParseStream method: starts a goroutine that parses the stream asynchronously and returns a channel from which the parsed Payloads can be read

func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
   var msg []byte
   var err error
   if state.bulkLen == 0 {
      // ordinary line: read up to and including '\n'
      msg, err = bufReader.ReadBytes('\n')
      if err != nil {
         return nil, true, err
      }
      // the line must be at least "\r\n" long and end with CRLF
      if len(msg) < 2 || msg[len(msg)-2] != '\r' {
         return nil, false, errors.New("protocol error: " + string(msg))
      }
   } else {
      // bulk body: read exactly bulkLen bytes plus the trailing CRLF,
      // because the body itself may contain "\r\n"
      msg = make([]byte, state.bulkLen+2)
      _, err = io.ReadFull(bufReader, msg)
      if err != nil {
         return nil, true, err
      }
      if msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
         return nil, false, errors.New("protocol error: " + string(msg))
      }
      state.bulkLen = 0
   }
   return msg, false, nil
}

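readLine: reads one line. Normal lines are read up to \n and must end in \r\n. When a bulk string body is expected (state.bulkLen > 0), the body may itself contain \r\n, so instead of stopping at \n, readLine reads exactly state.bulkLen + 2 bytes: the body plus its trailing \r\n. The returned bool tells the caller whether the error was an I/O error (the connection is unusable) rather than a protocol error.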
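Why switch to io.ReadFull for bodies? Bulk strings are binary-safe, so a body such as "a\r\nb" contains a CRLF. The sketch below reads a body by its declared length, as readLine does when state.bulkLen > 0; `readBulkBody` and `newReader` are hypothetical helpers.

```go
package main

import (
	"bufio"
	"errors"
	"io"
	"strings"
)

// newReader wraps a string in a bufio.Reader for the examples.
func newReader(s string) *bufio.Reader {
	return bufio.NewReader(strings.NewReader(s))
}

// readBulkBody reads exactly n body bytes plus the trailing CRLF
// and returns the body without the CRLF.
func readBulkBody(br *bufio.Reader, n int64) ([]byte, error) {
	buf := make([]byte, n+2)
	if _, err := io.ReadFull(br, buf); err != nil {
		return nil, err
	}
	if buf[n] != '\r' || buf[n+1] != '\n' {
		return nil, errors.New("protocol error: bad bulk terminator")
	}
	return buf[:n], nil
}
```

By contrast, calling ReadBytes('\n') on the same input "a\r\nb\r\n" stops after "a\r\n" and would split the body in two.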

func parseMultiBulkHeader(msg []byte, state *readState) error {
   var err error
   var expectedLine uint64
   expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
   if err != nil {
      return errors.New("protocol error: " + string(msg))
   }
   if expectedLine == 0 {
      state.expectedArgsCount = 0
      return nil
   } else if expectedLine > 0 {
      state.msgType = msg[0]
      state.readingMultiLine = true
      state.expectedArgsCount = int(expectedLine)
      state.args = make([][]byte, 0, expectedLine)
      return nil
   } else {
      return errors.New("protocol error: " + string(msg))
   }
}

func parseBulkHeader(msg []byte, state *readState) error {
   var err error
   state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
   if err != nil {
      return errors.New("protocol error: " + string(msg))
   }
   if state.bulkLen == -1 { // null bulk
      return nil
   } else if state.bulkLen > 0 {
      state.msgType = msg[0]
      state.readingMultiLine = true
      state.expectedArgsCount = 1
      state.args = make([][]byte, 0, 1)
      return nil
   } else {
      return errors.New("protocol error: " + string(msg))
   }
}

parseMultiBulkHeader: parses an array header (*N), then sets the expected number of arguments and the related state.

parseBulkHeader: parses the header of a top-level bulk string ($N).

func parseSingleLineReply(msg []byte) (resp.Reply, error) {
   str := strings.TrimSuffix(string(msg), "\r\n")
   var result resp.Reply
   switch msg[0] {
   case '+': // status reply
      result = reply.MakeStatusReply(str[1:])
   case '-': // err reply
      result = reply.MakeErrReply(str[1:])
   case ':': // int reply
      val, err := strconv.ParseInt(str[1:], 10, 64)
      if err != nil {
         return nil, errors.New("protocol error: " + string(msg))
      }
      result = reply.MakeIntReply(val)
   default:
      // unknown type byte: report it instead of returning a nil reply
      return nil, errors.New("protocol error: " + string(msg))
   }
   return result, nil
}

func readBody(msg []byte, state *readState) error {
   line := msg[0 : len(msg)-2]
   var err error
   if len(line) > 0 && line[0] == '$' {
      // bulk string header inside an array: remember the body length
      state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
      if err != nil {
         return errors.New("protocol error: " + string(msg))
      }
      switch {
      case state.bulkLen == -1: // null bulk in multi bulks: no body follows
         state.args = append(state.args, nil)
         state.bulkLen = 0
      case state.bulkLen < -1:
         return errors.New("protocol error: " + string(msg))
      }
      // bulkLen == 0: the empty body is the bare "\r\n" on the next line,
      // which readLine reads normally and the branch below appends
   } else {
      state.args = append(state.args, line)
   }
   return nil
}

parseSingleLineReply: parses a single-line reply (+, - or :).

readBody: processes one line of a multi-line message. A line starting with $ is a bulk string header: its length is stored in state.bulkLen so that the next readLine call reads bulkLen + 2 bytes. Any other line is appended directly to state.args.
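Single-line replies need no parser state: the first byte selects the type and the rest of the line, minus CRLF, is the payload. A self-contained sketch that returns a tagged value instead of the project's Reply types; `singleLine` and `parseSingle` are hypothetical names.

```go
package main

import (
	"errors"
	"strconv"
	"strings"
)

// singleLine is a hypothetical stand-in for StatusReply, ErrReply and IntReply.
type singleLine struct {
	Kind byte   // '+', '-' or ':'
	Text string // status or error text
	Int  int64  // value for ':' replies
}

func parseSingle(msg string) (singleLine, error) {
	str := strings.TrimSuffix(msg, "\r\n")
	if len(str) == 0 {
		return singleLine{}, errors.New("protocol error: empty line")
	}
	switch str[0] {
	case '+', '-':
		return singleLine{Kind: str[0], Text: str[1:]}, nil
	case ':':
		v, err := strconv.ParseInt(str[1:], 10, 64)
		if err != nil {
			return singleLine{}, errors.New("protocol error: " + msg)
		}
		return singleLine{Kind: ':', Int: v}, nil
	default:
		return singleLine{}, errors.New("protocol error: " + msg)
	}
}
```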

func parse0(reader io.Reader, ch chan<- *Payload) {
   defer func() {
      if err := recover(); err != nil {
         logger.Error(string(debug.Stack()))
      }
   }()
   bufReader := bufio.NewReader(reader)
   var state readState
   var err error
   var msg []byte
   for {
      var ioErr bool
      msg, ioErr, err = readLine(bufReader, &state)
      if err != nil {
         if ioErr {
            // I/O error (including io.EOF): report it, close the channel and stop
            ch <- &Payload{
               Err: err,
            }
            close(ch)
            return
         }
         // protocol error: report it, reset the state and keep reading
         ch <- &Payload{
            Err: err,
         }
         state = readState{}
         continue
      }

      if !state.readingMultiLine {
         if msg[0] == '*' {
            // multi bulk reply
            err = parseMultiBulkHeader(msg, &state)
            if err != nil {
               ch <- &Payload{
                  Err: errors.New("protocol error: " + string(msg)),
               }
               state = readState{}
               continue
            }
            if state.expectedArgsCount == 0 {
               ch <- &Payload{
                  Data: &reply.EmptyMultiBulkReply{},
               }
               state = readState{}
               continue
            }
         } else if msg[0] == '$' { // bulk reply
            err = parseBulkHeader(msg, &state)
            if err != nil {
               ch <- &Payload{
                  Err: errors.New("protocol error: " + string(msg)),
               }
               state = readState{} // reset state
               continue
            }
            if state.bulkLen == -1 { // null bulk reply
               ch <- &Payload{
                  Data: &reply.NullBulkReply{},
               }
               state = readState{} // reset state
               continue
            }
         } else {
            // single line reply
            result, err := parseSingleLineReply(msg)
            ch <- &Payload{
               Data: result,
               Err:  err,
            }
            state = readState{} // reset state
            continue
         }
      } else {
         // read bulk reply
         err = readBody(msg, &state)
         if err != nil {
            ch <- &Payload{
               Err: errors.New("protocol error: " + string(msg)),
            }
            state = readState{} // reset state
            continue
         }
         // all expected arguments have been read
         if state.finished() {
            var result resp.Reply
            if state.msgType == '*' {
               result = reply.MakeMultiBulkReply(state.args)
            } else if state.msgType == '$' {
               result = reply.MakeBulkReply(state.args[0])
            }
            ch <- &Payload{
               Data: result,
               Err:  err,
            }
            state = readState{}
         }
      }
   }
}

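parse0: the main parsing loop. It reads the stream line by line, dispatches on the first byte of each message, and sends every completed Payload to the channel. A protocol error is reported and the state is reset; an I/O error is reported, after which the channel is closed and the goroutine exits.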

resp/connection/conn.go

type Connection struct {
    conn net.Conn
    waitingReply wait.Wait
    mu sync.Mutex // prevents several goroutines from writing to the client at once
    selectedDB int
}

func NewConn(conn net.Conn) *Connection {
    return &Connection{
        conn: conn,
    }
}

func (c *Connection) RemoteAddr() net.Addr {
    return c.conn.RemoteAddr()
}

func (c *Connection) Close() error {
    c.waitingReply.WaitWithTimeout(10 * time.Second)
    _ = c.conn.Close()
    return nil
}

func (c *Connection) Write(b []byte) error {
    if len(b) == 0 {
        return nil
    }
    c.mu.Lock()
    c.waitingReply.Add(1)
    defer func() {
        c.waitingReply.Done()
        c.mu.Unlock()
    }()

    _, err := c.conn.Write(b)
    return err
}

func (c *Connection) GetDBIndex() int {
    return c.selectedDB
}

func (c *Connection) SelectDB(dbNum int) {
    c.selectedDB = dbNum
}

Earlier we wrote an EchoHandler that receives user input and returns it unchanged. Now we write a RespHandler to replace it and let the parser do the parsing. RespHandler needs a Connection structure to manage each client connection.

Connection: a client connection, used by the protocol-layer handler. The mutex serializes writes from different goroutines, and waitingReply lets Close wait (up to 10 seconds) for in-flight replies to finish.

resp/handler/handler.go

var (
   unknownErrReplyBytes = []byte("-ERR unknown\r\n")
)

type RespHandler struct {
   activeConn sync.Map
   db         databaseface.Database
   closing    atomic.Boolean
}

func MakeHandler() *RespHandler {
   var db databaseface.Database
   db = database.NewEchoDatabase()
   return &RespHandler{
      db: db,
   }
}

func (h *RespHandler) closeClient(client *connection.Connection) {
   _ = client.Close()
   h.db.AfterClientClose(client)
   h.activeConn.Delete(client)
}

func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
   if h.closing.Get() {
      // closing handler refuse new connection
      _ = conn.Close()
      return
   }

   client := connection.NewConn(conn)
   h.activeConn.Store(client, 1)

   ch := parser.ParseStream(conn)
   for payload := range ch {
      if payload.Err != nil {
         if payload.Err == io.EOF ||
            payload.Err == io.ErrUnexpectedEOF ||
            strings.Contains(payload.Err.Error(), "use of closed network connection") {
            // connection closed
            h.closeClient(client)
            logger.Info("connection closed: " + client.RemoteAddr().String())
            return
         }
         // protocol err
         errReply := reply.MakeErrReply(payload.Err.Error())
         err := client.Write(errReply.ToBytes())
         if err != nil {
            h.closeClient(client)
            logger.Info("connection closed: " + client.RemoteAddr().String())
            return
         }
         continue
      }
      if payload.Data == nil {
         logger.Error("empty payload")
         continue
      }
      r, ok := payload.Data.(*reply.MultiBulkReply)
      if !ok {
         logger.Error("require multi bulk reply")
         continue
      }
      result := h.db.Exec(client, r.Args)
      if result != nil {
         _ = client.Write(result.ToBytes())
      } else {
         _ = client.Write(unknownErrReplyBytes)
      }
   }
}

func (h *RespHandler) Close() error {
   logger.Info("handler shutting down...")
   h.closing.Set(true)
   // TODO: concurrent wait
   h.activeConn.Range(func(key interface{}, val interface{}) bool {
      client := key.(*connection.Connection)
      _ = client.Close()
      return true
   })
   h.db.Close()
   return nil
}

RespHandler: similar to the earlier EchoHandler, except that each parsed command is passed to the core layer's db.Exec for execution and the result is written back to the client.

interface/database/database.go

type CmdLine = [][]byte

type Database interface {
	Exec(client resp.Connection, args [][]byte) resp.Reply
	AfterClientClose(c resp.Connection)
	Close()
}

type DataEntity struct {
	Data interface{}
}

Exec: executes a command in the core layer

AfterClientClose: cleanup hook invoked after a client connection is closed

CmdLine: alias of [][]byte, representing one command line

DataEntity: represents a Redis value, such as a string, list or set
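An Exec implementation typically dispatches on the command name in args[0]. A minimal, hypothetical sketch (`execSketch` is our name) that handles only PING and returns already-encoded RESP bytes instead of a Reply; the unknown-command message is simplified compared with real Redis.

```go
package main

import (
	"strconv"
	"strings"
)

// execSketch dispatches on the command name (args[0], case-insensitive)
// and returns an encoded RESP reply. Illustration only.
func execSketch(args [][]byte) []byte {
	if len(args) == 0 {
		return []byte("-ERR empty command\r\n")
	}
	switch strings.ToLower(string(args[0])) {
	case "ping":
		switch len(args) {
		case 1:
			return []byte("+PONG\r\n")
		case 2: // PING message echoes the message as a bulk string
			return []byte("$" + strconv.Itoa(len(args[1])) + "\r\n" + string(args[1]) + "\r\n")
		}
		return []byte("-ERR wrong number of arguments for 'ping' command\r\n")
	default:
		return []byte("-ERR unknown command '" + string(args[0]) + "'\r\n")
	}
}
```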

database/echo_database.go

type EchoDatabase struct {
}

func NewEchoDatabase() *EchoDatabase {
   return &EchoDatabase{}
}

func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
   return reply.MakeMultiBulkReply(args)
}

func (e EchoDatabase) AfterClientClose(c resp.Connection) {
   logger.Info("EchoDatabase AfterClientClose")
}

func (e EchoDatabase) Close() {
   logger.Info("EchoDatabase Close")
}

EchoDatabase: a stub database used to test the protocol layer

Exec: wraps the parsed arguments in a MultiBulkReply and returns them unchanged

main.go

err := tcp.ListenAndServeWithSignal(
   &tcp.Config{
      Address: fmt.Sprintf("%s:%d",
         config.Properties.Bind,
         config.Properties.Port),
   },
   handler.MakeHandler())
if err != nil {
   logger.Error(err)
}

Finally, change main.go to use the handler just written: handler.MakeHandler().

source:yisu.com