Was ist ein Stream? So verstehen Sie Streams in Nodejs

青灯夜游
Freigeben: 2022-05-16 20:56:23
nach vorne
2009 Leute haben es durchsucht

Was ist Flow? Wie versteht man den Fluss? Der folgende Artikel vermittelt Ihnen ein detailliertes Verständnis des Streams inNode. Ich hoffe, er wird Ihnen hilfreich sein!

Was ist ein Stream? So verstehen Sie Streams in Nodejs

Der Autor hat in letzter Zeit oft die Pipe-Funktion in der Entwicklung verwendet. Ich weiß nur, dass es sich um eine Pipeline für Streams handelt, aber ich weiß nicht, wie es funktioniert. Mit der Mentalität, es herauszufinden, fange ich einfach an Den Stream lesen und beiläufig lernen. Das Wissen und der Quellcode, die ich gelesen habe, sind in einem Artikel zusammengefasst, den ich mit allen teilen kann.

Stream ist ein sehr grundlegendes Konzept inNodejs. Viele Grundmodule werden auf Basis von Streams implementiert und spielen eine sehr wichtige Rolle. Gleichzeitig ist Flow auch ein sehr schwer zu verstehendes Konzept. Dies liegt vor allem an der fehlenden relevanten Dokumentation. Für NodeJs-Anfänger dauert es glücklicherweise oft lange, den Flow zu verstehen. Für die meisten NodeJs wird es von Benutzern nur zum Entwickeln von Webanwendungen verwendet. Ein unzureichendes Verständnis von Streams hat keinen Einfluss auf deren Verwendung. Das Verständnis von Streams kann jedoch zu einem besseren Verständnis anderer Module in NodeJs führen, und in einigen Fällen liefert die Verwendung von Streams zur Datenverarbeitung bessere Ergebnisse. [Verwandte Tutorial-Empfehlungen:nodejs-Video-Tutorial]

Wie man Streams versteht

  • Für Stream-Benutzer kann der Stream als Array betrachtet werden, und wir müssen uns nur auf das Abrufen (Konsumieren) und Schreiben daraus konzentrieren es (Produktion) ist in Ordnung.

  • Für Stream-Entwickler (die das Stream-Modul zum Erstellen einer neuen Instanz verwenden) konzentrieren sie sich darauf, wie einige Methoden im Stream implementiert werden. Sie konzentrieren sich normalerweise auf zwei Punkte: Wer ist die Zielressource und wie wird die Zielressource betrieben? . Sobald die Zielressourcen festgelegt sind, müssen sie entsprechend den verschiedenen Zuständen und Ereignissen des Streams betrieben werden. Cache-Pool: Alle Streams in NodeJs verfügen über Pufferpools. Der Zweck des Pufferpools besteht darin, die Effizienz zu erhöhen Der Stream, wenn Daten produziert und verbraucht werden. Wenn es Zeit braucht, können wir Daten im Voraus produzieren und sie vor dem nächsten Verbrauch im Pufferpool speichern. Der Pufferpool wird jedoch nicht immer verwendet. Wenn der Cache-Pool beispielsweise leer ist, werden die Daten nach der Produktion nicht in den Cache-Pool gelegt, sondern direkt verbraucht. .

  • Wenn die Geschwindigkeit der Datenproduktion größer ist als die Geschwindigkeit des Datenverbrauchs, werden die überschüssigen Daten irgendwo warten. Wenn die Datenproduktionsgeschwindigkeit langsamer ist als die Prozessdatenverbrauchsgeschwindigkeit, sammeln sich die Daten irgendwo bis zu einer bestimmten Menge an und werden dann verbraucht. (Entwickler können die Geschwindigkeit der Datenproduktion und des Datenverbrauchs nicht kontrollieren, sie können nur versuchen, zu welchem Zeitpunkt Daten zu produzieren oder zu verbrauchen.)

Der Ort, an dem Daten warten, Daten ansammeln und dann auftreten. Es ist derPufferpool. Der Pufferpool befindet sich normalerweise im RAM (Speicher) des Computers.

Um ein Beispiel für einen gemeinsamen Puffer zu nennen: Wenn wir Online-Videos ansehen und Ihre Internetgeschwindigkeit hoch ist, wird der Puffer immer sofort gefüllt, dann zur Wiedergabe an das System gesendet und dann wird das nächste Video sofort gepuffert. Beim Betrachten kommt es zu keiner Verzögerung. Wenn die Netzwerkgeschwindigkeit sehr langsam ist, wird ein Ladevorgang angezeigt, der darauf hinweist, dass der Puffer gefüllt ist. Wenn der Füllvorgang abgeschlossen ist, werden die Daten an das System gesendet und Sie können dieses Video sehen.

Der Cache-Pool des NodeJs-Streams ist eine mit Puffer verknüpfte Liste. Jedes Mal, wenn Sie Daten zum Cache-Pool hinzufügen möchten, wird ein Pufferknoten neu erstellt und am Ende der verknüpften Liste eingefügt.

EventEmitterStream in NodeJs ist eine abstrakte Schnittstelle, die EventEmitter implementiert, daher werde ich EventEmitter zunächst kurz vorstellen.

EventEmitter ist eine Klasse, die Funktionen zum Veröffentlichen und Abonnieren von Ereignissen implementiert (Ein, Einmal, Aus, Emittieren), daher wird davon ausgegangen, dass sie jedem bekannt sind, daher werde ich sie nicht einzeln vorstellen.

const { EventEmitter } = require('events') const eventEmitter = new EventEmitter() // 为 eventA 事件绑定处理函数 eventEmitter.on('eventA', () => { console.log('eventA active 1'); }); // 为 eventB 事件绑定处理函数 eventEmitter.on('eventB', () => { console.log('eventB active 1'); }); eventEmitter.once('eventA', () => { console.log('eventA active 2'); }); // 触发 eventA eventEmitter.emit('eventA') // eventA active 1 // eventA active 2
Nach dem Login kopieren

Es ist erwähnenswert, dassEventEmitterzwei Ereignisse namensnewListenerundremoveListenerhat, wenn Sie nach dem Abhören ein Ereignis zu einem Ereignisobjekt hinzufügen Die FunktionnewListener(eventEmitter.emit('newListener')) wird ausgelöst. Wenn eine Verarbeitungsfunktion entfernt wird, wirdremoveListenerauf die gleiche Weise ausgelöst.

Es ist auch zu beachten, dass die einmal gebundene Handlerfunktion nur einmal ausgeführt wird undremoveListenervor ihrer Ausführung ausgelöst wird, was bedeutet, dass der Listener aneinmalgebunden ist Die Funktion wird zunächst entfernt, bevor sie ausgelöst wird.

const { EventEmitter } = require('events') const eventEmitter = new EventEmitter() eventEmitter.on('newListener', (event, listener)=>{ console.log('newListener', event, listener) }) eventEmitter.on('removeListener', (event, listener) => { console.log('removeListener', event, listener) }) //newListener removeListener[Function(anonymous)] eventEmitter.on('eventA', () => { console.log('eventA active 1'); }); //newListener eventA [Function (anonymous)] function listenerB() { console.log('eventB active 1'); } eventEmitter.on('eventB', listenerB); // newListener eventB [Function (anonymous)] eventEmitter.once('eventA', () => { console.log('eventA active 2'); }); // newListener eventA [Function (anonymous)] eventEmitter.emit('eventA') // eventA active 1 // removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] } // eventA active 2 eventEmitter.off('eventB', listenerB) // removeListener eventB[Function: listenerB]
Nach dem Login kopieren

Aber das ist nicht wichtig für das, was wir später sehen werden.

Stream

Stream ist eine abstrakte Schnittstelle zur Verarbeitung von Streaming-Daten in Node.js. Stream ist keine eigentliche Schnittstelle, sondern ein allgemeiner Begriff für alle Streams. Die eigentlichen Schnittstellen sind ReadableStream, WritableStream und ReadWriteStream.EventEmitter有两个叫做newListenerremoveListener的事件,当你向一个事件对象中添加任何事件监听函数后,都会触发newListener(eventEmitter.emit('newListener')),当一个处理函数被移除时同理会触发removeListener

还需要注意的是, once 绑定的处理函数只会执行一次,removeListener将在其执行前被触发,这意味着once

interface ReadableStream extends EventEmitter { readable: boolean; read(size?: number): string | Buffer; setEncoding(encoding: BufferEncoding): this; pause(): this; resume(): this; isPaused(): boolean; pipe(destination: T, options?: { end?: boolean | undefined; }): T; unpipe(destination?: WritableStream): this; unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator; } interface WritableStream extends EventEmitter { writable: boolean; write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean; write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean; end(cb?: () => void): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this; } interface ReadWriteStream extends ReadableStream, WritableStream { }
Nach dem Login kopieren

Es ist ersichtlich, dass ReadableStream und WritableStream beide Schnittstellen sind, die die EventEmitter-Klasse erben (Schnittstellen in ts können Klassen erben, da es sich nur um zusammengeführte Typen handelt).

Die Implementierungsklassen, die den oben genannten Schnittstellen entsprechen, sind Readable, Writable und Duplex.

Es gibt 4 Arten von Streams in NodeJs:

  • Readable lesbarer Stream (implementiert ReadableStream)
  • Writable Stream (implementiert WritableStream)
  • Duplex lesbarer und beschreibbarer Stream (implementiert WritableStream nach der Erbung von Readable)
  • Transformationskonvertierungsstream (erbt Duplex)

Rückdruckproblem

Die Geschwindigkeit Das Schreiben von Daten auf die Festplatte ist viel geringer als die des Speichers. Wir stellen uns vor, dass es eine „Pipe“ zwischen dem Speicher und der Festplatte gibt und die „Pipe“ den „Fluss“ der Daten im Speicher darstellt Die Pipe ist sehr schnell, wenn die Pipe blockiert ist. Wenn sie voll ist, kommt es zu einem Datenrückdruck im Speicher, und die Daten sammeln sich im Speicher an und belegen Ressourcen.

Was ist ein Stream? So verstehen Sie Streams in Nodejs

Die Lösung für NodeJs Stream besteht darin, einen Float-Wert für denCache-Pool(d. h. die Schreibwarteschlange im Bild) jedes Streams festzulegen, wenn die Datenmenge diesen Float erreicht Wert, gehe zum Cache. Wenn der Pool die Daten erneutpusht, wird false zurückgegeben, was bedeutet, dass der Inhalt des Cache-Pools im aktuellen Stream den Float-Wert erreicht hat und keine weiteren Daten erwartet werden Zu diesem Zeitpunkt sollten wir die Datenproduktion sofort stoppen, um einen übergroßen Pool zu verhindern.缓存池(就是图中写入队列)设置一个浮标值,当其中数据量达到这个浮标值后,往缓存池再次push数据时就会返回 false,表示当前流中缓存池内容已经达到浮标值,不希望再有数据写入了,这时我们应该立即停止数据的生产,防止缓存池过大产生背压。

Readable

可读流(Readable)是流的一种类型,他有两种模式三种状态

两种读取模式:

  • 流动模式:数据会从底层系统读取写入到缓冲区,当缓冲区被写满后自动通过 EventEmitter 尽快的将数据传递给所注册的事件处理程序中

  • 暂停模式:在这种模式下将不会主动触发 EventEmitter 传输数据,必须显示的调用Readable.read()方法来从缓冲区中读取数据,read 会触发响应到 EventEmitter 事件。

三种状态:

  • readableFlowing === null(初始状态)

  • readableFlowing === false(暂停模式)

  • readableFlowing === true(流动模式)

初始时流的readable.readableFlowingnull

添加data事件后变为 true 。调用pause()unpipe()、或接收到背压或者添加readable事件,则readableFlowing会被设为 false ,在这个状态下,为 data 事件绑定监听器不会使 readableFlowing 切换到 true

调用resume()可以让可读流的readableFlowing

Readable

Readable Stream (Readable) ist eine Art Stream. Er hat zwei Modi und drei Zustände

Zwei Lesemodi:
      readable.readableFlowing null pause() unpipe() readable readableFlowing wird auf „false“ gesetzt.In diesem Zustand wird durch die Bindung eines Listeners an das Datenereignis readableFlowing nicht auf „true“ gesetzt. Rufen Sie resume() readableFlowing Flussmodus: Daten werden vom zugrunde liegenden System gelesen und in den Puffer geschrieben. Wenn der Puffer voll ist, werden die Daten so schnell wie möglich automatisch über EventEmitter an den registrierten Ereignishandler übergeben. Drei Zustände: readableFlowing === null (Anfangszustand) readableFlowing === false (Pausemodus) readableFlowing === true (Flow-Modus) Derdes anfänglichen Flows ist und wird nach dem Hinzufügen des Datenereignisses wahr. Wenn,aufgerufen wird, Gegendruck empfangen wird oder ein-Ereignis hinzugefügt wird,auf, um dasdes lesbaren Flusses auf true zu setzen.
      Pausemodus im Programm: In diesem Modus wird EventEmitter nicht aktiv zum Übertragen von Daten ausgelöst. Die MethodeReadable.read()muss explizit aufgerufen werden, um Daten aus dem Puffer zu lesen wird als Reaktion auf EventEmitter ausgelöst.
      Das Entfernen aller lesbaren Ereignisse ist die einzige Möglichkeit, readableFlowing auf null zu setzen.
      Ereignisname lesbar read(size)
      Beschreibung
      Wird ausgelöst, wenn neue lesbare Daten im Puffer vorhanden sind (wird für jeden in den Cache eingefügten Knoten ausgelöst). Das wird es Der Parameter wird jedes Mal ausgelöst, wenn der Stream geschlossen wird. Beschreibung:
      verbraucht Daten mit einer Länge Die Rückgabe von Null bedeutet, dass die aktuellen Daten kleiner als die Größe sind. Andernfalls werden die dieses Mal verbrauchten Daten zurückgegeben. Wenn die Größe nicht übergeben wird, bedeutet dies, dass alle Daten im Cache-Pool verbraucht werden
      const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// 缓存池浮标值 }) readStreams.on('readable', () => { console.log('缓冲区满了') readStreams.read()// 消费缓存池的所有数据,返回结果并且触发data事件 }) readStreams.on('data', (data) => { console.log('data') })
      Nach dem Login kopieren

      https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527

      当 size 为 0 会触发 readable 事件。

      当缓存池中的数据长度达到浮标值highWaterMark后,就不会在主动请求生产数据,而是等待数据被消费后在生产数据

      暂停状态的流如果不调用read来消费数据时,后续也不会在触发datareadable,当调用read消费时会先判断本次消费后剩余的数据长度是否低于浮标值,如果低于浮标值就会在消费前请求生产数据。这样在read后的逻辑执行完成后新的数据大概率也已经生产完成,然后再次触发readable,这种提前生产下一次消费的数据存放在缓存池的机制也是缓存流为什么快的原因

      流动状态下的流有两种情况

      • 生产速度慢于消费速度时:这种情况下每一个生产数据后一般缓存池中都不会有剩余数据,直接将本次生产的数据传递给 data 事件即可(因为没有进入缓存池,所以也不用调用 read 来消费),然后立即开始生产新数据,待上一次数据消费完后新数据才生产好,再次触发 data ,一只到流结束。
      • 生产速度快于消费速度时:此时每一次生产完数据后一般缓存池都还存在未消费的数据,这种情况一般会在消费数据时开始生产下一次消费的数据,待旧数据消费完后新数据已经生产完并且放入缓存池

      他们的区别仅仅在于数据生产后缓存池是否还存在数据,如果存在数据则将生产的数据 push 到缓存池等待消费,如果不存在则直接将数据交给 data 而不加入缓存池。

      值得注意的是当一个缓存池中存在数据的流从暂停模式进入的流动模式时,会先循环调用 read 来消费数据只到返回 null

      暂停模式

      Was ist ein Stream? So verstehen Sie Streams in Nodejs

      暂停模式下,一个可读流读创建时,模式是暂停模式,创建后会自动调用_read方法,把数据从数据源push到缓冲池中,直到缓冲池中的数据达到了浮标值。每当数据到达浮标值时,可读流会触发一个 "readable" 事件,告诉消费者有数据已经准备好了,可以继续消费。

      一般来说,'readable'事件表明流有新的动态:要么有新的数据,要么到达流的尽头。所以,数据源的数据被读完前,也会触发一次'readable'事件;

      消费者 "readable" 事件的处理函数中,通过stream.read(size)主动消费缓冲池中的数据。

      const { Readable } = require('stream') let count = 1000 const myReadable = new Readable({ highWaterMark: 300, // 参数的 read 方法会作为流的 _read 方法,用于获取源数据 read(size) { // 假设我们的源数据上 1000 个1 let chunk = null // 读取数据的过程一般是异步的,例如IO操作 setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) } }) // 每一次成功 push 数据到缓存池后都会触发 readable myReadable.on('readable', () => { const chunk = myReadable.read()//消费当前缓存池中所有数据 console.log(chunk.toString()) })
      Nach dem Login kopieren

      值得注意的是, 如果 read(size) 的 size 大于浮标值,会重新计算新的浮标值,新浮标值是size的下一个二次幂(size <= 2^n,n取最小值)

      // hwm 不会大于 1GB. const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { // 1GB限制 n = MAX_HWM; } else { //取下一个2最高幂,以防止过度增加hwm n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; } return n; }
      Nach dem Login kopieren

      流动模式

      Was ist ein Stream? So verstehen Sie Streams in Nodejs

      所有可读流开始的时候都是暂停模式,可以通过以下方法可以切换至流动模式:

      • 添加 "data" 事件句柄;
      • 调用 “resume”方法;
      • 使用 "pipe" 方法把数据发送到可写流

      流动模式下,缓冲池里面的数据会自动输出到消费端进行消费,同时,每次输出数据后,会自动回调_read方法,把数据源的数据放到缓冲池中,如果此时缓存池中不存在数据则会直接吧数据传递给 data 事件,不会经过缓存池;直到流动模式切换至其他暂停模式,或者数据源的数据被读取完了(push(null));

      可读流可以通过以下方式切换回暂停模式:

      • 如果没有管道目标,则调用stream.pause()
      • 如果有管道目标,则移除所有管道目标。调用stream.unpipe()可以移除多个管道目标。
      const { Readable } = require('stream') let count = 1000 const myReadable = new Readable({ highWaterMark: 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) } }) myReadable.on('data', data => { console.log(data.toString()) })
      Nach dem Login kopieren

      Writable

      相对可读流来说,可写流要简单一些。

      Was ist ein Stream? So verstehen Sie Streams in Nodejs

      当生产者调用write(chunk)时,内部会根据一些状态(corked,writing等)选择是否缓存到缓冲队列中或者调用_write,每次写完数据后,会尝试清空缓存队列中的数据。如果缓冲队列中的数据大小超出了浮标值(highWaterMark),消费者调用write(chunk)后会返回false,这时候生产者应该停止继续写入。

      那么什么时候可以继续写入呢?当缓冲中的数据都被成功_write之后,清空了缓冲队列后会触发drain事件,这时候生产者可以继续写入数据。

      当生产者需要结束写入数据时,需要调用stream.end方法通知可写流结束。

      const { Writable, Duplex } = require('stream') let fileContent = '' const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// 会作为_write方法 setTimeout(()=>{ fileContent += chunk callback()// 写入结束后调用 }, 500) } }) myWritable.on('close', ()=>{ console.log('close', fileContent) }) myWritable.write('123123')// true myWritable.write('123123')// false myWritable.end()
      Nach dem Login kopieren

      注意,在缓存池中数据到达浮标值后,此时缓存池中可能存在多个节点,在清空缓存池的过程中(循环调用_read),并不会向可读流一样尽量一次消费长度为浮标值的数据,而是每次消费一个缓冲区节点,即使这个缓冲区长度于浮标值不一致也是如此

      const { Writable } = require('stream') let fileContent = '' const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) { setTimeout(()=>{ fileContent += chunk console.log('消费', chunk.toString()) callback()// 写入结束后调用 }, 100) } }) myWritable.on('close', ()=>{ console.log('close', fileContent) }) let count = 0 function productionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end() } } productionData() myWritable.on('drain', productionData)
      Nach dem Login kopieren

      上述是一个浮标值为10的可写流,现在数据源是一个0——20到连续的数字字符串,productionData用于写入数据。

      • 首先第一次调用myWritable.write("0")时,因为缓存池不存在数据,所以"0"不进入缓存池,而是直接交给_wirtemyWritable.write("0")返回值为true

      • 当执行myWritable.write("1")时,因为_wirtecallback还未调用,表明上一次数据还未写入完,位置保证数据写入的有序性,只能创建一个缓冲区将"1"加入缓存池中。后面2-9都是如此

      • 当执行myWritable.write("10")时,此时缓冲区长度为9(1-9),还未到达浮标值,"10"继续作为一个缓冲区加入缓存池中,此时缓存池长度变为11,所以myWritable.write("1")返回false,这意味着缓冲区的数据已经足够,我们需要等待drain事件通知时再生产数据。

      • 100ms过后,_write("0", encoding, callback)callback被调用,表明"0"已经写入完成。然后会检查缓存池中是否存在数据,如果存在则会先调用_read消费缓存池的头节点("1"),然后继续重复这个过程直到缓存池为空后触发drain事件,再次执行productionData

      • 调用myWritable.write("11"),触发第1步开始的过程,直到流结束。

      Duplex

      在理解了可读流与可写流后,双工流就好理解了,双工流事实上是继承了可读流然后实现了可写流(源码是这么写的,但是应该说是同时实现了可读流和可写流更加好)。

      Was ist ein Stream? So verstehen Sie Streams in Nodejs

      Duplex 流需要同时实现下面两个方法

      • 实现 _read() 方法,为可读流生产数据

      • 实现 _write() 方法,为可写流消费数据

      上面两个方法如何实现在上面可写流可读流的部分已经介绍过了,这里需要注意的是,双工流是存在两个独立的缓存池分别提供给两个流,他们的数据源也不一样

      以 NodeJs 的标准输入输出流为例:

      • 当我们在控制台输入数据时会触发其 data 事件,这证明他有可读流的功能,每一次用户键入回车相当于调用可读的 push 方法推送生产的数据。
      • 当我们调用其 write 方法时也可以向控制台输出内容,但是不会触发 data 事件,这说明他有可写流的功能,而且有独立的缓冲区,_write 方法的实现内容就是让控制台展示文字。
      // 每当用户在控制台输入数据(_read),就会触发data事件,这是可读流的特性 process.stdin.on('data', data=>{ process.stdin.write(data); }) // 每隔一秒向标准输入流生产数据(这是可写流的特性,会直接输出到控制台上),不会触发data setInterval(()=>{ process.stdin.write('不是用户控制台输入的数据') }, 1000)
      Nach dem Login kopieren

      Transform

      Was ist ein Stream? So verstehen Sie Streams in Nodejs

      可以将 Duplex 流视为具有可写流的可读流。两者都是独立的,每个都有独立的内部缓冲区。读写事件独立发生。

      Duplex Stream ------------------| Read <----- External Source You ------------------| Write -----> External Sink ------------------|
      Nach dem Login kopieren

      Transform 流是双工的,其中读写以因果关系进行。双工流的端点通过某种转换链接。读取要求发生写入。

      Transform Stream --------------|-------------- You Write ----> ----> Read You --------------|--------------
      Nach dem Login kopieren

      对于创建 Transform 流,最重要的是要实现_transform方法而不是_write或者_read_transform中对可写流写入的数据做处理(消费)然后为可读流生产数据。

      转换流还经常会实现一个 `_flush` 方法,他会在流结束前被调用,一般用于对流的末尾追加一些东西,例如压缩文件时的一些压缩信息就是在这里加上的
      Nach dem Login kopieren
      const { write } = require('fs') const { Transform, PassThrough } = require('stream') const reurce = '1312123213124341234213423428354816273513461891468186499126412' const transform = new Transform({ highWaterMark: 10, transform(chunk ,encoding, callback){// 转换数据,调用push将转换结果加入缓存池 this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){// end触发前执行 this.push('<<<') callback() } }) // write 不断写入数据 let count = 0 transform.write('>>>') function productionData() { let flag = true while (count <= 20 && flag) { flag = transform.write(count.toString()) count++ } if (count > 20) { transform.end() } } productionData() transform.on('drain', productionData) let result = '' transform.on('data', data=>{ result += data.toString() }) transform.on('end', ()=>{ console.log(result) // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<< })
      Nach dem Login kopieren

      Pipe

      管道是将上一个程序的输出作为下一个程序的输入,这是管道在 Linux 中管道的作用。NodeJs 中的管道其实也类似,它管道用于连接两个流,上游的流的输出会作为下游的流的输入。

      Was ist ein Stream? So verstehen Sie Streams in Nodejs

      管道 sourec.pipe(dest, options) 要求 sourec 是可读的,dest是可写的。其返回值是 dest。

      对于处于管道中间的流既是下一个流的上游也是上一个流的下游,所以其需要时一个可读可写的双工流,一般我们会使用转换流来作为管道中间的流。

      https://github1s.com/nodejs/node/blob/v17.0.0/lib/internal/streams/legacy.js#L16-L33

      Stream.prototype.pipe = function(dest, options) { const source = this; function ondata(chunk) { if (dest.writable && dest.write(chunk) === false && source.pause) { source.pause(); } } source.on('data', ondata); function ondrain() { if (source.readable && source.resume) { source.resume(); } } dest.on('drain', ondrain); // ...后面的代码省略 }
      Nach dem Login kopieren

      pipe 的实现非常清晰,当上游的流发出 data 事件时会调用下游流的 write 方法写入数据,然后立即调用 source.pause() 使得上游变为暂停状态,这主要是为了防止背压。

      当下游的流将数据消费完成后会调用 source.resume() 使上游再次变为流动状态。

      我们实现一个将 data 文件中所有1替换为@然后输出到 result 文件到管道。

      const { Transform } = require('stream') const { createReadStream, createWriteStream } = require('fs') // 一个位于管道中的转换流 function createTransformStream(){ return new Transform({ transform(chunk, encoding, callback){ this.push(chunk.toString().replace(/1/g, '@')) callback() } }) } createReadStream('./data') .pipe(createTransformStream()) .pipe(createWriteStream('./result'))
      Nach dem Login kopieren

      在管道中只存在两个流时,其功能和转换流有点类似,都是将一个可读流与一个可写流串联起来,但是管道可以串联多个流。

      原文地址:https://juejin.cn/post/7077511716564631566

      作者:月夕

      更多node相关知识,请访问:nodejs 教程

      Das obige ist der detaillierte Inhalt vonWas ist ein Stream? So verstehen Sie Streams in Nodejs. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Verwandte Etiketten:
    Quelle:juejin.cn
    Erklärung dieser Website
    Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
    Neueste Downloads
    Mehr>
    Web-Effekte
    Quellcode der Website
    Website-Materialien
    Frontend-Vorlage
    Über uns Haftungsausschluss Sitemap
    Chinesische PHP-Website:Online-PHP-Schulung für das Gemeinwohl,Helfen Sie PHP-Lernenden, sich schnell weiterzuentwickeln!