この記事では、Node ストリーム モジュールについて紹介し、ストリームを使用して高パフォーマンスの Node.js アプリケーションを構築する方法を紹介します。
キーボードで文字を入力したり、ディスクからファイルを読み取ったり、インターネットからファイルをダウンロードしたりすると、情報のストリーム (ビット) がさまざまなデバイスを流れ、アプリケーション。
これらのバイト ストリームの処理方法を学べば、高性能で価値のあるアプリケーションを構築できるようになります。たとえば、YouTube でビデオを視聴するときに、ビデオ全体がダウンロードされるまで待つ必要がないことを想像してください。バッファーが小さくなると、ビデオの再生が開始され、視聴中に残りの部分がダウンロードされ続けます。
Nodejs には、ストリーミング データを処理できる組み込みモジュール stream
が含まれています。この記事では、いくつかの簡単な例を通じて stream
の使用法を説明し、複雑なケースで高パフォーマンスのアプリケーションを構築するときに、異なるストリームをマージするパイプラインを構築する方法についても説明します。
アプリケーションの構築について理解する前に、Node.js stream
モジュールによって提供される機能を理解することが重要です。 ######はじめましょう!
4 種類のストリームを提供します
読み取り可能なストリームhttps://nodejs.org/api/stream.html#stream_types_of_streams
各ストリーム タイプの概要を見てみましょう。
stdin
を介して端末アプリケーションでユーザー入力を読み取ります。
は、標準出力にデータを書き込むために使用でき、console.log
によって内部的に使用されます。 次は二重ストリームと変換ストリームです。これらは、読み取り可能なストリームと書き込み可能なストリームに基づいて混合ストリーム タイプとして定義できます。
で、これはソケットからのデータの読み取りと書き込みに使用されます。 二重ストリームの読み取り可能な側と書き込み可能な側の操作は互いに独立しており、データは一方の端からもう一方の端へ流れるわけではないことに注意することが重要です。
クラスは、暗号化されたストリームを実装する良い例です。 crypto.Cipher
ストリームを通じて、アプリケーションはストリームの書き込み可能な端にプレーン テキスト データを書き込み、ストリームの読み取り可能な端から暗号化された暗号文を読み取ることができます。このタイプのフローは、その変換特性のため変換フローと呼ばれます。
: 別の変換ストリームは stream.PassThrough です。 stream.PassThrough
データを変換せずに書き込み可能側から読み取り可能側に渡します。冗長に聞こえるかもしれませんが、パススルー ストリームは、カスタム ストリームやストリーム パイプラインの構築に非常に役立ちます。 (ストリームのデータの複数のコピーの作成など)
まず、
myfile という名前の単純なテキスト ファイルを作成します。サイズは 85 バイトで、次の文字列が含まれます: <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;">Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.</pre><div class="contentsignin">ログイン後にコピー</div></div>
次に、利用可能な読み取りストリームを見てみましょう。データを読み取る 2 つの異なる方法があります。
読み取り可能なストリームからデータを読み取る最も一般的な方法は、const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); })
highWaterMark このプロパティは、ストリームにバッファリングされるデータ量を決定するオプションとして fs.createReadStream に渡されます。次に、データは読み取りメカニズム (この場合は
data ハンドラー) にフラッシュされます。デフォルトでは、読み取り可能な
fs ストリームの
highWaterMark 値は 64kb です。複数の
data イベントをトリガーするには、この値を意図的に 20 バイトに書き換えます。
如果你运行上述程序,它会在五个迭代内从 myfile
中读取 85 个字节。你会在 console 看到以下输出:
Read 20 bytes "Lorem ipsum dolor si" Read 20 bytes "t amet, consectetur " Read 20 bytes "adipiscing elit. Cur" Read 20 bytes "abitur nec mauris tu" Read 5 bytes "rpis."
从可读流中读取数据的另一种方法是使用异步迭代器:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); (async () => { for await (const chunk of readable) { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); } })()
如果你运行这个程序,你会得到和前面例子一样的输出。
当一个监听器监听到可读流的 data
事件时,流的状态会切换成”流动”状态(除非该流被显式的暂停了)。你可以通过流对象的 readableFlowing
属性检查流的”流动”状态
我们可以稍微修改下前面的例子,通过 data
处理器来示范:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); let bytesRead = 0 console.log(`before attaching 'data' handler. is flowing: ${readable.readableFlowing}`); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes`); bytesRead += chunk.length // 在从可读流中读取 60 个字节后停止阅读 if (bytesRead === 60) { readable.pause() console.log(`after pause() call. is flowing: ${readable.readableFlowing}`); // 在等待 1 秒后继续读取 setTimeout(() => { readable.resume() console.log(`after resume() call. is flowing: ${readable.readableFlowing}`); }, 1000) } }) console.log(`after attaching 'data' handler. is flowing: ${readable.readableFlowing}`);
在这个例子中,我们从一个可读流中读取 myfile
,但在读取 60 个字节后,我们临时暂停了数据流 1 秒。我们也在不同的时间打印了 readableFlowing
属性的值去理解他是如何变化的。
如果你运行上述程序,你会得到以下输出:
before attaching 'data' handler. is flowing: null after attaching 'data' handler. is flowing: true Read 20 bytes Read 20 bytes Read 20 bytes after pause() call. is flowing: false after resume() call. is flowing: true Read 20 bytes Read 5 bytes
我们可以用以下来解释输出:
当我们的程序开始时,readableFlowing
的值是 null
,因为我们没有提供任何消耗流的机制。
在连接到 data
处理器后,可读流变为“流动”模式,readableFlowing
变为 true
。
一旦读取 60 个字节,通过调用 pause()
来暂停流,readableFlowing
也转变为 false
。
在等待 1 秒后,通过调用 resume()
,流再次切换为“流动”模式,readableFlowing
改为 `true'。然后剩下的文件内容在流中流动。
因为有流,应用不需要在内存中保留大型的二进制对象:小型的数据块可以接收到就进行处理。
在这部分,让我们组合不同的流来构建一个可以处理大量数据的真实应用。我们会使用一个小型的工具程序来生成一个给定文件的 SHA-256。
但首先,我们需要创建一个大型的 4GB 的假文件来测试。你可以通过一个简单的 shell 命令来完成:
mkfile -n 4g 4gb_file
xfs_mkfile 4096m 4gb_file
在我们创建了假文件 4gb_file
后,让我们在不使用 stream
模块的情况下来生成来文件的 SHA-256 hash。
const fs = require("fs"); const crypto = require("crypto"); fs.readFile("./4gb_file", (readErr, data) => { if (readErr) return console.log(readErr) const hash = crypto.createHash("sha256").update(data).digest("base64"); fs.writeFile("./checksum.txt", hash, (writeErr) => { writeErr && console.error(err) }); });
如果你运行以上代码,你可能会得到以下错误:
RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) { code: 'ERR_FS_FILE_TOO_LARGE' }
以上报错之所以发生是因为 JavaScript 运行时无法处理随机的大型缓冲。运行时可以处理的最大尺寸的缓冲取决于你的操作系统结构。你可以通过使用内建的 buffer
模块里的 buffer.constants.MAX_LENGTH
变量来查看你操作系统缓存的最大尺寸。
即使上述报错没有发生,在内存中保留大型文件也是有问题的。我们所拥有的可用的物理内存会限制我们应用能使用的内存量。高内存使用率也会造成应用在 CPU 使用方面性能低下,因为垃圾回收会变得昂贵。
pipeline()
减少 APP 的内存占用现在,让我们看看如何修改应用去使用流且避免遇到这个报错:
const fs = require("fs"); const crypto = require("crypto"); const { pipeline } = require("stream"); const hashStream = crypto.createHash("sha256"); hashStream.setEncoding('base64') const inputStream = fs.createReadStream("./4gb_file"); const outputStream = fs.createWriteStream("./checksum.txt"); pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) } )
在这个例子中,我们使用 crypto.createHash
函数提供的流式方法。它返回一个“转换”流对象 hashStream
,为随机的大型文件生成 hash。
为了将文件内容传输到这个转换流中,我们使用 fs.createReadStream
为 4gb_file
创建了一个可读流 inputStream
。我们将 hashStream
转换流的输出传递到可写流 outputStream
中,而 checksum.txt
通过 fs.createWriteStream
创建的。
如果你运行以上程序,你将看见在 checksum.txt
文件中看见 4GB 文件的 SHA-256 hash。
pipeline()
和 pipe()
的对比在前面的案例中,我们使用 pipeline
函数来连接多个流。另一种常见的方法是使用 .pipe()
函数,如下所示:
inputStream .pipe(hashStream) .pipe(outputStream)
但这里有几个原因,所以并不推荐在生产应用中使用 .pipe()
。如果其中一个流被关闭或者出现报错,pipe()
不会自动销毁连接的流,这会导致应用内存泄露。同样的,pipe()
不会自动跨流转发错误到一个地方处理。
因为这些问题,所以就有了 pipeline()
,所以推荐你使用 pipeline()
而不是 pipe()
来连接不同的流。 我们可以重写上述的 pipe()
例子来使用 pipeline()
函数,如下:
pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) } )
pipeline()
接受一个回调函数作为最后一个参数。任何来自被连接的流的报错都将触发该回调函数,所以可以很轻松的在一个地方处理报错。
在 Node.js 中使用流有助于我们构建可以处理大型数据的高性能应用。
在这篇文章中,我们覆盖了:
data
事件或者使用异步迭代器来从可读流中读取数据。pipeline
连接多个流来减少内存占用。一个简短的警告:你很可能不会遇到太多必须使用流的场景,而基于流的方案会提高你的应用的复杂性。务必确保使用流的好处胜于它所带来的复杂性。
更多node相关知识,请访问:nodejs 教程!
以上がNode.js ストリーム モジュールについて説明し、高パフォーマンスのアプリケーションを構築する方法を見てみましょうの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。