이 글에서는Node스트림 모듈을 소개하고 Stream을 사용하여 고성능 Node.js 애플리케이션을 구축하는 방법을 소개합니다. 모든 사람에게 도움이 되기를 바랍니다!
키보드에서 문자를 입력하거나, 디스크에서 파일을 읽거나, 인터넷에서 파일을 다운로드할 때 정보(비트)의 흐름이 다양한 장치와 애플리케이션을 통해 흐릅니다.
이러한 바이트 스트림을 처리하는 방법을 배우면 고성능의 가치 있는 애플리케이션을 구축할 수 있습니다. 예를 들어 YouTube에서 동영상을 볼 때 전체 동영상이 다운로드될 때까지 기다릴 필요가 없다고 상상해 보세요. 작은 버퍼가 있으면 비디오가 재생되기 시작하고 시청하는 동안 나머지는 계속 다운로드됩니다.
Nodejs에는 스트리밍 데이터를 처리할 수 있는 내장 모듈stream
이 포함되어 있습니다. 이 글에서는 몇 가지 간단한 예시를 통해stream
의 사용법을 설명하고, 복잡한 흐름에서 고성능 애플리케이션을 구축할 때 서로 다른 스트림을 병합하는 파이프라인을 구축하는 방법도 설명합니다.stream
可以让我们处理流数据。在这篇文章中,我们将通过几个简单的示例来讲解stream
的用法,我们也会描述在面对复杂案例构建高性能应用时,应该如何构建管道去合并不同的流。
在我们深入理解应用构建前,理解 Node.jsstream
模块提供的特性很重要。
让我们开始吧!
Node.jsstream
提供了四种类型的流
更多详情请查看 Node.js 官方文档
https://nodejs.org/api/stream.html#stream_types_of_streams
让我们在高层面来看看每一种流类型吧。
可读流可以从一个特定的数据源中读取数据,最常见的是从一个文件系统中读取。Node.js 应用中其他常见的可读流用法有:
process.stdin
-通过stdin
在终端应用中读取用户输入。http.IncomingMessage
- 在 HTTP 服务中读取传入的请求内容或者在 HTTP 客户端中读取服务器的 HTTP 响应。你可以使用可写流将来自应用的数据写入到特定的地方,比如一个文件。
process.stdout
可以用来将数据写成标准输出且被console.log
内部使用。
接下来是双工流和转换流,可以被定义为基于可读流和可写流的混合流类型。
双工流是可读流和可写流的结合,它既可以将数据写入到特定的地方也可以从数据源读取数据。最常见的双工流案例是net.Socket
,它被用来从 socket 读写数据。
有一点很重要,双工流中的可读端和可写端的操作是相互独立的,数据不会从一端流向另一端。
转换流与双工流略有相似,但在转换流中,可读端和可写端是相关联的。
crypto.Cipher
类是一个很好的例子,它实现了加密流。通过crypto.Cipher
流,应用可以往流的可写端写入纯文本数据并从流的可读端读取加密后的密文。之所以将这种类型的流称之为转换流就是因为其转换性质。
附注:另一个转换流是stream.PassThrough
。stream.PassThrough
从可写端传递数据到可读端,没有任何转换。这听起来可能有点多余,但 Passthrough 流对构建自定义流以及流管道非常有帮助。(比如创建一个流的数据的多个副本)
一旦可读流连接到生产数据的源头,比如一个文件,就可以用几种方法通过该流读取数据。
首先,先创建一个名为myfile
的简单的 text 文件,85 字节大小,包含以下字符串:
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.
现在,我们看下从可读流读取数据的两种不同方式。
data
事件从可读流读取数据的最常见方式是监听流发出的data
事件。以下代码演示了这种方式:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); })
highWaterMark
属性作为一个选项传递给fs.createReadStream
,用于决定该流中有多少数据缓冲。然后数据被冲到读取机制(在这个案例中,是我们的data
处理程序)。默认情况下,可读fs
流的highWaterMark
值是 64kb。我们刻意重写该值为 20 字节用于触发多个data
stream
모듈에서 제공하는 기능을 이해하는 것이 중요합니다. 시작해 보세요!
stream
은 네 가지 유형의 스트림을 제공합니다
자세한 내용은 Node.js 공식 문서를 확인하세요https://nodejs.org/api/stream.html#stream_types_of_streams각각 살펴보겠습니다. 높은 수준의 흐름 유형.
process.stdin
-stdin
을 통해 터미널 애플리케이션에서 사용자 입력을 읽습니다.http.IncomingMessage
- HTTP 서버에서 들어오는 요청의 내용을 읽거나 HTTP 클라이언트에서 서버의 HTTP 응답을 읽습니다.process.stdout
는 데이터를 표준 출력에 쓰는 데 사용할 수 있으며
console.log
에서 내부적으로 사용됩니다. 다음은 이중 스트림과 변환 스트림으로, 읽기 가능한 스트림과 쓰기 가능한 스트림을 기반으로 혼합 스트림 유형으로 정의할 수 있습니다.
net.Socket
입니다. 이중 스트림의 읽기 가능한 끝과 쓰기 가능한 끝의 작업은 서로 독립적이며 데이터가 한 끝에서 다른 끝으로 흐르지 않는다는 점에 유의하는 것이 중요합니다.
crypto.Cipher
클래스가 좋은 예이며 암호화된 스트림을 구현합니다.
crypto.Cipher
스트림을 통해 애플리케이션은 쓰기 가능한 스트림 끝 부분에 일반 텍스트 데이터를 쓰고 읽기 가능한 스트림 끝 부분에서 암호화된 암호 텍스트를 읽을 수 있습니다. 이러한 유형의 흐름은 변환 속성 때문에 변환 흐름이라고 합니다.
추신: 또 다른 변환 스트림은
stream.PassThrough
입니다.
stream.PassThrough
는 변환 없이 쓰기 가능한 쪽에서 읽기 가능한 쪽으로 데이터를 전달합니다. 중복된 것처럼 들릴 수도 있지만 패스스루 스트림은 사용자 지정 스트림과 스트림 파이프라인을 구축하는 데 매우 유용합니다. (예: 스트림 데이터의 여러 복사본 생성)
myfile
이라는 이름의 간단한 텍스트 파일을 만듭니다.
Read 20 bytes "Lorem ipsum dolor si" Read 20 bytes "t amet, consectetur " Read 20 bytes "adipiscing elit. Cur" Read 20 bytes "abitur nec mauris tu" Read 5 bytes "rpis."
데이터
이벤트 수신data
이벤트를 수신합니다. 다음 코드는 이 접근 방식을 보여줍니다.
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); (async () => { for await (const chunk of readable) { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); } })()
highWaterMark
속성은 스트림에서 버퍼링되는 데이터의 양을 결정하는 옵션으로
fs.createReadStream
에 전달됩니다. 그런 다음 데이터는 읽기 메커니즘(이 경우
data
핸들러)으로 플러시됩니다. 기본적으로 읽기 가능한
fs
스트림의
highWaterMark
값은 64kb입니다. 여러
data
이벤트를 트리거하기 위해 의도적으로 이 값을 20바이트로 다시 작성합니다.
如果你运行上述程序,它会在五个迭代内从myfile
中读取 85 个字节。你会在 console 看到以下输出:
Read 20 bytes "Lorem ipsum dolor si" Read 20 bytes "t amet, consectetur " Read 20 bytes "adipiscing elit. Cur" Read 20 bytes "abitur nec mauris tu" Read 5 bytes "rpis."
从可读流中读取数据的另一种方法是使用异步迭代器:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); (async () => { for await (const chunk of readable) { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); } })()
如果你运行这个程序,你会得到和前面例子一样的输出。
当一个监听器监听到可读流的data
事件时,流的状态会切换成”流动”状态(除非该流被显式的暂停了)。你可以通过流对象的readableFlowing
属性检查流的”流动”状态
我们可以稍微修改下前面的例子,通过data
处理器来示范:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); let bytesRead = 0 console.log(`before attaching 'data' handler. is flowing: ${readable.readableFlowing}`); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes`); bytesRead += chunk.length // 在从可读流中读取 60 个字节后停止阅读 if (bytesRead === 60) { readable.pause() console.log(`after pause() call. is flowing: ${readable.readableFlowing}`); // 在等待 1 秒后继续读取 setTimeout(() => { readable.resume() console.log(`after resume() call. is flowing: ${readable.readableFlowing}`); }, 1000) } }) console.log(`after attaching 'data' handler. is flowing: ${readable.readableFlowing}`);
在这个例子中,我们从一个可读流中读取myfile
,但在读取 60 个字节后,我们临时暂停了数据流 1 秒。我们也在不同的时间打印了readableFlowing
属性的值去理解他是如何变化的。
如果你运行上述程序,你会得到以下输出:
before attaching 'data' handler. is flowing: null after attaching 'data' handler. is flowing: true Read 20 bytes Read 20 bytes Read 20 bytes after pause() call. is flowing: false after resume() call. is flowing: true Read 20 bytes Read 5 bytes
我们可以用以下来解释输出:
当我们的程序开始时,readableFlowing
的值是null
,因为我们没有提供任何消耗流的机制。
在连接到data
处理器后,可读流变为“流动”模式,readableFlowing
变为true
。
一旦读取 60 个字节,通过调用pause()
来暂停流,readableFlowing
也转变为false
。
在等待 1 秒后,通过调用resume()
,流再次切换为“流动”模式,readableFlowing
改为 `true'。然后剩下的文件内容在流中流动。
因为有流,应用不需要在内存中保留大型的二进制对象:小型的数据块可以接收到就进行处理。
在这部分,让我们组合不同的流来构建一个可以处理大量数据的真实应用。我们会使用一个小型的工具程序来生成一个给定文件的 SHA-256。
但首先,我们需要创建一个大型的 4GB 的假文件来测试。你可以通过一个简单的 shell 命令来完成:
mkfile -n 4g 4gb_file
xfs_mkfile 4096m 4gb_file
在我们创建了假文件4gb_file
后,让我们在不使用stream
模块的情况下来生成来文件的 SHA-256 hash。
const fs = require("fs"); const crypto = require("crypto"); fs.readFile("./4gb_file", (readErr, data) => { if (readErr) return console.log(readErr) const hash = crypto.createHash("sha256").update(data).digest("base64"); fs.writeFile("./checksum.txt", hash, (writeErr) => { writeErr && console.error(err) }); });
如果你运行以上代码,你可能会得到以下错误:
RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) { code: 'ERR_FS_FILE_TOO_LARGE' }
以上报错之所以发生是因为 JavaScript 运行时无法处理随机的大型缓冲。运行时可以处理的最大尺寸的缓冲取决于你的操作系统结构。你可以通过使用内建的buffer
模块里的buffer.constants.MAX_LENGTH
变量来查看你操作系统缓存的最大尺寸。
即使上述报错没有发生,在内存中保留大型文件也是有问题的。我们所拥有的可用的物理内存会限制我们应用能使用的内存量。高内存使用率也会造成应用在 CPU 使用方面性能低下,因为垃圾回收会变得昂贵。
pipeline()
减少 APP 的内存占用现在,让我们看看如何修改应用去使用流且避免遇到这个报错:
const fs = require("fs"); const crypto = require("crypto"); const { pipeline } = require("stream"); const hashStream = crypto.createHash("sha256"); hashStream.setEncoding('base64') const inputStream = fs.createReadStream("./4gb_file"); const outputStream = fs.createWriteStream("./checksum.txt"); pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) } )
在这个例子中,我们使用crypto.createHash
函数提供的流式方法。它返回一个“转换”流对象hashStream
,为随机的大型文件生成 hash。
为了将文件内容传输到这个转换流中,我们使用fs.createReadStream
为4gb_file
创建了一个可读流inputStream
。我们将hashStream
转换流的输出传递到可写流outputStream
中,而checksum.txt
通过fs.createWriteStream
创建的。
如果你运行以上程序,你将看见在checksum.txt
文件中看见 4GB 文件的 SHA-256 hash。
pipeline()
和pipe()
的对比在前面的案例中,我们使用pipeline
函数来连接多个流。另一种常见的方法是使用.pipe()
函数,如下所示:
inputStream .pipe(hashStream) .pipe(outputStream)
但这里有几个原因,所以并不推荐在生产应用中使用.pipe()
。如果其中一个流被关闭或者出现报错,pipe()
不会自动销毁连接的流,这会导致应用内存泄露。同样的,pipe()
不会自动跨流转发错误到一个地方处理。
因为这些问题,所以就有了pipeline()
,所以推荐你使用pipeline()
而不是pipe()
来连接不同的流。 我们可以重写上述的pipe()
例子来使用pipeline()
函数,如下:
pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) } )
pipeline()
接受一个回调函数作为最后一个参数。任何来自被连接的流的报错都将触发该回调函数,所以可以很轻松的在一个地方处理报错。
在 Node.js 中使用流有助于我们构建可以处理大型数据的高性能应用。
在这篇文章中,我们覆盖了:
data
事件或者使用异步迭代器来从可读流中读取数据。pipeline
连接多个流来减少内存占用。一个简短的警告:你很可能不会遇到太多必须使用流的场景,而基于流的方案会提高你的应用的复杂性。务必确保使用流的好处胜于它所带来的复杂性。
更多node相关知识,请访问:nodejs 教程!
위 내용은 Node.js 스트림 모듈에 대해 이야기하고 고성능 애플리케이션을 구축하는 방법을 살펴보겠습니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!