1. はじめに
この記事では、node.js ストリームを使用したプログラム開発の基本的な方法を紹介します。
<code class="hljs mizar">"We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also." Doug McIlroy. October 11, 1964</code>
Stream に最初に触れたのは、Unix の初期の頃でした。数十年にわたる実践により、Stream のアイデアがいくつかの巨大なシステムを簡単に開発できることが証明されました。 UNIX では、Stream はノード内の |; を介して実装され、組み込みのストリーム モジュールとして、多くのコア モジュールとサードパーティ モジュールが使用されます。 Unix と同様に、ノード Stream の主な操作も .pipe() です。ユーザーは、アンチプレッシャー メカニズムを使用して読み取りと書き込みのバランスを制御できます。
Stream は、再利用可能な統合インターフェイスを開発者に提供し、抽象的な Stream インターフェイスを通じてストリーム間の読み取りと書き込みのバランスを制御できます。
2. ストリームを使用する理由
ノードの I/O は非同期であるため、ディスクとネットワークへの読み取りと書き込みにはデータを読み取るためのコールバック関数が必要です。以下はファイル ダウンロード サーバーの簡単なコードです:
<code class="hljs javascript">var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, data) { res.end(data); }); }); server.listen(8000);</code>
これらのコードは必要な機能を実現できますが、「data.txt」ファイルが大きく、同時実行の量が多い場合、サービスはファイル データ全体をメモリにキャッシュする必要があります。メモリが無駄になります。ユーザーはファイル データを受け入れる前にファイル全体がメモリにキャッシュされるまで待つ必要があるため、ユーザー エクスペリエンスはかなり低下します。しかし幸いなことに、両方のパラメータ (req、res) は Stream なので、 fs.readFile():
の代わりに fs.createReadStream() を使用できます。<code class="hljs javascript">var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server.listen(8000);</code>
.pipe() メソッドは fs.createReadStream() の「data」イベントと「end」イベントをリッスンするため、「data.txt」ファイルはファイル全体をキャッシュする必要がなく、データ ブロックはクライアント接続が完了した直後に送信されます。 .pipe() を使用するもう 1 つの利点は、クライアントの遅延が非常に大きい場合に発生する読み取りと書き込みの不均衡の問題を解決できることです。送信する前にファイルを圧縮したい場合は、サードパーティのモジュールを使用できます:
<code class="hljs javascript">var http = require('http'); var fs = require('fs'); var oppressor = require('oppressor'); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(oppressor(req)).pipe(res); }); server.listen(8000);</code>
この方法では、ファイルは gzip と deflate をサポートするブラウザによって圧縮されます。 oppressor モジュールはすべてのコンテンツ エンコーディングを処理します。
Stream によりプログラムの開発が簡単になります。
3. 基本概念
読み取り可能、書き込み可能、変換、二重、および「クラシック」という 5 つの基本的なストリームがあります。
3-1、パイプ
すべてのタイプのストリーム コレクションは、次のように .pipe() を使用して入出力ペアを作成し、読み取り可能なストリーム src を受け取り、そのデータを書き込み可能なストリーム dst に出力します。
<code class="hljs perl">src.pipe(dst)</code>
.pipe(dst) メソッドは dst ストリームを返すため、次のように複数の .pipe() を連続して使用できます。
<code class="hljs perl">a.pipe( b ).pipe( c ).pipe( d )</code>
<code class="hljs perl">a.pipe( b ); b.pipe( c ); c.pipe( d );</code>
3-2、読み取り可能なストリーム
Readable ストリームの .pipe() メソッドを呼び出すことにより、Readable ストリームのデータを Writable、Transform、または Duplex ストリームに書き込むことができます。
<code class="hljs perl">readableStream.pipe( dst )</code>
ここでは、読み取り可能なストリームを作成します!
<code class="hljs perl">var Readable = require('stream').Readable; var rs = new Readable; rs.push('beep '); rs.push('boop\n'); rs.push(null); rs.pipe(process.stdout); $ node read0.js beep boop </code>
すべてのデータ コンテンツを読み取り可能なストリームにプッシュする前に rs.pipe(process.stdout); を呼び出していないことに注意してください。これは、読み取り可能なストリームがすべてプッシュされたためです。データは、受信者がデータを読み取るまでキャッシュされます。ただし、多くの場合、データ全体をキャッシュするのではなく、データを受信したときにのみ読み取り可能なストリームにデータをプッシュする方が適切です。 ._read() 関数を書き直してみましょう:
<code class="hljs javascript">var Readable = require('stream').Readable; var rs = Readable(); var c = 97; rs._read = function () { rs.push(String.fromCharCode(c++)); if (c > 'z'.charCodeAt(0)) rs.push(null); }; rs.pipe(process.stdout);</code> <code class="hljs bash">$ node read1.js abcdefghijklmnopqrstuvwxyz</code>
util.inherits() を使用して読み取り可能なストリームを継承することもできることに注意してください。 _read() メソッドがデータ受信者がデータを要求したときにのみ呼び出されることを示すために、次のように、読み取り可能なストリームにデータをプッシュするときに遅延を設けます。
<code class="hljs javascript">var Readable = require('stream').Readable; var rs = Readable(); var c = 97 - 1; rs._read = function () { if (c >= 'z'.charCodeAt(0)) return rs.push(null); setTimeout(function () { rs.push(String.fromCharCode(++c)); }, 100); }; rs.pipe(process.stdout); process.on('exit', function () { console.error('\n_read() called ' + (c - 97) + ' times'); }); process.stdout.on('error', process.exit);</code>
<code class="hljs bash">$ node read2.js | head -c5 abcde _read() called 5 times</code>
ほとんどの場合、パイプ メソッドを使用して、読み取り可能なストリームから別の形式のストリームにデータをリダイレクトしますが、場合によっては、読み取り可能なストリームから直接データを読み取る方が便利な場合もあります。以下のように:
当可读流中有数据可读取时,流会触发'readable' 事件,这样就可以调用.read()方法来读取相关数据,当可读流中没有数据可读取时,.read() 会返回null,这样就可以结束.read() 的调用, 等待下一次'readable' 事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:
<code class="hljs javascript">process.stdin.on('readable', function () { var buf = process.stdin.read(3); console.dir(buf); });</code>
如下运行程序发现,输出结果并不完全!
<code class="hljs bash">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js <buffer 61="" 62="" 63=""> <buffer 0a="" 64="" 65=""> <buffer 0a="" 66="" 67=""></buffer></buffer></buffer></code>
这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0)可以达到这个目的。
<code class="hljs javascript">process.stdin.on('readable', function () { var buf = process.stdin.read(3); console.dir(buf); process.stdin.read(0); });</code>
这次运行结果如下:
<code class="hljs xml">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js <buffer 0a="" 64="" 65=""> <buffer 0a="" 68="" 69=""></buffer></buffer></code>
我们可以使用 .unshift() 将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面的代码,会按行输出标准输入的内容:
<code class="hljs javascript">var offset = 0; process.stdin.on('readable', function () { var buf = process.stdin.read(); if (!buf) return; for (; offset < buf.length; offset++) { if (buf[offset] === 0x0a) { console.dir(buf.slice(0, offset).toString()); buf = buf.slice(offset + 1); offset = 0; process.stdin.unshift(buf); return; } } process.stdin.unshift(buf); }); $ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 'hearties' 'heartiest' 'heartily' 'heartiness' 'heartiness\'s' 'heartland' 'heartland\'s' 'heartlands' 'heartless' 'heartlessly'</code>
当然,有很多模块可以实现这个功能,如:split 。
3-3、writable streams
writable streams只可以作为.pipe()函数的目的参数。如下代码:
<code class="hljs perl">src.pipe( writableStream );</code>
1>创建 writable stream
重写 ._write(chunk, enc, next) 方法就可以接受一个readable stream的数据。
<code class="hljs php">var Writable = require('stream').Writable; var ws = Writable(); ws._write = function (chunk, enc, next) { console.dir(chunk); next(); }; process.stdin.pipe(ws); $ (echo beep; sleep 1; echo boop) | node write0.js <buffer 0a="" 62="" 65="" 70=""> <buffer 0a="" 62="" 6f="" 70=""></buffer></buffer></code>
第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的时间。如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。如果readable stream写入的数据时对象,那么需要这样创建writable stream
<code class="hljs css">Writable({ objectMode: true })</code>
2>写数据到 writable stream
调用writable stream的.write(data)方法即可完成数据写入。
<code class="hljs vala">process.stdout.write('beep boop\n');</code>
调用.end()方法通知writable stream 数据已经写入完成。
<code class="hljs javascript">var fs = require('fs'); var ws = fs.createWriteStream('message.txt'); ws.write('beep '); setTimeout(function () { ws.end('boop\n'); }, 1000); $ node writing1.js $ cat message.txt beep boop</code>
如果需要设置writable stream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark,这样如果缓冲区里的数据超过opts.highWaterMark,.write(data)方法会返回false。当缓冲区可写的时候,writable stream会触发'drain' 事件。
3-4、classic streams
Classic streams比较老的接口了,最早出现在node 0.4版本中,但是了解一下其运行原理还是十分有好
处的。当一个流被注册了"data" 事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。
1>classic readable streams
Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法通过检查stream.readable 的值确定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:
<code class="hljs javascript">var Stream = require('stream'); var stream = new Stream; stream.readable = true; var c = 64; var iv = setInterval(function () { if (++c >= 75) { clearInterval(iv); stream.emit('end'); } else stream.emit('data', String.fromCharCode(c)); }, 100); stream.pipe(process.stdout); $ node classic0.js ABCDEFGHIJ</code>
如果要从classic readable stream中读取数据,注册"data" 和"end"两个事件的回调函数即可,代码如下:
<code class="hljs php">process.stdin.on('data', function (buf) { console.log(buf); }); process.stdin.on('end', function () { console.log('__END__'); }); $ (echo beep; sleep 1; echo boop) | node classic1.js <buffer 0a="" 62="" 65="" 70=""> <buffer 0a="" 62="" 6f="" 70=""> __END__</buffer></buffer></code>
需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个 延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听”data” 和”end”事件了,也不用担心读写不平衡的问题了。当然你也可以用 through代替自己监听”data” 和”end” 事件,如下面的代码:
<code class="hljs php">var through = require('through'); process.stdin.pipe(through(write, end)); function write (buf) { console.log(buf); } function end () { console.log('__END__'); } $ (echo beep; sleep 1; echo boop) | node through.js <buffer 0a="" 62="" 65="" 70=""> <buffer 0a="" 62="" 6f="" 70=""> __END__</buffer></buffer></code>
或者也可以使用concat-stream来缓存整个流的内容:
<code class="hljs oxygene">var concat = require('concat-stream'); process.stdin.pipe(concat(function (body) { console.log(JSON.parse(body)); })); $ echo '{"beep":"boop"}' | node concat.js { beep: 'boop' }</code>
当然如果你非要自己监听"data" 和"end"事件,那么你可以在写数据的流不可写的时候使用.pause()方法暂停Classic readable streams继续触发”data” 事件。等到写数据的流可写的时候再使用.resume() 方法通知流继续触发"data" 事件继续读取
数据。
2>classic writable streams
Classic writable streams 非常简单。只有 .write(buf), .end(buf)和.destroy()三个方法。.end(buf) 方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end() 这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。
4、transform
transform是一个对读入数据过滤然输出的流。
5、duplex
duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream:
<code class="hljs livecodeserver">a.pipe(b).pipe(a)</code>
以上内容是小编给大家介绍的Nodejs Stream 数据流使用手册,希望对大家有所帮助!