Node.jsのpipe()メソッドの紹介

巴扎黑
リリース: 2017-08-15 10:14:52
オリジナル
13437 人が閲覧しました

この記事では主に Node.js パイプのソースコード分析を紹介します。編集者がそれを紹介し、参考にさせていただきます。編集者をフォローして見てみましょう

前の 2 つの記事から、私たちは学びました。 Readable データを Writable に書き込む場合は、まず手動でデータをメモリに読み取り、それから Writable に書き込む必要があります。つまり、データを渡すたびに、次のテンプレート コードを記述する必要があります


readable.on('readable', (err) => {
 if(err) throw err

 writable.write(readable.read())
})
ログイン後にコピー

使いやすいように、Node.js には Pipe() メソッドが用意されており、データをエレガントに渡すことができます


readable.pipe(writable)
ログイン後にコピー

それでは、その実装方法を見てみましょう

pipe

まず、Readableのpipe()メソッドを呼び出す必要があります


// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
 var src = this;
 var state = this._readableState;

 // 记录 Writable
 switch (state.pipesCount) {
  case 0:
   state.pipes = dest;
   break;
  case 1:
   state.pipes = [state.pipes, dest];
   break;
  default:
   state.pipes.push(dest);
   break;
 }
 state.pipesCount += 1;

 // ...

  src.once('end', endFn);

 dest.on('unpipe', onunpipe);
 
 // ...

 dest.on('drain', ondrain);

 // ...

 src.on('data', ondata);

 // ...

 // 保证 error 事件触发时,onerror 首先被执行
 prependListener(dest, 'error', onerror);

 // ...

 dest.once('close', onclose);
 
 // ...

 dest.once('finish', onfinish);

 // ...

 // 触发 Writable 的 pipe 事件
 dest.emit('pipe', src);

 // 将 Readable 改为 flow 模式
 if (!state.flowing) {
  debug('pipe resume');
  src.resume();
 }

 return dest;
};
ログイン後にコピー

pipe()関数を実行するときは、まずWritableを記録します最後に、Readable がフロー モードでない場合は、Readable がデータ ソースからデータを取得した後、resume() を呼び出してデータを渡します

。 、データイベントをトリガーし、ondata()

ondata() 関連コードを実行します:

// lib/_stream_readable.js

 // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况
 // 详情见 https://github.com/nodejs/node/issues/7278
 var increasedAwaitDrain = false;
 function ondata(chunk) {
  debug('ondata');
  increasedAwaitDrain = false;
  var ret = dest.write(chunk);
  if (false === ret && !increasedAwaitDrain) {
   // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况
   if (((state.pipesCount === 1 && state.pipes === dest) ||
      (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
     ) && 
     !cleanedUp) {
    debug('false write response, pause', src._readableState.awaitDrain);
    src._readableState.awaitDrain++;
    increasedAwaitDrain = true;
   }
   // 进入 pause 模式
   src.pause();
  }
 }
ログイン後にコピー

ondata(chunk) 関数では、dest.write(chunk) を通じてデータを Writable に書き込みます

この時点で_write() 内に Call src.push(chunk) または unpipe がある可能性があります。これにより、awaitDrain が複数回増加し、クリアできなくなり、Readable がスタックします

これ以上データを Writable に書き込めなくなると、Readable はすべてのドレイン イベントがトリガーされるまで一時停止モードに入ります

ドレイン イベントをトリガーし、ondrain() を実行します

// lib/_stream_readable.js

 var ondrain = pipeOnDrain(src);

 function pipeOnDrain(src) {
  return function() {
   var state = src._readableState;
   debug('pipeOnDrain', state.awaitDrain);
   if (state.awaitDrain)
    state.awaitDrain--;
   // awaitDrain === 0,且有 data 监听器
   if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
    state.flowing = true;
    flow(src);
   }
  };
 }
ログイン後にコピー

各ドレイン イベントがトリガーされると、awaitDrain が 0 になるまで awaitDrain が減らされます。このとき、flow(src)を呼び出して、Readableをフローモードにします

この時点で、データ転送サイクル全体が確立され、すべてのデータの書き込みが完了するまで、データはサイクルに沿って継続的にWritableに流れ込みます。

unpipe

書き込みプロセス中にエラーが発生したかどうかに関係なく、unpipe()

// lib/_stream_readable.js

// ...

 function unpipe() {
  debug('unpipe');
  src.unpipe(dest);
 }

// ...

Readable.prototype.unpipe = function(dest) {
 var state = this._readableState;
 var unpipeInfo = { hasUnpiped: false };

 // 啥也没有
 if (state.pipesCount === 0)
  return this;

 // 只有一个
 if (state.pipesCount === 1) {
  if (dest && dest !== state.pipes)
   return this;
  // 没有指定就 unpipe 所有
  if (!dest)
   dest = state.pipes;

  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;
  if (dest)
   dest.emit('unpipe', this, unpipeInfo);
  return this;
 }

 // 没有指定就 unpipe 所有
 if (!dest) {
  var dests = state.pipes;
  var len = state.pipesCount;
  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;

  for (var i = 0; i < len; i++)
   dests[i].emit(&#39;unpipe&#39;, this, unpipeInfo);
  return this;
 }

 // 找到指定 Writable,并 unpipe
 var index = state.pipes.indexOf(dest);
 if (index === -1)
  return this;

 state.pipes.splice(index, 1);
 state.pipesCount -= 1;
 if (state.pipesCount === 1)
  state.pipes = state.pipes[0];

 dest.emit(&#39;unpipe&#39;, this, unpipeInfo);

 return this;
};
ログイン後にコピー

Readable.prototype.unpipe() 関数は、state.pipes 属性と dest に基づいて実行戦略を選択します。パラメータ。最後に、dest の unpipe イベントがトリガーされ、onunpipe() が呼び出されて関連データがクリーンアップされます (データ転送、アンパイプ、例外処理を含む)。Writable は受動的なパーティーです (トリガーする必要があるだけです)。ドレイン イベント)

パイプライン プロセスを要約すると:

最初に readbable.pipe(writable) を実行して、読み取り可能と書き込み可能を接続します


読み取り可能にデータがある場合は、readable.emit('data') が書き込みますwritable.write(chunk) が false を返した場合は、一時停止モードに入り、ドレイン イベントがトリガーされるのを待ちます

すべてのドレイン イベントがトリガーされた後、再びフロー モードに入り、データを書き込みます

    データの書き込みが完了しても、中断が発生しても、最後には unpipe() が呼び出されます
  • unpipe() と Readable.prototype.unpipe() が呼び出され、dest の unpipe イベント、クリーンアップ関連のイベントがトリガーされますデータ

以上がNode.jsのpipe()メソッドの紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート