
Let’s talk about the core module in Nodejs: stream module (see how to use it)

青灯夜游 · 2021-12-20

This article walks you through the stream module in Node.js in detail, introducing what streams are and how to use them. I hope you find it helpful!


The stream module is one of the most fundamental modules in Node; other core modules such as fs and http are built on top of stream instances.

Most front-end newcomers don't have a clear picture of what streams are or how to use them when they first pick up Node, because streams rarely come up in day-to-day front-end work.

1. What is a stream?

The word "stream" naturally brings to mind images of flowing water, a current moving from one place to another.

Official definition: a stream is an abstract interface for working with streaming data in Node.js.

From the official definition, we can see:

  • A stream is a tool Node provides for processing data
  • A stream is an abstract interface in Node

To understand streams accurately: a stream can be thought of as a data flow, a means of transferring data. Within an application, a stream is an ordered flow of data with a starting point and an end point.

The main reason streams are hard to grasp at first is that they are an abstract concept.

2. Specific usage scenarios of streams

To get a clear picture of the stream module, let's first look at some concrete scenarios where it is applied.

In Node, streams are mainly used where large amounts of data must be processed: reading and writing large files with fs, handling http requests and responses, file compression, data encryption/decryption, and so on.

[Diagram: a bucket (data source) connected by a pipe (data flow) to a pool (data target)]

The picture above illustrates how streams are used: the bucket can be understood as the data source, the pool as the data target, and the pipe connecting them as the data flow. Through this pipe, data flows from the source to the target.

3. Classification of streams

In Node, streams fall into 4 categories: readable streams, writable streams, duplex streams, and transform streams.

  • Writable: a stream that data can be written to
  • Readable: a stream that data can be read from
  • Duplex: a stream that is both Readable and Writable
  • Transform: a Duplex stream that can modify or transform data as it is written and read

All streams are instances of EventEmitter.

Since every stream is an EventEmitter, we can monitor changes in the data flow through the event mechanism.

4. Data modes and the buffer

Before learning the specific use of the 4 types of streams, we need to understand two concepts, data modes and the buffer, which will help us better understand the streams discussed next.

4.1 Data modes

All streams created by the Node.js APIs operate exclusively on strings and Buffer (or Uint8Array) objects. Stream implementations can also work in "object mode", in which chunks may be arbitrary JavaScript values other than null.

4.2 The buffer

Both Writable and Readable streams store data in an internal buffer. The amount of data that can be buffered depends on the highWaterMark option passed to the stream's constructor. For normal streams, the highWaterMark option specifies a total number of bytes; for streams operating in object mode, it specifies a total number of objects.

The highWaterMark option is a threshold, not a limit: it dictates how much data a stream buffers before it stops asking the underlying source for more data.

When the implementation calls stream.push(chunk), the data is buffered in the Readable stream. If the consumer of the stream does not call stream.read(), the data sits in an internal queue until it is consumed.

Once the total size of the internal read buffer reaches the threshold specified by highWaterMark, the stream temporarily stops reading data from the underlying resource until the buffered data has been consumed.

Similarly, when the writable.write(chunk) method is called repeatedly, data is buffered in the Writable stream.

5. Readable streams

5.1 Flowing and paused reading modes

Readable streams effectively operate in one of two modes: flowing and paused.

  • Flowing mode: data is read from the underlying system and push()ed into the buffer; once highWaterMark is reached, push() returns false, the resource stops flowing into the buffer, and 'data' events are emitted to consume the data.

  • Paused mode: all Readable streams start in paused mode, and the stream.read() method must be called explicitly to read data from the stream. Each time data reaches the buffer, a 'readable' event fires; that is, every push() triggers 'readable'.

  • Switching from paused mode to flowing mode:

    • Add a 'data' event handler
    • Call the stream.resume() method
    • Call the stream.pipe() method to send the data to a Writable

  • Switching from flowing mode to paused mode:

    • If there are no pipe destinations, call the stream.pause() method
    • If there are pipe destinations, remove them all; multiple pipe destinations can be removed by calling the stream.unpipe() method

5.2 A common readable stream example

import path from 'path';
import fs from 'fs';

const filePath = path.join(path.resolve(), 'files', 'text.txt');

const readable = fs.createReadStream(filePath);
// If a default encoding is set with readable.setEncoding(), the listener
// callback receives chunks as strings; otherwise chunks arrive as Buffers.
readable.setEncoding('utf8');
let str = '';

readable.on('open', (fd) => {
  console.log('started reading the file')
})
// The 'data' event is emitted whenever the stream hands ownership of a
// chunk of data over to a consumer
readable.on('data', (data) => {
  str += data;
  console.log('received data')
})
// Causes a stream in flowing mode to stop emitting 'data' events and switch
// to paused mode. Any available data remains in the internal buffer.
readable.pause();
// Makes an explicitly paused Readable stream resume emitting 'data' events,
// switching the stream back to flowing mode.
readable.resume();
// The 'pause' event is emitted when stream.pause() is called and
// readableFlowing is not false.
readable.on('pause', () => {
  console.log('reading paused')
})
// The 'resume' event is emitted when stream.resume() is called and
// readableFlowing is not true.
readable.on('resume', () => {
  console.log('flowing again')
})
// The 'end' event is emitted when there is no more data to consume.
readable.on('end', () => {
  console.log('finished reading the file');
})
// The 'close' event is emitted when the stream and any of its underlying
// resources (such as a file descriptor) have been closed.
readable.on('close', () => {
  console.log('file closed')
})
// readable.pipe(destWriteable) would bind a Writable stream destWriteable
// to readable, automatically switching readable to flowing mode and pushing
// all of its data to the bound Writable. The data flow is managed
// automatically:
// readable.pipe(destWriteable)

// The 'error' event can occur if the underlying stream is unable to produce
// data due to an internal failure, or if the implementation pushes an
// invalid chunk.
readable.on('error', (err) => {
  console.log(err)
  console.log('an error occurred while reading the file')
})

6. Writable streams

6.1 Flowing and pausing in writable streams

A writable stream is similar to a readable stream. When data flows in, it is written directly to the underlying resource; when writing is slow or paused, the incoming data is cached in the buffer. When the producer writes too fast and fills up the buffer, "backpressure" occurs, and the producer needs to be told to pause production. Once the buffer drains, the writable stream emits a 'drain' event to tell the producer to resume.

6.2 A writable stream example

import path from 'path';
import fs from 'fs';

const filePath = path.join(path.resolve(), 'files', 'text.txt');
const copyFile = path.join(path.resolve(), 'files', 'copy.txt');

let str = '';
// Create a readable stream
const readable = fs.createReadStream(filePath);
// Set a default encoding with readable.setEncoding()
readable.setEncoding('utf8');

// Create a writable stream
const writeable = fs.createWriteStream(copyFile);
// Set the encoding
writeable.setDefaultEncoding('utf8');

readable.on('open', (fd) => {
  console.log('started reading the file')
})
// The 'data' event is emitted whenever the stream hands ownership of a
// chunk of data over to a consumer
readable.on('data', (data) => {
  str += data;
  console.log('received data');

  // Write the chunk
  writeable.write(data, 'utf8');
})

writeable.on('open', () => {
  console.log('started writing data')
})
// If a call to stream.write(chunk) returns false, the 'drain' event is
// emitted when it is appropriate to resume writing data to the stream.
// That is, when data is produced faster than it is written and the buffer
// fills up, the producer pauses reading from the underlying resource;
// once the writable's buffer is released, a 'drain' event tells the
// producer to continue reading.
writeable.on('drain', () => {
  console.log('resuming writes')
})
// The 'finish' event is emitted after stream.end() has been called and all
// data has been flushed to the underlying system.
writeable.on('finish', () => {
  console.log('finished writing data')
})

readable.on('end', () => {
  // Reading is done; notify the writable stream
  writeable.end()
})
// The 'pipe' event is emitted when stream.pipe() is called on a readable
// stream, adding this writable to its set of destinations:
// readable.pipe(writeable)
writeable.on('pipe', () => {
  console.log('pipe created')
})

writeable.on('error', () => {
  console.log('an error occurred while writing data')
})

Statement:
This article is reproduced from juejin.cn. If there is any infringement, please contact admin@php.cn to have it deleted.