首頁 > web前端 > js教程 > 主體

什麼是RPC?聊聊node中怎麼實現 RPC 通信

青灯夜游
發布: 2022-11-03 20:39:20
轉載
1798 人瀏覽過

什麼是RPC?聊聊node中怎麼實現 RPC 通信

【相關教學推薦:nodejs影片教學

什麼是RPC?

RPC:Remote Procedure Call(遠端過程調用)是指遠端過程調用,也就是說兩台伺服器A,B,一個應用程式部署在A伺服器上,想要調用B伺服器上應用提供的函數/方法,由於不在一個記憶體空間,不能直接調用,需要透過網路來表達調用的語義和傳達調用的資料。

伺服器與伺服器之間的通訊

RPC vs HTTP

相同點

  • #都是兩台電腦之間的網路通訊。 ajax是瀏覽器與伺服器之間的通行,RPC是伺服器與伺服器之間的通行
  • 需要雙方約定一個資料格式

不同點

  • 尋址伺服器不同

ajax 是使用DNS作為定址服務取得網域所對應的ip位址,瀏覽器拿到ip位址之後發送請求取得資料。

RPC一般是在內網路裡面互相請求,所以它一般不用DNS做尋址服務。因為在內網,所以可以使用規定的id或一個虛擬vip,例如v5:8001,然後到尋址伺服器取得v5所對應的ip位址。

  • 應用程式層協定不同

ajax使用http協議,它是一個文字協議,我們互動資料的時候檔案格式要不是html,要嘛是json對象,使用json的時候就是key-value的形式。

RPC採用二進位協定。採用二進位傳輸,它傳輸的包是這樣子的[0001 0001 0111 0110 0010],裡面都是二進制,一般採用那幾位表示一個字段,比如前6位是一個字段,依次類推。

這樣就不需要http傳送json物件裡面的key,所以有更小的資料體積。

因為傳輸的是二進制,更適合於電腦來理解,文字協定更適合人類理解,所以電腦去解讀各個欄位的耗時是比文字協定少很多的。

RPC採用二進位有更小的資料體積,及更快的解讀速度。

  • TCP通訊方式
  • 單工通訊:只能客戶端向服務端發送訊息,或只能服務端給客戶端發送訊息

  • 半雙工通訊:在某個時間段內只能客戶端給服務端發送訊息,過了這個時間段服務端可以傳送訊息給客戶端。如果把時間分成很多時間片,在一個時間片內就屬於單工通訊

  • 全雙工通訊:客戶端與服務端能相互通訊

#選擇這三種通訊方式的哪一種主要考慮的因素是:實現難度和成本。全雙工通訊是要比半雙工通訊的成本要高的,在某些場景下還是可以考慮使用半雙工通訊。

ajax是一種半雙工通訊。 http是文字協議,但它底層是tcp協議,http文字在tcp這一層會經歷從二進位資料流到文字的轉換過程。

理解RPC只是在更深入地理解前端技術。

buffer編解碼二進位封包

建立buffer

buffer.from: 從已有的資料建立二進位

const buffer1 = Buffer.from('geekbang')
const buffer2 = Buffer.from([0, 1, 2, 3, 4])



登入後複製

buffer.alloc: 建立一個空的二進位

const buffer3 = Buffer.alloc(20)

登入後複製

#在buffer裡面寫東西

  • buffer.write(string, offset): 寫入字串
  • #buffer.writeInt8(value, offset): int8表示二進位8位元( 8位元表示一個位元組)所能表示的整數,offset開始寫入之前要跳過的位元組數。
  • buffer.writeInt16BE(value, offset): int16(兩個位元組數),表示16個二進位位元所能表示的整數,即32767。超過這個數程序會報錯。
const buffer = Buffer.from([1, 2, 3, 4]) // 

// 往第二个字节里面写入12
buffer.writeInt8(12, 1) // 
登入後複製

大端BE與小端LE:主要是對於2個以上位元組的資料排列方式不同(writeInt8因為只有一個位元組,所以沒有大端和小端),大端的話就是低位位址放高位,小端就是低位元位址放低位。如下:

const buffer = Buffer.from([1, 2, 3, 4])

buffer.writeInt16BE(512, 2) // 
buffer.writeInt16LE(512, 2) // 
登入後複製

RPC傳輸的二進位如何表示傳遞的欄位

PC傳輸的二進位是如何表示欄位的呢?現在有二進位套件[00, 00, 00, 00, 00, 00, 00],我們假定前三個位元組表示一個欄位值,後面兩個表示一個欄位的值,最後兩個也表示一個欄位的值。那寫法如下:

writeInt16BE(value, 0)
writeInt16BE(value, 2)
writeInt16BE(value, 4)
登入後複製

发现像这样写,不仅要知道写入的值,还要知道值的数据类型,这样就很麻烦。不如json格式那么方便。针对这种情况业界也有解决方案。npm有个库protocol-buffers,把我们写的参数转化为buffer

// test.proto 定义的协议文件
message Column {
  required float num  = 1;
  required string payload = 2;
}
// index.js
const fs = require('fs')
var protobuf = require('protocol-buffers')
var messages = protobuf(fs.readFileSync('test.proto'))

var buf = messages.Column.encode({
	num: 42,
	payload: 'hello world'
})
console.log(buf)
// 

var obj = messages.Column.decode(buf)
console.log(obj)
// { num: 42, payload: 'hello world' }
登入後複製

net建立RPC通道

半双工通信

服务端代码:

const net = require('net')

const LESSON_DATA = {
  136797: '01 | 课程介绍',
  136798: '02 | 内容综述',
  136799: '03 | Node.js是什么?',
  136800: '04 | Node.js可以用来做什么?',
  136801: '05 | 课程实战项目介绍',
  136803: '06 | 什么是技术预研?',
  136804: '07 | Node.js开发环境安装',
  136806: '08 | 第一个Node.js程序:石头剪刀布游戏',
  136807: '09 | 模块:CommonJS规范',
  136808: '10 | 模块:使用模块规范改造石头剪刀布游戏',
  136809: '11 | 模块:npm',
  141994: '12 | 模块:Node.js内置模块',
  143517: '13 | 异步:非阻塞I/O',
  143557: '14 | 异步:异步编程之callback',
  143564: '15 | 异步:事件循环',
  143644: '16 | 异步:异步编程之Promise',
  146470: '17 | 异步:异步编程之async/await',
  146569: '18 | HTTP:什么是HTTP服务器?',
  146582: '19 | HTTP:简单实现一个HTTP服务器'
}

const server = net.createServer(socket => {
  // 监听客户端发送的消息
  socket.on('data', buffer => {
    const lessonId = buffer.readInt32BE()
    setTimeout(() => {
      // 往客户端发送消息
      socket.write(LESSON_DATA[lessonId])
    }, 1000)
  })
})

server.listen(4000)
登入後複製

客户端代码:

const net = require('net')

const socket = new net.Socket({})

const LESSON_IDS = [
  '136797',
  '136798',
  '136799',
  '136800',
  '136801',
  '136803',
  '136804',
  '136806',
  '136807',
  '136808',
  '136809',
  '141994',
  '143517',
  '143557',
  '143564',
  '143644',
  '146470',
  '146569',
  '146582'
]

socket.connect({
  host: '127.0.0.1',
  port: 4000
})

let buffer = Buffer.alloc(4)
buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])

// 往服务端发送消息
socket.write(buffer)

// 监听从服务端传回的消息
socket.on('data', buffer => {
  console.log(buffer.toString())

  // 获取到数据之后再次发送消息
  buffer = Buffer.alloc(4)
  buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])

  socket.write(buffer)
})
登入後複製

以上半双工通信步骤如下:

  • 客户端发送消息 socket.write(buffer)
  • 服务端接受消息后往客户端发送消息 socket.write(buffer)
  • 客户端接受消息后再次发送消息

这样在一个时间端之内,只有一个端往另一个端发送消息,这样就实现了半双工通信。那如何实现全双工通信呢,也就是在客户端往服务端发送消息的同时,服务端还没有消息返回给客户端之前,客户端又发送了一个消息给服务端。

全双工通信

先来看一个场景:

什麼是RPC?聊聊node中怎麼實現 RPC 通信

客户端发送了一个id1的请求,但是服务端还来不及返回,接着客户端又发送了一个id2的请求。

等了一个之后,服务端先把id2的结果返回了,然后再把id1的结果返回。

那如何结果匹配到对应的请求上呢?

如果按照时间顺序,那么id1的请求对应了id2的结果,因为id2是先返回的;id2的请求对应了id1的结果,这样就导致请求包和返回包错位的情况。

怎么办呢?

我们可以给请求包和返回包都带上序号,这样就能对应上。

错位处理

客户端代码:

socket.on('data', buffer => {
  // 包序号
  const seqBuffer = buffer.slice(0, 2)
  // 服务端返回的内容
  const titleBuffer = buffer.slice(2)
    
  console.log(seqBuffer.readInt16BE(), titleBuffer.toString())
})

// 包序号
let seq = 0
function encode(index) {
  // 请求包的长度现在是6 = 2(包序号) + 4(课程id)
  buffer = Buffer.alloc(6)
  buffer.writeInt16BE(seq)
  buffer.writeInt32BE(LESSON_IDS[index], 2)

  seq++
  return buffer
}

// 每50ms发送一次请求
setInterval(() => {
  id = Math.floor(Math.random() * LESSON_IDS.length)
  socket.write(encode(id))
}, 50)
登入後複製

服务端代码:

const server = net.createServer(socket => {
  socket.on('data', buffer => {
    // 把包序号取出
    const seqBuffer = buffer.slice(0, 2)
    // 从第2个字节开始读取
    const lessonId = buffer.readInt32BE(2)
    setTimeout(() => {
      const buffer = Buffer.concat([
        seqBuffer,
        Buffer.from(LESSON_DATA[lessonId])
      ])
      socket.write(buffer)
      // 这里返回时间采用随机的,这样就不会按顺序返回,就可以测试错位的情况
    }, 10 + Math.random() * 1000)
  })
})
登入後複製
  • 客户端把包序号和对应的id给服务端
  • 服务端取出包序号和对应的id,然后把包序号和id对应的内容返回给客户端,同时设置返回的时间是随机的,这样就不会按照顺序返回。

粘包处理

如果我们这样发送请求:

for (let i = 0; i < 100; i++) {
  id = Math.floor(Math.random() * LESSON_IDS.length)
  socket.write(encode(id))
}
登入後複製

我们发现服务端接收到的信息如下:

登入後複製

这是因为TCP自己做的一个优化,它会把所有的请求包拼接在一起,这样就会产生粘包的现象。

服务端需要把包进行拆分,拆分成100个小包。

那如何拆分呢?

首先客户端发送的数据包包括两部分:定长的包头和不定长的包体

包头又分为两部分:包序号及包体的长度。只有知道包体的长度,才能知道从哪里进行分割。

let seq = 0
function encode(data) {
    // 正常情况下,这里应该是使用 protocol-buffers 来encode一段代表业务数据的数据包
    // 为了不要混淆重点,这个例子比较简单,就直接把课程id转buffer发送
    const body = Buffer.alloc(4);
    body.writeInt32BE(LESSON_IDS[data.id]);

    // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分
    // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信
    const header = Buffer.alloc(6); // 包序号占2个字节,包体长度占4个字节,共6个字节
    header.writeInt16BE(seq)
    header.writeInt32BE(body.length, 2);

    // 包头和包体拼起来发送
    const buffer = Buffer.concat([header, body])

    console.log(`包${seq}传输的课程id为${LESSON_IDS[data.id]}`);
    seq++;
    return buffer;
}

// 并发
for (let i = 0; i < 100; i++) {
    id = Math.floor(Math.random() * LESSON_IDS.length)
    socket.write(encode({ id }))
}
登入後複製

服务端进行拆包

const server = net.createServer(socket => {
  let oldBuffer = null
  socket.on('data', buffer => {
    // 把上一次data事件使用残余的buffer接上来
    if (oldBuffer) {
      buffer = Buffer.concat([oldBuffer, buffer])
    }
    let packageLength = 0
    // 只要还存在可以解成完整包的包长
    while ((packageLength = checkComplete(buffer))) {
      // 确定包的长度后进行slice分割
      const package = buffer.slice(0, packageLength)
      // 剩余的包利用循环继续分割
      buffer = buffer.slice(packageLength)

      // 把这个包解成数据和seq
      const result = decode(package)

      // 计算得到要返回的结果,并write返回
      socket.write(encode(LESSON_DATA[result.data], result.seq))
    }

    // 把残余的buffer记下来
    oldBuffer = buffer
  })
})
登入後複製

checkComplete 函数的作用来确定一个数据包的长度,然后进行分割:

function checkComplete(buffer) {
  // 如果包的长度小于6个字节说明只有包头,没有包体,那么直接返回0
  if (buffer.length <= 6) {
    return 0
  }
  // 读取包头的第二个字节,取出包体的长度
  const bodyLength = buffer.readInt32BE(2)
  // 请求包包括包头(6个字节)和包体body
  return 6 + bodyLength
}
登入後複製

decode对包进行解密:

function decode(buffer) {
  // 读取包头
  const header = buffer.slice(0, 6)
  const seq = header.readInt16BE()
    
  // 读取包体  
  // 正常情况下,这里应该是使用 protobuf 来decode一段代表业务数据的数据包
  // 为了不要混淆重点,这个例子比较简单,就直接读一个Int32即可
  const body = buffer.slice(6).readInt32BE()

  // 这里把seq和数据返回出去
  return {
    seq,
    data: body
  }
}
登入後複製

encode把客户端想要的数据转化为二进制返回,这个包同样包括包头和包体,包头又包括包需要包序号和包体的长度。

function encode(data, seq) {
  // 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包
  // 为了不要混淆重点,这个例子比较简单,就直接把课程标题转buffer返回
  const body = Buffer.from(data)

  // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分
  // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信
  const header = Buffer.alloc(6)
  header.writeInt16BE(seq)
  header.writeInt32BE(body.length, 2)

  const buffer = Buffer.concat([header, body])

  return buffer
}
登入後複製

当客户端收到服务端发送的包之后,同样也要进行拆包,因为所有的包同样都粘在一起了:

 
登入後複製

因此,客户端也需要拆包,拆包策略与服务端的拆包策略是一致的:

let oldBuffer = null
socket.on('data', buffer => {
  // 把上一次data事件使用残余的buffer接上来
  if (oldBuffer) {
    buffer = Buffer.concat([oldBuffer, buffer])
  }
  let completeLength = 0

  // 只要还存在可以解成完整包的包长
  while ((completeLength = checkComplete(buffer))) {
    const package = buffer.slice(0, completeLength)
    buffer = buffer.slice(completeLength)

    // 把这个包解成数据和seq
    const result = decode(package)
    console.log(`包${result.seq},返回值是${result.data}`)
  }

  // 把残余的buffer记下来
  oldBuffer = buffer
})
登入後複製

到这里就实现了双全工通行,这样客户端和服务端随时都可以往对方发小消息了。

更多node相关知识,请访问:nodejs 教程

以上是什麼是RPC?聊聊node中怎麼實現 RPC 通信的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:juejin.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!