首頁 > web前端 > js教程 > 主體

深入了解Node.js中的四大流,解決「背壓」問題

發布: 2021-09-13 18:28:30
轉載
2729 人瀏覽過

Node.js的4種stream是什麼?什麼是背壓問題?以下這篇文章就來帶大家了解一下Node.js中的四大流(stream),介紹一下背壓問題和解決方法,希望對大家有幫助!

深入了解Node.js中的四大流,解決「背壓」問題

把一個東西從 A 搬到 B 該怎麼搬呢?

抬起來,移動到目的地,放下不就行了麼。

那如果這個東西有一噸重呢?

那就一部分的搬家。

其實IO 也就是搬東西,包括網路的IO、檔案的IO,如果資料量少,那麼直接傳送全部內容就行了,但如果內容特別多,一次性載入到記憶體會崩潰,而且速度也慢,這時候就可以一部分一部分的處理,這就是流的想法。

各種語言基本上都實作了 stream 的 api,Node.js 也是,stream api 是比較常用的,下面我們就來探究一下 stream。

本文會回答以下問題:

  • Node.js 的4 種stream 是什麼

  • 產生器如何與Readable Stream 結合

  • stream 的暫停與流動

  • 什麼是背壓問題,如何解決

【推薦學習:《nodejs 教學》】

Node.js 的4種stream

流的直覺感受

從一個地方流到另一個地方,顯然有流出的一方和流入的一方,流出的一方就是可讀流(readable),而流入的一方就是可寫流(writable)。

深入了解Node.js中的四大流,解決「背壓」問題

當然,也有的流既可以流入又可以流出,這種叫做雙工流(duplex)

深入了解Node.js中的四大流,解決「背壓」問題

既然可以流入又可以流出,那麼是不是可以對流入的內容做下轉換再流出呢,這種流叫做轉換流(transform)

深入了解Node.js中的四大流,解決「背壓」問題

duplex 流的流入和流出內容不需要相關,而transform 流的流入和流出是相關的,這是兩者的差異。

流的api

Node.js 提供的stream 就是上面介紹的4 個:

const stream = require('stream');

// 可读流
const Readable = stream.Readable;
// 可写流
const Writable = stream.Writable;
// 双工流
const Duplex = stream.Duplex;
// 转换流
const Transform = stream.Transform;
登入後複製

它們都有要實作的方法:

  • Readable 需要實作_read 方法來傳回內容

  • Writable 需要實作_write 方法來接受內容

  • Duplex 需要實作_read 和_write 方法來接受和傳回內容

  • Transform 需要實作_transform 方法來把接受的內容轉換之後返回

我們分別來看看:

Readable

Readable 要實作_read 方法,透過push 傳回具體的數據。

const Stream = require('stream');
const readableStream = Stream.Readable();
readableStream._read = function() {
    this.push('阿门阿前一棵葡萄树,');
    this.push('阿东阿东绿的刚发芽,');
    this.push('阿东背着那重重的的壳呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}
readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});
登入後複製

當 push 一個 null 時,就代表結束流。

執行效果如下:

深入了解Node.js中的四大流,解決「背壓」問題

建立Readable 也可以透過繼承的方式:

const Stream = require('stream');

class ReadableDong extends Stream.Readable {
    constructor() {
        super();
    }
    _read() {
        this.push('阿门阿前一棵葡萄树,');
        this.push('阿东阿东绿的刚发芽,');
        this.push('阿东背着那重重的的壳呀,');
        this.push('一步一步地往上爬。')
        this.push(null);
    }
}

const readableStream = new ReadableDong();

readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});
登入後複製

可讀流是產生內容的,那麼很自然可以和生成器結合:

const Stream = require('stream');

class ReadableDong extends Stream.Readable {
    constructor(iterator) {
        super();
        this.iterator = iterator;
    }
    _read() {
        const next = this.iterator.next();
        if(next.done) {
            return this.push(null);
        } else {
            this.push(next.value)
        }
    }
}
function *songGenerator() {
    yield '阿门阿前一棵葡萄树,';
    yield '阿东阿东绿的刚发芽,';
    yield '阿东背着那重重的的壳呀,';
    yield '一步一步地往上爬。';
}
const songIterator = songGenerator();
const readableStream = new ReadableDong(songIterator);
readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});
登入後複製

這就是可讀流,透過實作_read 方法來傳回內容。

Writable

Writable 要實作 _write 方法,接收寫入的內容。

const Stream = require('stream');
const writableStream = Stream.Writable();
writableStream._write = function (data, enc, next) {
   console.log(data.toString());
   // 每秒写一次
   setTimeout(() => {
       next();
   }, 1000);
}
writableStream.on('finish', () => console.log('done~'));
writableStream.write('阿门阿前一棵葡萄树,');
writableStream.write('阿东阿东绿的刚发芽,');
writableStream.write('阿东背着那重重的的壳呀,');
writableStream.write('一步一步地往上爬。');
writableStream.end();
登入後複製

接收寫入的內容,列印出來,並且呼叫 next 來處理下一個寫入的內容,這裡呼叫 next 是非同步的,可以控制頻率。

跑了一下,確實可以正常的處理寫入的內容:

深入了解Node.js中的四大流,解決「背壓」問題

這就是可寫流,透過實作_write 方法來處理寫入的內容。

Duplex

Duplex 是可讀可寫,同時實作_read 和_write 就可以了

const Stream = require('stream');
var duplexStream = Stream.Duplex();
duplexStream._read = function () {
    this.push('阿门阿前一棵葡萄树,');
    this.push('阿东阿东绿的刚发芽,');
    this.push('阿东背着那重重的的壳呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}
duplexStream._write = function (data, enc, next) {
    console.log(data.toString());
    next();
}
duplexStream.on('data', data => console.log(data.toString()));
duplexStream.on('end', data => console.log('read done~'));
duplexStream.write('阿门阿前一棵葡萄树,');
duplexStream.write('阿东阿东绿的刚发芽,');
duplexStream.write('阿东背着那重重的的壳呀,');
duplexStream.write('一步一步地往上爬。');
duplexStream.end();
duplexStream.on('finish', data => console.log('write done~'));
登入後複製

整合了Readable 流和Writable 流的功能,這就是雙工流Duplex。

深入了解Node.js中的四大流,解決「背壓」問題

Transform

Duplex 串流雖然可讀可寫,但是兩者之間沒啥關聯,而有的時候需要對流入的內容做轉換之後流出,這時候就需要轉換流Transform。

Transform 流要實作_transform 的api,我們實作下對內容做反轉的轉換流:

const Stream = require('stream');
class TransformReverse extends Stream.Transform {
  constructor() {
    super()
  }
  _transform(buf, enc, next) {
    const res = buf.toString().split('').reverse().join('');
    this.push(res)
    next()
  }
}
var transformStream = new TransformReverse();
transformStream.on('data', data => console.log(data.toString()))
transformStream.on('end', data => console.log('read done~'));
transformStream.write('阿门阿前一棵葡萄树');
transformStream.write('阿东阿东绿的刚发芽');
transformStream.write('阿东背着那重重的的壳呀');
transformStream.write('一步一步地往上爬');
transformStream.end()
transformStream.on('finish', data => console.log('write done~'));
登入後複製

跑了一下,效果如下:

深入了解Node.js中的四大流,解決「背壓」問題##

流的暂停和流动

我们从 Readable 流中获取内容,然后流入 Writable 流,两边分别做 _read 和 _write 的实现,就实现了流动。

深入了解Node.js中的四大流,解決「背壓」問題

背压

但是 read 和 write 都是异步的,如果两者速率不一致呢?

如果 Readable 读入数据的速率大于 Writable 写入速度的速率,这样就会积累一些数据在缓冲区,如果缓冲的数据过多,就会爆掉,会丢失数据。

而如果 Readable 读入数据的速率小于 Writable 写入速度的速率呢?那没关系,最多就是中间有段空闲时期。

这种读入速率大于写入速率的现象叫做“背压”,或者“负压”。也很好理解,写入段压力比较大,写不进去了,会爆缓冲区,导致数据丢失。

这个缓冲区大小可以通过 readableHighWaterMark 和 writableHightWaterMark 来查看,是 16k。

深入了解Node.js中的四大流,解決「背壓」問題

解决背压

怎么解决这种读写速率不一致的问题呢?

当没写完的时候,暂停读就行了。这样就不会读入的数据越来越多,驻留在缓冲区。

readable stream 有个 readableFlowing 的属性,代表是否自动读入数据,默认为 true,也就是自动读入数据,然后监听 data 事件就可以拿到了。

当 readableFlowing 设置为 false 就不会自动读了,需要手动通过 read 来读入。

readableStream.readableFlowing = false;

let data;
while((data = readableStream.read()) != null) {
    console.log(data.toString());
}
登入後複製

但自己手动 read 比较麻烦,我们依然可以用自动流入的方式,调用 pause 和 resume 来暂停和恢复就行了。

当调用 writable stream 的 write 方法的时候会返回一个 boolean 值代表是写入了目标还是放在了缓冲区:

  • true: 数据已经写入目标

  • false:目标不可写入,暂时放在缓冲区

我们可以判断返回 false 的时候就 pause,然后等缓冲区清空了就 resume:

const rs = fs.createReadStream(src);
const ws = fs.createWriteStream(dst);
rs.on('data', function (chunk) {
    if (ws.write(chunk) === false) {
        rs.pause();
    }
});
rs.on('end', function () {
    ws.end();
});
ws.on('drain', function () {
    rs.resume();
});
登入後複製

这样就能达到根据写入速率暂停和恢复读入速率的功能,解决了背压问题。

pipe 有背压问题么?

平时我们经常会用 pipe 来直接把 Readable 流对接到 Writable 流,但是好像也没遇到过背压问题,其实是 pipe 内部已经做了读入速率的动态调节了。

const rs = fs.createReadStream(src);
const ws = fs.createWriteStream(dst);

rs.pipe(ws);
登入後複製

总结

流是传输数据时常见的思想,就是一部分一部分的传输内容,是文件读写、网络通信的基础概念。

Node.js 也提供了 stream 的 api,包括 Readable 可读流、Writable 可写流、Duplex 双工流、Transform 转换流。它们分别实现 _read、_write、_read + _write、_transform 方法,来做数据的返回和处理。

创建 Readable 对象既可以直接调用 Readable api 创建,然后重写 _read 方法,也可以继承 Readable 实现一个子类,之后实例化。其他流同理。(Readable 可以很容易的和 generator 结合)

当读入的速率大于写入速率的时候就会出现“背压”现象,会爆缓冲区导致数据丢失,解决的方式是根据 write 的速率来动态 pause 和 resume 可读流的速率。pipe 就没有这个问题,因为内部做了处理。

流是掌握 IO 绕不过去的一个概念,而背压问题也是流很常见的问题,遇到了数据丢失可以考虑是否发生了背压。希望这篇文章能够帮大家理清思路,真正掌握 stream!

更多编程相关知识,请访问:编程视频!!

以上是深入了解Node.js中的四大流,解決「背壓」問題的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:微信公众号- 神光的编程秘籍
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
最新問題
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!