Artikel ini akan memperkenalkan anda kepada modul strim Node dan memperkenalkan cara menggunakan Strim untuk membina aplikasi Node.js berprestasi tinggi saya harap ia akan membantu semua orang.
Apabila anda menaip aksara pada papan kekunci, membaca fail daripada cakera atau memuat turun fail daripada Internet, aliran maklumat (bit) mengalir melalui peranti yang berbeza dan aplikasi .
Jika anda belajar memproses strim bait ini, anda akan dapat membina aplikasi berprestasi tinggi dan bernilai. Sebagai contoh, bayangkan apabila anda menonton video di YouTube, anda tidak perlu menunggu sehingga video penuh dimuat turun. Sebaik sahaja terdapat penimbal kecil, video akan mula dimainkan dan selebihnya akan terus dimuat turun semasa anda menonton.
Nodejs mengandungi modul terbina dalam stream
yang membolehkan kami memproses data penstriman. Dalam artikel ini, kami akan menerangkan penggunaan stream
melalui beberapa contoh mudah Kami juga akan menerangkan cara membina saluran paip untuk menggabungkan aliran yang berbeza apabila membina aplikasi berprestasi tinggi dalam kes yang kompleks.
Sebelum kita menyelami pemahaman pembinaan aplikasi, adalah penting untuk memahami ciri yang disediakan oleh modul Node.js stream
.
Jom mulakan!
Node.js stream
menyediakan empat jenis strim
Untuk lebih lanjut butiran, sila semak dokumentasi rasmi Node.js
https://nodejs.org/api/stream.html#stream_types_of_streams
Mari kita lihat pada tahap yang tinggi Mari lihat pada setiap jenis aliran.
Strim boleh dibaca boleh membaca data daripada sumber data tertentu, selalunya daripada sistem fail. Kegunaan biasa strim boleh dibaca yang lain dalam aplikasi Node.js ialah:
process.stdin
- Membaca input pengguna dalam aplikasi terminal melalui stdin
. http.IncomingMessage
- Baca kandungan permintaan masuk dalam pelayan HTTP atau baca respons HTTP pelayan dalam klien HTTP. Anda boleh menggunakan strim boleh tulis untuk menulis data daripada aplikasi anda ke tempat tertentu, seperti fail.
process.stdout
boleh digunakan untuk menulis data kepada output standard dan digunakan secara dalaman oleh console.log
.
Seterusnya ialah strim dupleks dan strim transformasi, yang boleh ditakrifkan sebagai jenis strim hibrid berdasarkan strim boleh dibaca dan strim boleh tulis.
Strim dupleks ialah gabungan strim boleh dibaca dan strim boleh tulis Ia boleh menulis data ke tempat tertentu dan mendapatkan semula data daripadanya . Kes yang paling biasa bagi aliran dupleks ialah net.Socket
, yang digunakan untuk membaca dan menulis data daripada soket.
Adalah penting untuk ambil perhatian bahawa operasi hujung boleh dibaca dan hujung boleh tulis strim dupleks adalah bebas antara satu sama lain dan data tidak mengalir dari satu hujung ke hujung yang lain.
Transform stream adalah serupa sedikit dengan strim dupleks, tetapi dalam strim transform, bahagian yang boleh dibaca dan boleh ditulis dikaitkan. Kelas
crypto.Cipher
ialah contoh yang baik, yang melaksanakan strim yang disulitkan. Melalui strim crypto.Cipher
, aplikasi boleh menulis data teks biasa ke hujung strim yang boleh ditulis dan membaca teks sifir yang disulitkan dari hujung strim yang boleh dibaca. Jenis aliran ini dipanggil aliran penukaran kerana sifat penukarannya.
PS : Satu lagi aliran penukaran ialah stream.PassThrough
. stream.PassThrough
Hantar data dari bahagian yang boleh ditulis ke bahagian yang boleh dibaca tanpa sebarang penukaran. Ini mungkin kedengaran berlebihan, tetapi strim Passthrough sangat membantu untuk membina strim tersuai serta saluran paip strim. (seperti mencipta berbilang salinan data strim)
Setelah strim boleh dibaca disambungkan kepada data pengeluaran Sumber , seperti fail, boleh membaca data melalui strim dalam beberapa cara.
Mula-mula, buat fail teks ringkas bernama myfile
, bersaiz 85 bait, mengandungi rentetan berikut:
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.
Sekarang, mari lihat strim boleh dibaca Dua cara bacaan yang berbeza data.
data
Cara paling biasa untuk membaca data daripada strim boleh dibaca ialah mendengar acara data
yang dipancarkan oleh strim . Kod berikut menunjukkan pendekatan ini:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); })
highWaterMark
Atribut yang dihantar kepada fs.createReadStream
sebagai pilihan menentukan jumlah data yang ditimbal dalam strim ini. Data kemudiannya disalurkan ke mekanisme bacaan (dalam kes ini, pengendali data
kami). Secara lalai, nilai fs
untuk strim highWaterMark
yang boleh dibaca ialah 64kb. Kami sengaja menulis semula nilai ini kepada 20 bait untuk mencetuskan berbilang peristiwa data
.
如果你运行上述程序,它会在五个迭代内从 myfile
中读取 85 个字节。你会在 console 看到以下输出:
Read 20 bytes "Lorem ipsum dolor si" Read 20 bytes "t amet, consectetur " Read 20 bytes "adipiscing elit. Cur" Read 20 bytes "abitur nec mauris tu" Read 5 bytes "rpis."
从可读流中读取数据的另一种方法是使用异步迭代器:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); (async () => { for await (const chunk of readable) { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); } })()
如果你运行这个程序,你会得到和前面例子一样的输出。
当一个监听器监听到可读流的 data
事件时,流的状态会切换成”流动”状态(除非该流被显式的暂停了)。你可以通过流对象的 readableFlowing
属性检查流的”流动”状态
我们可以稍微修改下前面的例子,通过 data
处理器来示范:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); let bytesRead = 0 console.log(`before attaching 'data' handler. is flowing: ${readable.readableFlowing}`); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes`); bytesRead += chunk.length // 在从可读流中读取 60 个字节后停止阅读 if (bytesRead === 60) { readable.pause() console.log(`after pause() call. is flowing: ${readable.readableFlowing}`); // 在等待 1 秒后继续读取 setTimeout(() => { readable.resume() console.log(`after resume() call. is flowing: ${readable.readableFlowing}`); }, 1000) } }) console.log(`after attaching 'data' handler. is flowing: ${readable.readableFlowing}`);
在这个例子中,我们从一个可读流中读取 myfile
,但在读取 60 个字节后,我们临时暂停了数据流 1 秒。我们也在不同的时间打印了 readableFlowing
属性的值去理解他是如何变化的。
如果你运行上述程序,你会得到以下输出:
before attaching 'data' handler. is flowing: null after attaching 'data' handler. is flowing: true Read 20 bytes Read 20 bytes Read 20 bytes after pause() call. is flowing: false after resume() call. is flowing: true Read 20 bytes Read 5 bytes
我们可以用以下来解释输出:
当我们的程序开始时,readableFlowing
的值是 null
,因为我们没有提供任何消耗流的机制。
在连接到 data
处理器后,可读流变为“流动”模式,readableFlowing
变为 true
。
一旦读取 60 个字节,通过调用 pause()
来暂停流,readableFlowing
也转变为 false
。
在等待 1 秒后,通过调用 resume()
,流再次切换为“流动”模式,readableFlowing
改为 `true'。然后剩下的文件内容在流中流动。
因为有流,应用不需要在内存中保留大型的二进制对象:小型的数据块可以接收到就进行处理。
在这部分,让我们组合不同的流来构建一个可以处理大量数据的真实应用。我们会使用一个小型的工具程序来生成一个给定文件的 SHA-256。
但首先,我们需要创建一个大型的 4GB 的假文件来测试。你可以通过一个简单的 shell 命令来完成:
mkfile -n 4g 4gb_file
xfs_mkfile 4096m 4gb_file
在我们创建了假文件 4gb_file
后,让我们在不使用 stream
模块的情况下来生成来文件的 SHA-256 hash。
const fs = require("fs"); const crypto = require("crypto"); fs.readFile("./4gb_file", (readErr, data) => { if (readErr) return console.log(readErr) const hash = crypto.createHash("sha256").update(data).digest("base64"); fs.writeFile("./checksum.txt", hash, (writeErr) => { writeErr && console.error(err) }); });
如果你运行以上代码,你可能会得到以下错误:
RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) { code: 'ERR_FS_FILE_TOO_LARGE' }
以上报错之所以发生是因为 JavaScript 运行时无法处理随机的大型缓冲。运行时可以处理的最大尺寸的缓冲取决于你的操作系统结构。你可以通过使用内建的 buffer
模块里的 buffer.constants.MAX_LENGTH
变量来查看你操作系统缓存的最大尺寸。
即使上述报错没有发生,在内存中保留大型文件也是有问题的。我们所拥有的可用的物理内存会限制我们应用能使用的内存量。高内存使用率也会造成应用在 CPU 使用方面性能低下,因为垃圾回收会变得昂贵。
pipeline()
减少 APP 的内存占用现在,让我们看看如何修改应用去使用流且避免遇到这个报错:
const fs = require("fs"); const crypto = require("crypto"); const { pipeline } = require("stream"); const hashStream = crypto.createHash("sha256"); hashStream.setEncoding('base64') const inputStream = fs.createReadStream("./4gb_file"); const outputStream = fs.createWriteStream("./checksum.txt"); pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) } )
在这个例子中,我们使用 crypto.createHash
函数提供的流式方法。它返回一个“转换”流对象 hashStream
,为随机的大型文件生成 hash。
为了将文件内容传输到这个转换流中,我们使用 fs.createReadStream
为 4gb_file
创建了一个可读流 inputStream
。我们将 hashStream
转换流的输出传递到可写流 outputStream
中,而 checksum.txt
通过 fs.createWriteStream
创建的。
如果你运行以上程序,你将看见在 checksum.txt
文件中看见 4GB 文件的 SHA-256 hash。
pipeline()
和 pipe()
的对比在前面的案例中,我们使用 pipeline
函数来连接多个流。另一种常见的方法是使用 .pipe()
函数,如下所示:
inputStream .pipe(hashStream) .pipe(outputStream)
但这里有几个原因,所以并不推荐在生产应用中使用 .pipe()
。如果其中一个流被关闭或者出现报错,pipe()
不会自动销毁连接的流,这会导致应用内存泄露。同样的,pipe()
不会自动跨流转发错误到一个地方处理。
因为这些问题,所以就有了 pipeline()
,所以推荐你使用 pipeline()
而不是 pipe()
来连接不同的流。 我们可以重写上述的 pipe()
例子来使用 pipeline()
函数,如下:
pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) } )
pipeline()
接受一个回调函数作为最后一个参数。任何来自被连接的流的报错都将触发该回调函数,所以可以很轻松的在一个地方处理报错。
在 Node.js 中使用流有助于我们构建可以处理大型数据的高性能应用。
在这篇文章中,我们覆盖了:
data
事件或者使用异步迭代器来从可读流中读取数据。pipeline
连接多个流来减少内存占用。一个简短的警告:你很可能不会遇到太多必须使用流的场景,而基于流的方案会提高你的应用的复杂性。务必确保使用流的好处胜于它所带来的复杂性。
更多node相关知识,请访问:nodejs 教程!
Atas ialah kandungan terperinci Mari bercakap tentang modul strim Node.js dan lihat cara membina aplikasi berprestasi tinggi. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!