이 글은 주로 Node.js 파이프의 소스 코드 분석을 소개합니다. 편집자는 이것이 꽤 좋다고 생각합니다. 이제 여러분과 공유하고 참고할 것입니다. 편집자를 따라가서 살펴보겠습니다
이전 두 기사에서 우리는 배웠습니다. Readable 데이터를 Writable에 쓰려면 먼저 수동으로 데이터를 메모리로 읽은 다음 Writable에 써야 합니다. 즉, 데이터를 전달할 때마다 다음 템플릿 코드를 작성해야 합니다.
이제 어떻게 구현되었는지 살펴보겠습니다먼저 Readable의 Pipe() 메소드를 호출해야 합니다
readable.on('readable', (err) => { if(err) throw err writable.write(readable.read()) })
pipe() 함수를 실행할 때 먼저 Writable to를 기록합니다. .pipes를 상태화한 다음 관련 이벤트를 바인딩합니다. 마지막으로 Readable이 흐름 모드가 아닌 경우 이력서()를 호출하여 Readable을 흐름 모드로 변경합니다.
데이터 전달Readable이 데이터에서 데이터를 얻은 후. 소스에서 데이터 이벤트를 발생시키고 ondata()
readable.pipe(writable)
를 통해 Writable에 데이터를 씁니다. 시간이 지나면 _write() 내부에 src.push(chunk)를 호출하거나 파이프를 해제할 수 있으며 이로 인해 waitDrain이 여러 번 증가하고 지울 수 없으며 Readable이 중단됩니다
Writable, Readable에 더 이상 데이터를 쓸 수 없는 경우 모든 배수 이벤트가
드레인 이벤트를 트리거하고 ondrain()
// lib/_stream_readable.js Readable.prototype.pipe = function(dest, pipeOpts) { var src = this; var state = this._readableState; // 记录 Writable switch (state.pipesCount) { case 0: state.pipes = dest; break; case 1: state.pipes = [state.pipes, dest]; break; default: state.pipes.push(dest); break; } state.pipesCount += 1; // ... src.once('end', endFn); dest.on('unpipe', onunpipe); // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保证 error 事件触发时,onerror 首先被执行 prependListener(dest, 'error', onerror); // ... dest.once('close', onclose); // ... dest.once('finish', onfinish); // ... // 触发 Writable 的 pipe 事件 dest.emit('pipe', src); // 将 Readable 改为 flow 模式 if (!state.flowing) { debug('pipe resume'); src.resume(); } return dest; };
을 실행할 때까지 일시 중지 모드로 들어갑니다. 각 배수 이벤트가 트리거되면 waitDrain은 waitDrain이 0이 될 때까지 감소됩니다. 이때 Readable을 flow 모드로 진입시키기 위한 call flow(src)
이 시점에서 전체 데이터 전송 주기가 성립되었으며, 모든 데이터 쓰기가 완료될 때까지 해당 주기를 따라 데이터가 지속적으로 Writable로 흘러 들어갑니다.
unpipe// lib/_stream_readable.js // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况 // 详情见 https://github.com/nodejs/node/issues/7278 var increasedAwaitDrain = false; function ondata(chunk) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); if (false === ret && !increasedAwaitDrain) { // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况 if (((state.pipesCount === 1 && state.pipes === dest) || (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1) ) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; increasedAwaitDrain = true; } // 进入 pause 模式 src.pause(); } }
Readable.prototype.unpipe() 함수는 state.pipes 속성과 대상을 기반으로 실행 전략을 선택합니다. 매개변수. 마지막으로 dest의 unpipe 이벤트가 트리거됩니다. unpipe 이벤트가 트리거된 후 관련 데이터를 정리하기 위해 onunpipe()가 호출됩니다. 데이터 전송, 언파이프 및 예외 처리를 포함하여 Writable은 수동적 당사자입니다(트리거만 필요함). 배수 이벤트)파이프라인 프로세스를 요약하려면:
먼저 readbable.pipe(writable)를 실행하여 읽기 가능 및 쓰기 가능을 연결합니다
writable.write(chunk)가 false를 반환하면 일시 중지 모드로 들어가서 드레인 이벤트가 트리거될 때까지 기다립니다
모든 드레인 이벤트가 트리거된 후 다시 플로우 모드로 들어가 데이터를 씁니다
데이터 쓰기가 완료되거나 중단이 발생하더라도 마지막에는 unpipe()가 호출됩니다
unpipe() 및 Readable.prototype.unpipe()가 호출되어 dest cleans의 unpipe 이벤트가 트리거됩니다. 관련 데이터
위 내용은 Node.js 파이프() 메소드 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!