Let's talk about how to implement Stream (readable, writable, duplex and conversion streams) in Node.js

Release: 2021-12-22 19:23:53
This article introduces Stream in Node.js and shows how to implement readable, writable, duplex and transform streams. I hope you find it helpful!

Introducing Stream

Suppose we need to copy the contents of one file to another file. We might write the following code:

    const fs = require('fs');
    const path = require('path');

    const copy = (source, target) => {
      // Read the whole source file into memory first
      fs.readFile(path.resolve(source), (err, data) => {
        if (err) {
          throw new Error(err.toString());
        }
        // Then write everything to the target file in one go
        fs.writeFile(path.resolve(target), data, (err) => {
          if (!err) {
            console.log('Copy succeeded!');
          }
        });
      });
    };

The code above is very simple: it first reads the contents of the source file and then writes them to the target file. Its defining characteristic is that it must read all of the contents of source before it writes anything to target.

This has a disadvantage. When we read a large file, we may run out of memory, because the entire file is loaded into memory first. It also costs time: reading a large file into memory in one go takes a long time, and users may feel that the program is stuck.

Another solution is to read and write at the same time: read part of the file, write that part to the new file, and repeat. This way only part of the content is in memory at any moment, so memory usage stays low. And because data is written while it is being read, users get a response quickly, which improves the user experience.

An animated picture I found on the Internet vividly shows how data flows before and after using a stream:

[Animated figures: data flow before and after using a stream]

Node.js provides the Stream API, which is well suited to processing large files. Because data is processed piece by piece, like flowing water, the module is named Stream.

    const fs = require('fs');

    function copy(source, target) {
      const rs = fs.createReadStream(source);
      const ws = fs.createWriteStream(target);

      rs.on('data', (data) => {
        ws.write(data);
      });
      rs.on('end', () => {
        ws.end();
      });
    }

The details of the above code will be revealed later.

Classification of Stream

Streams can be divided into four categories:

  • Readable: a readable stream, the provider of data
  • Writable: a writable stream, the consumer of data
  • Duplex: a stream that is both writable and readable (duplex stream)
  • Transform: a special case of Duplex, a transform stream that processes its input data and then outputs the result

Readable streams and writable streams are the basis. Common readable streams and writable streams are as follows

Readable stream    Writable stream
---------------    ----------------
HTTP request       HTTP response
fs read streams    fs write streams
process.stdin      process.stdout
TCP sockets        TCP sockets
zlib streams       zlib streams
crypto streams     crypto streams

Every stream is an instance of EventEmitter and emits its own custom events.
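The relationships between the four categories can be checked with instanceof. The following is only a small sketch: describeStream is a hypothetical helper written for illustration, and PassThrough (a built-in Transform that forwards data unchanged) is used as a convenient stand-in for a transform stream.

```javascript
const { EventEmitter } = require('events');
const { Readable, Writable, Duplex, Transform, PassThrough } = require('stream');

// Hypothetical helper: report which stream base classes an object belongs to.
function describeStream(stream) {
  return {
    eventEmitter: stream instanceof EventEmitter,
    readable: stream instanceof Readable,
    writable: stream instanceof Writable,
    duplex: stream instanceof Duplex,
    transform: stream instanceof Transform,
  };
}

// A Transform stream is also a Duplex, a Readable, a Writable and an EventEmitter.
console.log(describeStream(new PassThrough()));
// A plain Readable is an EventEmitter, but neither writable nor duplex.
console.log(describeStream(new Readable({ read() {} })));
```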

Readable Stream

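A readable stream has two modes: paused mode and flowing mode. When we create a stream and listen for the readable event, the stream enters paused mode. In paused mode it keeps reading data into its buffer, whose size limit is specified by the highWaterMark option (64 KB by default for fs read streams), and then triggers the readable event. The readable event is triggered in two situations:

  • the data in the buffer reaches the size set by highWaterMark
  • all the data in the data source has been read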
    const fs = require('fs');

    const rs = fs.createReadStream('a.txt', {
      highWaterMark: 1 // the buffer stores at most 1 byte
    });

    rs.on('readable', () => {
      let data;
      // read() returns null once the buffer is empty
      while ((data = rs.read()) !== null) {
        console.log(data.toString());
      }
    });

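The program above sets highWaterMark to 1, so a readable event is triggered for every byte that is read. Each time the readable event fires, we call the readable stream's read([size]) method to read data from the buffer (the data is returned as a Buffer) and print it to the console.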
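The optional size argument of read([size]) can be illustrated without touching the file system. This is a rough sketch under stated assumptions: readFixedSize is a hypothetical helper, and an in-memory Readable stands in for the file stream. It reads the buffered data in pieces of at most size bytes while the stream stays in paused mode.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: read `text` back in pieces of at most `size` bytes.
function readFixedSize(text, size) {
  // In-memory source: all data is pushed up front, then the end is signalled.
  const source = new Readable({ read() {} });
  source.push(Buffer.from(text));
  source.push(null);

  return new Promise((resolve, reject) => {
    const pieces = [];
    source.on('readable', () => {
      let chunk;
      // read(size) returns null when nothing more can be read right now
      while ((chunk = source.read(size)) !== null) {
        pieces.push(chunk.toString());
      }
    });
    source.on('end', () => resolve(pieces));
    source.on('error', reject);
  });
}

readFixedSize('abcdefgh', 3).then((pieces) => console.log(pieces));
```

Because the stream has already ended, the last call to read(size) returns whatever is left in the buffer even though it is shorter than size.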


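If we listen for the data event instead, the stream switches to flowing mode and hands each chunk to the callback as soon as it has been read:

    const fs = require('fs');

    const rs = fs.createReadStream('a.txt', {
      highWaterMark: 2
    });

    rs.on('data', (data) => {
      console.log(data.toString());
    });
    rs.on('end', () => {
      console.log('Finished reading the file!');
    });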


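A classic example of a readable stream is the request object req in http. The program below reads the body of an HTTP request by listening for the data event on req: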

    const http = require('http');

    const app = http.createServer();

    app.on('request', (req, res) => {
      const datas = [];
      req.on('data', (data) => {
        datas.push(data);
      });
      req.on('end', () => {
        req.body = Buffer.concat(datas);
        // Once the body has been read completely, send it back to the client
        res.end(req.body);
      });
    });

    app.listen(3000, () => {
      console.log('Server listening on port 3000 ...');
    });
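The collect-then-concat pattern used for req works for any readable stream. Below is a small sketch of it as a reusable function; collectBody is a hypothetical helper name, and Readable.from with two Buffers stands in for a real HTTP request.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: collect every chunk of a readable stream into one Buffer.
function collectBody(readable) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    readable.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
    readable.on('end', () => resolve(Buffer.concat(chunks)));
    readable.on('error', reject);
  });
}

// Stand-in for `req`: a readable stream that delivers the body in two chunks.
const fakeRequest = Readable.from([Buffer.from('hello '), Buffer.from('world')]);
collectBody(fakeRequest).then((body) => console.log(body.toString()));
```

Note that this keeps the whole body in memory, so it only suits bodies that are known to be small.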


Writable Stream



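When write() returns false, the internal buffer has reached highWaterMark. We should then stop writing and wait for the drain event before continuing:

    const fs = require('fs');

    const ws = fs.createWriteStream('b.txt', {
      highWaterMark: 16 * 1024
    });

    function writeMillionTimes(writer, data, encoding, callback) {
      let i = 10000;
      write();

      function write() {
        // Whether more data can currently be written to the writable stream
        let ok = true;
        // Note: i-- is also evaluated on the check that fails because ok is
        // false, so one count is skipped each time the buffer fills up
        while (i-- > 0 && ok) {
          // writer.write() returns false when no more data should be written
          ok = writer.write(data, encoding, i === 0 ? callback : null);
        }
        if (i > 0) {
          // ok is false: the buffer cannot take more data for now
          console.log('drain', i);
          // Wait for drain (the queue has been consumed), then keep writing
          writer.once('drain', write);
        }
      }
    }

    writeMillionTimes(ws, 'simple', 'utf-8', () => {
      console.log('end');
    });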


Running the program prints:

    drain 7268
    drain 4536
    drain 1804
    end

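This shows that the buffer filled up to 16 KB three times. You can check this by taking the difference between consecutive numbers above and multiplying it by 6 (the number of bytes in "simple"); the result is roughly 16 * 1024, for example: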

$(7268 - 4536) \times 6 = 16392 \approx 16384 = 16 \times 1024$
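The threshold can also be counted directly. The sketch below is an illustration under stated assumptions, not the article's program: countWritesUntilFull is a hypothetical helper, and a custom Writable that completes each write asynchronously stands in for the file stream, so that nothing is flushed while the loop runs.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: count how many write() calls it takes until write()
// returns false for a sink with the given highWaterMark.
function countWritesUntilFull(highWaterMark, chunk) {
  if (Buffer.byteLength(chunk) === 0) {
    throw new RangeError('chunk must not be empty');
  }
  const slowSink = new Writable({
    highWaterMark,
    // Complete each write on a later tick, so data accumulates in the buffer
    write(data, encoding, callback) {
      setImmediate(callback);
    },
  });

  let writes = 0;
  let ok = true;
  while (ok) {
    ok = slowSink.write(chunk);
    writes++;
  }
  return writes;
}

console.log(countWritesUntilFull(16 * 1024, 'simple')); // 2731
```

write() returns false as soon as the buffered size is no longer below highWaterMark: 2730 * 6 = 16380 is still below 16384, while 2731 * 6 = 16386 is not. The differences in the printed drain values are 2732 rather than 2731 because the loop condition in writeMillionTimes decrements i one more time on the check that exits the loop.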


Call end() when there is nothing more to write. Once the stream has been closed, the close event fires:

    const fs = require('fs');

    const ws = fs.createWriteStream('b.txt');

    ws.write('Hello');
    ws.write('World');
    ws.end('!');

    ws.on('close', () => {
      console.log('close'); // close
    });


Writing to the stream after end() has been called raises an error:

    const fs = require('fs');

    const ws = fs.createWriteStream('b.txt');

    ws.write('Hello');
    ws.write('World');
    ws.end('!');
    ws.write('write again'); // Error [ERR_STREAM_WRITE_AFTER_END]: write after end


Besides close, a writable stream also emits finish once end() has been called and all data has been flushed:

    const fs = require('fs');

    const ws = fs.createWriteStream('b.txt');

    ws.write('Hello');
    ws.write('World');
    ws.end('!');

    ws.on('close', () => {
      console.log('close');
    });
    ws.on('finish', () => {
      console.log('finish');
    });


The output shows that finish fires before close:

    finish
    close
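The same ordering can be observed on a custom writable stream. This is only a sketch: recordLifecycle is a hypothetical helper, and it assumes a Node.js version (14 or later) in which streams are destroyed automatically after finishing, which is what makes a custom Writable emit close.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: record the order in which finish and close fire.
function recordLifecycle() {
  return new Promise((resolve, reject) => {
    const events = [];
    const sink = new Writable({
      // Discard the data; only the lifecycle events matter here
      write(chunk, encoding, callback) {
        callback();
      },
    });

    sink.on('finish', () => events.push('finish'));
    sink.on('close', () => {
      events.push('close');
      resolve(events);
    });
    sink.on('error', reject);

    sink.write('Hello');
    sink.end('!');
  });
}

recordLifecycle().then((events) => console.log(events.join(' ')));
```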



A classic example of a writable stream is the response object res in http. The program below streams index.html to the client:

    const http = require('http');
    const fs = require('fs');

    const app = http.createServer();

    app.on('request', (req, res) => {
      const rs = fs.createReadStream('index.html');
      rs.on('data', (data) => {
        res.write(data);
      });
      rs.on('end', () => {
        res.end();
      });
    });

    app.listen(3000, () => {
      console.log('Server listening on port 3000 ...');
    });

Duplex Stream and Transform Stream

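A Duplex stream, as the name suggests, can both receive data and output data. Its input and output do not have to be related in any way, like one component containing two independent systems. Duplex inherits from the readable stream (Readable) and also has all the methods of the writable stream (Writable).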

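A Transform stream inherits from the Duplex stream, so it also has the abilities of both a readable and a writable stream. Unlike a plain Duplex, its output is related to its input: the data is transformed along the way. Common transform streams include zlib and crypto.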




Recall the stream-based copy function from the beginning of the article:

    const fs = require('fs');

    function copy(source, target) {
      const rs = fs.createReadStream(source);
      const ws = fs.createWriteStream(target);

      rs.on('data', (data) => {
        ws.write(data);
      });
      rs.on('end', () => {
        ws.end();
      });
    }

    copy('a.txt', 'b.txt');



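If data is read faster than it can be written, it piles up in the writable stream's buffer. To avoid this, we can pause the readable stream when write() returns false and resume it when drain fires:

    const fs = require('fs');

    function copy(source, target) {
      const rs = fs.createReadStream(source);
      const ws = fs.createWriteStream(target);

      rs.on('data', (data) => {
        // The write buffer is full: stop reading for now
        if (!ws.write(data)) {
          rs.pause();
        }
      });
      rs.on('end', () => {
        ws.end();
      });
      // The write buffer has been emptied: continue reading
      ws.on('drain', () => {
        rs.resume();
      });
    }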


The pipe() method of a readable stream does this for us:

    const fs = require('fs');

    function copy(source, target) {
      const rs = fs.createReadStream(source);
      const ws = fs.createWriteStream(target);

      rs.pipe(ws);
    }


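A simplified implementation of pipe looks like this:

    Readable.prototype.pipe = function (ws) {
      this.on('data', (data) => {
        // Pause reading when the writable stream's buffer is full
        if (!ws.write(data)) {
          this.pause();
        }
      });
      // Resume reading once the buffer has drained
      ws.on('drain', () => {
        this.resume();
      });
      // Emit the pipe event
      ws.emit('pipe', this);
      // Return the writable stream to support chaining
      return ws;
    };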
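One thing a hand-written pipe like this does not cover is error handling. As a possible alternative, Node.js also ships stream.pipeline, which connects streams and reports completion or failure through a single callback. The sketch below is illustrative only: copyStream is a hypothetical wrapper, and in-memory streams stand in for the file streams.

```javascript
const { pipeline, Readable, Writable } = require('stream');

// Hypothetical wrapper: pipe source into destination and settle a promise
// when the transfer has finished or failed.
function copyStream(source, destination) {
  return new Promise((resolve, reject) => {
    pipeline(source, destination, (err) => (err ? reject(err) : resolve()));
  });
}

// In-memory stand-ins for fs.createReadStream / fs.createWriteStream
const received = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

copyStream(Readable.from(['Hello', 'World', '!']), sink)
  .then(() => console.log(received.join('')))
  .catch((err) => console.error('copy failed:', err));
```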


[Diagram: the life cycle of src.pipe(dest). The readable stream hands each chunk of your data to the writable stream through .write(chunk). If the chunk is too big or the queue is busy, the chunk is added to the buffer queue, .write() returns false and the readable stream is paused; when the queue is empty, drain is emitted and the readable stream resumes. The event callbacks (close, data, drain, unpipe, error, finish, end) are attached when .pipe() is called and exist outside the data flow.]


In this section we implement concrete streams ourselves. Implementing streams gives a deeper understanding of how Stream works internally.



    const { Readable } = require('stream');

    class IteratorReadableStream extends Readable {
      constructor(iterator) {
        super();
        this.iterator = iterator;
      }

      _read() {
        const data = this.iterator.next();
        if (data.done) {
          // Pushing null signals that all data has been read
          this.push(null);
        } else {
          // Only a string or a Buffer may be pushed
          this.push(data.value + '');
        }
      }
    }

    module.exports = IteratorReadableStream;

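Above we implemented a readable stream that takes an iterator as its argument and uses it as its data source. The readable stream calls _read automatically to obtain data. In _read we fetch the next value from the iterator and call push, which places the data in the buffer. Only strings or Buffers can be pushed; pushing null signals that all the data has been read.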

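So the readable stream works as follows: each call to _read reads data from the data source and stores it in the buffer; the data event is then triggered and the buffered data is passed to the callback bound to it. This repeats until null is pushed into the buffer, which means the data source has been read completely, and the end event is triggered.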


Let's use a generator as the data source:

    const IteratorReadableStream = require('./IteratorReadableStream');

    function* getData() {
      for (let i = 0; i < 5; i++) {
        yield i;
      }
    }

    const rs = new IteratorReadableStream(getData());

    rs.on('data', (data) => {
      console.log(data.toString());
    });
    rs.on('end', () => {
      console.log('Iteration finished');
    });


Output:

    0
    1
    2
    3
    4
    Iteration finished
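For comparison, Node.js has a built-in helper for the same job: Readable.from wraps an iterable or iterator in a readable stream. The sketch below assumes that helper is available in your Node.js version; collectValues is a hypothetical function used only to gather the results. Unlike the class above, Readable.from runs in object mode by default, so the values arrive as numbers rather than strings.

```javascript
const { Readable } = require('stream');

function* getData() {
  for (let i = 0; i < 5; i++) {
    yield i;
  }
}

// Hypothetical helper: gather everything a readable stream emits into an array.
function collectValues(readable) {
  return new Promise((resolve, reject) => {
    const values = [];
    readable.on('data', (value) => values.push(value));
    readable.on('end', () => resolve(values));
    readable.on('error', reject);
  });
}

collectValues(Readable.from(getData())).then((values) => console.log(values));
```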



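A custom writable stream only needs to implement _write(). The example below appends every chunk it receives to a file:

    const fs = require('fs');
    const { Writable } = require('stream');

    class FileWritableStream extends Writable {
      constructor(filepath) {
        super();
        this.filepath = filepath;
      }

      _write(chunk, encoding, callback) {
        // Call callback when the chunk has been written (or has failed)
        fs.appendFile(this.filepath, chunk, { encoding }, callback);
      }
    }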


We can pipe process.stdin into it, so that everything typed in the terminal is appended to c.txt:

    process.stdin.pipe(new FileWritableStream('c.txt'));



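A Duplex stream can act both as a readable stream and as a writable stream, and its input and output do not have to be related. Duplex inherits from Readable and has all of Writable's methods, so we only need to implement the _read() and _write() methods: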

    const { Duplex } = require('stream');

    class CustomDuplexStream extends Duplex {
      constructor() {
        super();
        this.currentCharCode = 65;
      }

      _read() {
        if (this.currentCharCode <= 90) {
          this.push(String.fromCharCode(this.currentCharCode++));
        } else {
          this.push(null);
        }
      }

      _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
      }
    }

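The readable side of the duplex stream above pushes the 26 uppercase letters into the buffer, while the writable side simply prints the data it receives to the console. As you can see, the readable and writable sides of a duplex stream have nothing to do with each other.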

    const dp = new CustomDuplexStream();

    dp.write('1');
    dp.write('2');
    dp.end();

    dp.pipe(process.stdout);
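To make the independence of the two sides easier to check, here is a variant of the same idea that records its results instead of printing them. It is a sketch under stated assumptions: AlphabetDuplex and runDuplex are hypothetical names introduced for illustration, and the writable side stores chunks in an array rather than logging them.

```javascript
const { Duplex } = require('stream');

// Hypothetical variant of the duplex stream above that records written chunks.
class AlphabetDuplex extends Duplex {
  constructor() {
    super();
    this.charCode = 65; // 'A'
    this.written = [];
  }

  // Readable side: emit A..Z, then signal the end
  _read() {
    if (this.charCode <= 90) {
      this.push(String.fromCharCode(this.charCode++));
    } else {
      this.push(null);
    }
  }

  // Writable side: just remember what was written
  _write(chunk, encoding, callback) {
    this.written.push(chunk.toString());
    callback();
  }
}

function runDuplex() {
  return new Promise((resolve, reject) => {
    const dp = new AlphabetDuplex();
    let output = '';
    dp.on('data', (chunk) => { output += chunk.toString(); });
    dp.on('end', () => resolve({ output, written: dp.written }));
    dp.on('error', reject);

    dp.write('1');
    dp.write('2');
    dp.end();
  });
}

runDuplex().then((result) => console.log(result));
```

What is written ('1' and '2') never shows up in what is read (the alphabet), and vice versa.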




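A Transform stream is a special case of Duplex. It is also a duplex stream, but its input and output are related: internally, the _transform() method converts the data received by the writable side and passes the result to the readable side. So to implement a transform stream we only need to implement the _transform() method: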

    const { Transform } = require('stream');

    class UpperTransformStream extends Transform {
      _transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
      }
    }


For example, to print the contents of a.txt in upper case:

    const fs = require('fs');

    const ts = new UpperTransformStream();
    const rs = fs.createReadStream('a.txt');

    rs.pipe(ts).pipe(process.stdout);
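For small transforms, subclassing is optional: the Transform constructor also accepts a transform function as an option. The following sketch shows that form; createUpperCaseTransform and upperCaseAll are hypothetical helpers, and Readable.from stands in for the file stream.

```javascript
const { Transform, Readable } = require('stream');

// Hypothetical factory: an upper-casing transform built from constructor options.
function createUpperCaseTransform() {
  return new Transform({
    transform(chunk, encoding, callback) {
      // Passing the result to callback is equivalent to push() + callback()
      callback(null, chunk.toString().toUpperCase());
    },
  });
}

// Hypothetical helper: run some chunks through the transform and join the output.
function upperCaseAll(chunks) {
  return new Promise((resolve, reject) => {
    let output = '';
    const upper = createUpperCaseTransform();
    upper.on('data', (chunk) => { output += chunk.toString(); });
    upper.on('end', () => resolve(output));
    upper.on('error', reject);
    Readable.from(chunks).pipe(upper);
  });
}

upperCaseAll(['hello ', 'stream']).then((text) => console.log(text));
```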


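Transform streams are used quite often in practice. Here is one that is built into Node.js: zlib. It compresses and decompresses data, turning a file into a compressed file or a compressed file back into a normal file. Isn't that a typical transform stream?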

    const zlib = require('zlib');
    const fs = require('fs');

    const args = process.argv.slice(2);
    const source = fs.createReadStream(args[0]);
    const target = fs.createWriteStream(args[1]);
    const gzip = zlib.createGzip();

    source.pipe(gzip).pipe(target);


Run the program above with:

    node gzip.js Graph.md Graph.md.gz

It compresses Graph.md into Graph.md.gz using gzip.
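Decompression works the same way in the other direction with zlib.createGunzip(). The sketch below chains both transforms in memory to check that the data survives a round trip. It is an illustration only: gzipRoundTrip is a hypothetical helper, and stream.pipeline is used instead of pipe so that an error in any stage reaches the callback.

```javascript
const zlib = require('zlib');
const { pipeline, Readable, Writable } = require('stream');

// Hypothetical helper: compress `text` with gzip, decompress it again,
// and resolve with the decompressed text.
function gzipRoundTrip(text) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    const sink = new Writable({
      write(chunk, encoding, callback) {
        chunks.push(chunk);
        callback();
      },
    });

    pipeline(
      Readable.from([Buffer.from(text)]),
      zlib.createGzip(),   // plain data in, gzip data out
      zlib.createGunzip(), // gzip data in, plain data out
      sink,
      (err) => (err ? reject(err) : resolve(Buffer.concat(chunks).toString()))
    );
  });
}

gzipRoundTrip('Hello Stream').then((text) => console.log(text));
```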




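That is the end of this article. By now you should have a basic understanding of how to use Stream. This article does not cover every Stream API, though; if you want to learn about them in more detail, the next step is to read the official documentation (http://nodejs.cn/api/stream.html).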


