Streams in Node.js are notoriously difficult to use or even understand. [Video tutorial recommendation:nodejs video tutorial]
In the words of Dominic Tarr: "Streaming is the best and most misunderstood idea in Node." Even Redux Creator and React.js core team member Dan Abramov is also afraid of Node streams.
This article will help you understand streams and how to use them. Don’t be afraid, you can totally figure it out!
What is a stream?
Streams are one of the fundamental concepts that power Node.js applications. They are data processing methods that sequentially read input data or write data to output.
Streaming is a way to handle reading and writing files, network communications, or any type of end-to-end information exchange in an efficient manner.
The processing method of stream is very unique. Instead of reading the fileall at oncelike the traditional way, the stream reads the data block piece by piece and processes the content of the data. Keep it all in memory.
This approach makes streams very powerful when processinglarge amounts of data. For example, the size of the file may be larger than the available memory space, making it impossible to read the entire file into memory for processing. That's where flow comes in!
You can use streams to process smaller data blocks and read larger files.
Take "streaming" services like YouTube or Netflix, for example: these services don't let you download video and audio files instantly. Instead, your browser receives the video as a continuous stream of chunks, allowing the recipient to start watching and listening almost immediately.
But streaming isn’t just about processing media and big data. They also give us the power of "composability" in our code. Designing with composability in mind means being able to combine multiple components in some way to produce the same type of result. In Node.js, you can pass data within other smaller snippets via streams to form powerful snippets.
Why use streams?
Streaming basically has two main advantages over other data processing methods:
There are 4 types of streams in Node.js:
fs.createWriteStream()
allows us to write data to a file using a stream.fs.createReadStream()
Let us read the contents of the file.net.Socket
If you have used Node.js, you may have encountered streams. For example, in a Node.js-based HTTP server,request
is a readable stream, andresponse
is a writable stream. You may have used thefs
module, which allows you to use readable and writable file streams. Whenever you use Express, you are using streams to interact with the client, and since TCP sockets, TLS stacks, and other connections are all based on Node.js, use streams in every database connection-driven program you can use. .
#How to create a readable stream?
The readability stream is first required and then initialized.
const Stream = require('stream') const readableStream = new Stream.Readable()
Now the stream is initialized and you can send data to it:
readableStream.push('ping!') readableStream.push('pong!')
Async Iterator
It is strongly recommended to use async iterator when using streams.According to Dr.Axel Rauschmayer, asynchronous iteration is a protocol for retrieving the contents of a data container asynchronously (meaning that the current "task" can be paused before retrieving items). It must also be mentioned that the stream async iterator implementation uses internalreadable
events.
When reading from a readable stream, you can use an asynchronous iterator:
import * as fs from 'fs'; async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk); } } const readable = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'}); logChunks(readable); // Output: // 'This is a test!\n'
You can also use a string to collect the contents of the readable stream:
import {Readable} from 'stream'; async function readableToString2(readable) { let result = ''; for await (const chunk of readable) { result += chunk; } return result; } const readable = Readable.from('Good morning!', {encoding: 'utf8'}); assert.equal(await readableToString2(readable), 'Good morning!');
注意,在这种情况下必须使用异步函数,因为我们想返回 Promise。
请切记不要将异步功能与EventEmitter
混合使用,因为当前在事件处理程序中发出拒绝时,无法捕获拒绝,从而导致难以跟踪错误和内存泄漏。目前的最佳实践是始终将异步函数的内容包装在 try/catch 块中并处理错误,但这很容易出错。这个 pull request旨在解决一旦其落在 Node 核心上产生的问题。
要了解有关异步迭代的 Node.js 流的更多信息,请查看这篇很棒的文章。
Readable.from():从可迭代对象创建可读流
stream.Readable.from(iterable, [options])
这是一种实用方法,用于从迭代器中创建可读流,该迭代器保存可迭代对象中包含的数据。可迭代对象可以是同步可迭代对象或异步可迭代对象。参数选项是可选的,除其他作用外,还可以用于指定文本编码。
const { Readable } = require('stream'); async function * generate() { yield 'hello'; yield 'streams'; } const readable = Readable.from(generate()); readable.on('data', (chunk) => { console.log(chunk); });
两种读取模式
根据Streams API,可读流有效地以两种模式之一运行:flowing和paused。可读流可以处于对象模式,无论处于 flowing 模式还是 paused 模式。
EventEmitter
接口使用事件将其尽快提供给程序。stream.read()
方法以从流中读取数据块。在 flowing 模式中,要从流中读取数据,可以监听数据事件并附加回调。当有大量数据可用时,可读流将发出一个数据事件,并执行你的回调。看下面的代码片段:
var fs = require("fs"); var data = ''; var readerStream = fs.createReadStream('file.txt'); //Create a readable stream readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. // Handle stream events --> data, end, and error readerStream.on('data', function(chunk) { data += chunk; }); readerStream.on('end',function() { console.log(data); }); readerStream.on('error', function(err) { console.log(err.stack); }); console.log("Program Ended");
函数调用fs.createReadStream()
给你一个可读流。最初流处于静态状态。一旦你侦听数据事件并附加了回调,它就会开始流动。之后将读取大块数据并将其传递给你的回调。流实现者决定发送数据事件的频率。例如,每当有几 KB 的数据被读取时,HTTP 请求就可能发出一个数据事件。当从文件中读取数据时,你可能会决定读取一行后就发出数据事件。
当没有更多数据要读取(结束)时,流将发出结束事件。在以上代码段中,我们监听此事件以在结束时得到通知。
另外,如果有错误,流将发出并通知错误。
在 paused 模式下,你只需在流实例上重复调用read()
,直到读完所有数据块为止,如以下示例所示:
var fs = require('fs'); var readableStream = fs.createReadStream('file.txt'); var data = ''; var chunk; readableStream.on('readable', function() { while ((chunk=readableStream.read()) != null) { data += chunk; } }); readableStream.on('end', function() { console.log(data) });
read()
函数从内部缓冲区读取一些数据并将其返回。当没有内容可读取时返回null
。所以在while
循环中,我们检查是否为null
并终止循环。请注意,当可以从流中读取大量数据时,将会发出可读事件。
所有Readable
流均以paused 模式开始,但可以通过以下方式之一切换为flowing 模式:
stream.resume()
方法。stream.pipe()
方法将数据发送到可写对象。Readable
可以使以下方法之一切换回 paused 模式:
stream.pause()
方法。stream.unpipe()
方法来删除多个管道目标。一个需要记住的重要概念是,除非提供了一种用于消耗或忽略该数据的机制,否则Readable
将不会生成数据。如果使用机制被禁用或取消,则Readable
将会试图停止生成数据。添加readable
事件处理会自动使流停止 flowing,并通过read.read()
得到数据。如果删除了readable
事件处理,那么如果存在 'data' 事件处理,则流将再次开始 flowing。
如何创建可写流?
要将数据写入可写流,你需要在流实例上调用write()
。如以下示例所示:
var fs = require('fs'); var readableStream = fs.createReadStream('file1.txt'); var writableStream = fs.createWriteStream('file2.txt'); readableStream.setEncoding('utf8'); readableStream.on('data', function(chunk) { writableStream.write(chunk); });
上面的代码很简单。它只是简单地从输入流中读取数据块,并使用write()
写入目的地。该函数返回一个布尔值,指示操作是否成功。如果为true
,则写入成功,你可以继续写入更多数据。如果返回false
,则表示出了点问题,你目前无法写任何内容。可写流将通过发出drain
事件来通知你什么时候可以开始写入更多数据。
调用writable.end()
方法表示没有更多数据将被写入 Writable。如果提供,则可选的回调函数将作为finish
事件的侦听器附加。
// Write 'hello, ' and then end with 'world!'. const fs = require('fs'); const file = fs.createWriteStream('example.txt'); file.write('hello, '); file.end('world!'); // Writing more now is not allowed!
你可以用可写流从可读流中读取数据:
const Stream = require('stream') const readableStream = new Stream.Readable() const writableStream = new Stream.Writable() writableStream._write = (chunk, encoding, next) => { console.log(chunk.toString()) next() } readableStream.pipe(writableStream) readableStream.push('ping!') readableStream.push('pong!') writableStream.end()
还可以用异步迭代器来写入可写流,建议使用
import * as util from 'util'; import * as stream from 'stream'; import * as fs from 'fs'; import {once} from 'events'; const finished = util.promisify(stream.finished); // (A) async function writeIterableToFile(iterable, filePath) { const writable = fs.createWriteStream(filePath, {encoding: 'utf8'}); for await (const chunk of iterable) { if (!writable.write(chunk)) { // (B) // Handle backpressure await once(writable, 'drain'); } } writable.end(); // (C) // Wait until done. Throws if there are errors. await finished(writable); } await writeIterableToFile( ['One', ' line of text.\n'], 'tmp/log.txt'); assert.equal( fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}), 'One line of text.\n');
stream.finished()
的默认版本是基于回调的,但是可以通过util.promisify()
转换为基于 Promise 的版本(A行)。
在此例中,使用以下两种模式:
Writing to a writable stream while handling backpressure (line B):
在处理backpressure
时写入可写流(B行):
if (!writable.write(chunk)) { await once(writable, 'drain'); }
关闭可写流,并等待写入完成(C行):
writable.end(); await finished(writable);
pipeline(管道)是一种机制,可以将一个流的输出作为另一流的输入。它通常用于从一个流中获取数据并将该流的输出传递到另一个流。管道操作没有限制。换句话说,管道可用于分多个步骤处理流数据。
在 Node 10.x 中引入了stream.pipeline()
。这是一种模块方法,用于在流转发错误和正确清理之间进行管道传输,并在管道完成后提供回调。
这是使用管道的例子:
const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib'); // 使用 pipeline API 可以轻松将一系列流 // 通过管道传输在一起,并在管道完全完成后得到通知。 // 一个有效地用 gzip压缩巨大视频文件的管道: pipeline( fs.createReadStream('The.Matrix.1080p.mkv'), zlib.createGzip(), fs.createWriteStream('The.Matrix.1080p.mkv.gz'), (err) => { if (err) { console.error('Pipeline failed', err); } else { console.log('Pipeline succeeded'); } } );
由于pipe
不安全,应使用pipeline
代替pipe
。
流模块
Node.js 流模块提供了构建所有流 API 的基础。
Stream 模块是 Node.js 中默认提供的原生模块。 Stream 是 EventEmitter 类的实例,该类在 Node 中异步处理事件。因此流本质上是基于事件的。
要访问流模块:
const stream = require('stream');
stream
模块对于创建新型流实例非常有用。通常不需要使用stream
模块来消耗流。
流驱动的 Node API
由于它们的优点,许多 Node.js 核心模块提供了原生流处理功能,最值得注意的是:
net.Socket
是流所基于的主 API 节点,它是以下大多数 API 的基础process.stdin
返回连接到 stdin 的流process.stdout
返回连接到 stdout 的流process.stderr
返回连接到 stderr 的流fs.createReadStream()
创建一个可读的文件流fs.createWriteStream()
创建可写的文件流net.connect()
启动基于流的连接http.request()
返回http.ClientRequest
类的实例,它是可写流zlib.createGzip()
使用gzip(一种压缩算法)将数据压缩到流中zlib.createGunzip()
解压缩 gzip 流。zlib.createDeflate()
deflate(压缩算法)将数据压缩到流中zlib.createInflate()
解压缩一个deflate流流 备忘单:
查看更多:Node.js 流速查表
以下是与可写流相关的一些重要事件:
error
– Indicates that an error occurred while writing or configuring the pipeline.pipeline
– This event is emitted by a writable stream when a readable stream is passed into a writable stream.unpipe
– Emitted when you call unpipe on a readable stream and stop piping it to the destination stream.That’s all the basics about streams. Streams, pipes, and chains are the core and most powerful features of Node.js. Streams really help you write concise and efficient code to perform I/O.
Also, there is aNode.js strategic planworth looking forward to, calledBOB, which aims to improve the internal data flow of Node.js and hopefully as a future The public API of the Node.js streaming data interface.
English original address: https://nodesource.com/blog/understanding-streams-in-nodejs
Author: Liz Parody
Translation: Crazy Technology house
Related recommendations:nodejs tutorial
The above is the detailed content of In-depth understanding of streams in Node.js. For more information, please follow other related articles on the PHP Chinese website!