Maison > interface Web > js tutoriel > Une brève discussion sur les flux lisibles dans Nodejs. Comment implémenter des flux lisibles ?

Une brève discussion sur les flux lisibles dans Nodejs. Comment implémenter des flux lisibles ?

青灯夜游
Libérer: 2021-07-08 10:06:38
avant
2919 Les gens l'ont consulté

Cet article vous présentera le flux dans Nodejs et verra comment les flux lisibles par Node sont implémentés. Il a une certaine valeur de référence. Les amis dans le besoin peuvent s'y référer. J'espère qu'il sera utile à tout le monde.

Une brève discussion sur les flux lisibles dans Nodejs. Comment implémenter des flux lisibles ?

Le concept de flux

Stream est une interface abstraite pour traiter les données en streaming dans Node.js. Le module stream est utilisé pour construire des objets qui implémentent l'interface stream. [Apprentissage recommandé : "Tutoriel nodejs"]

Le rôle du flux

Dans le processus de lecture et d'écriture de fichiers volumineux, il ne sera pas être lu en une seule fois et écrit en mémoire. Vous pouvez contrôler le nombre de lectures et d'écritures à chaque fois

Classification des flux

1 Flux lisible-Readable

Exemple : fs.createReadStream ;

Emplacement du code source : lib/_stream_readable.js

2. Writable stream-Writable

Exemple : fs.createWriteStream; /_stream_writable .js

3. Duplex stream-Duplex : répond aux fonctions de lecture et d'écriture

Exemple : net.Socket();

Emplacement du code source : lib/ _stream_duplex.js

4. Transform stream-Transform : Objectif : compression, transcodage

Exemple :

const { Transform } = require('stream');
Transform.call(this, '要转换的数据');//具体的使用详情 见node官网
Copier après la connexion

-Emplacement du code source : lib/_stream_tranform.js

Le processus de lecture des fichiers avec des flux lisibles

Le processus de lecture du code du fichier
  • const path = require("path");
    const aPath = path.join(__dirname, "a.txt");//需要读取的文件
    const fs = require("fs");
    let rs = fs.createReadStream(aPath, {
      flags: "r",
      encoding: null,//默认编码格式是buffer,深挖buffer又要学习字符编码,留个坑 到时候写一个编码规范的学习整理
      autoClose: true,//相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close
      start: 0,
      highWaterMark: 3,//每次读取的个数 默认是64*1024个字节
    });
    
    rs.on("open", function (fd) {
      // fd  number类型
      console.log("fd", fd);
    });
    // 他会监听用户,绑定了data事件,就会触发对应的回调,不停的触发
    rs.on("data", function (chunk) {
    //这里会打印的是ascII 值 ,所以可以toString查看详情自己看得懂的样子
      console.log({ chunk }, "chunk.toString", chunk.toString()); 
      //如果想每一段事件 读一点 可以用rs.pause() 做暂停,然后计时器 里rs.resume()再次触发data事件
      rs.pause();//暂停读取
    });
    rs.on("close", function () {
      //当文件读取完毕后 会 触发 end事件
      console.log("close");
    });
    setInterval(() => {
      rs.resume(); //再次触发data,直到读完数据为止
    }, 1000);
    Copier après la connexion
Hors sujet : je veux parler des flux de fichiers et des flux lisibles ordinaires La différence entre les flux de lecture
  • 1 L'ouverture et la fermeture sont uniques aux flux de fichiers. La prise en charge de l'ouverture et de la fermeture est un flux de fichiers

2. les flux lisibles l'ont (on('data '), on('end'), on('error'), curriculum vitae, pause ; donc tant que ces méthodes sont prises en charge, c'est un flux lisible

Le processus d'écriture d'un fichier par un flux inscriptible

Processus d'écriture de code de fichier
  • const fs = require("fs");
    const path = require("path");
    const bPath = path.join(__dirname, "b.txt");
    let ws = fs.createWriteStream(bPath, {
    //参数和可读流的类似
      flags: "w",
      encoding: "utf-8",
      autoClose: true,
      start: 0,
      highWaterMark: 3,
    });
    ws.on("open", function (fd) {
      console.log("open", fd);
    });
    ws.on("close", function () {
      console.log("close");
    });
    
    //write的参数string 或者buffer,ws.write 还有一个boolea的返回值表示是真实写入文件还是放入缓存中
    ws.write("1");
    let flag = ws.write("1");
    console.log({ flag });//true
    flag = ws.write("1");
    console.log({ flag });//true
    flag = ws.write("1");
    console.log({ flag });//false
    Copier après la connexion
Processus d'écriture et de lecture de flux duplex

Écrire Un exemple de service local
  • 1 Le serveur (code du serveur) implémente
const net = require("net"); //net 模块是 node自己封装的tcp层
//socket 就是双工流 能读能写  http源码就是用net模块写的 基于tcp
const server = net.createServer(function (socket) {
  socket.on("data", function (data) {//监听客户端发来的消息
    console.log(data.toString)
    socket.write("server:hello");//写入server:hello
  });
  socket.on("end", function () {
    console.log("客户端关闭");
  });
});
server.on("err", function (err) {
  console.log(err);
});
server.listen(8080);//服务端监听8080端口
Copier après la connexion

2. >

const net = require("net"); //net 模块是 node自己封装的tcp层
const socket = new net.Socket(); //
socket.connect(8080, "localhost"); //  表示链接服务器本地8080端口
socket.on("connect", function (data) {
  //和服务器建立链接后
  socket.write("connect server");
});
socket.on("data", function (data) {
  //监听数据,读取服务器传来的数据
  console.log(data.toString());
  socket.destroy()
});
socket.write('ok')
socket.on("error", function (err) {
  console.log(err);
});
Copier après la connexion

3. Si vous souhaitez faire une parenthèse Pour voir la poignée de main à trois voies et la vague à quatre voies de TCP, vous pouvez utiliser Wireshark (un outil de capture de paquets) pour voir le processus réel à travers mon code ci-dessus

Processus de transformation du flux de conversion

Le flux de conversion est un type de flux duplex qui permet l'entrée et renvoie la sortie après avoir effectué certaines opérations sur les données <🎜. >

Processus de code (

Ma référence pour cet exemple

)
  • const stream = require(&#39;stream&#39;)
    let c = 0;
    const readable = stream.Readable({
      highWaterMark: 2,
      read: function () {
        let data = c < 26 ? Number(c++ + 97) : null;
        console.log(&#39;push&#39;, data);
        this.push( String.fromCharCode(data));
    }
    })
    
    const transform = stream.Transform({
      highWaterMark: 2,
      transform: function (buf, enc, next) {
        console.log(&#39;transform&#39;, buf.toString());
        next(null, buf);
      }
    })
    
    readable.pipe(transform);
    Copier après la connexion
    Imprimer les résultats

Mise en œuvre des flux lisiblesUne brève discussion sur les flux lisibles dans Nodejs. Comment implémenter des flux lisibles ?

Suivez les points d'arrêt pour d'abord comprendre le processus d'appel du flux lisible

Le le code précédent pour le processus de lecture du fichier de flux lisible est utilisé comme exemple de point d'arrêt

rs.on('open') est l'entrée du point d'arrêt à saisir

rs.on(&#39;open&#39;)

1. Via Stream.prototype, on.call hérite de la classe Stream

Une brève discussion sur les flux lisibles dans Nodejs. Comment implémenter des flux lisibles ?Emplacement du fichier source : pas de dlib/_stream_readable.js (j'ai regardé ici directement via les points d'arrêt, mais Je ne l'ai pas trouvé)

Cliquez à nouveau et découvrez que Stream est une sous-classe d'EventEmitter, de sorte que le flux lisible peut également prendre en charge la publication et l'abonnementUne brève discussion sur les flux lisibles dans Nodejs. Comment implémenter des flux lisibles ?

2. Le type d'événement est-il surveillé Soit des données, soit lisibles, ce n'est pas un auditeur qui continue jusqu'à l'événement suivant

Une brève discussion sur les flux lisibles dans Nodejs. Comment implémenter des flux lisibles ?

Une brève discussion sur les flux lisibles dans Nodejs. Comment implémenter des flux lisibles ?

rs.on(&#39;data&#39;)

Une brève discussion sur les flux lisibles dans Nodejs. Comment implémenter des flux lisibles ?La partie données fait deux choses

    1 Déterminer si elle coule (la valeur par défaut est nulle. ) n'est pas faux, puis reprenez automatiquement la méthode et continuez la lecture du fichier (dans mon cas, ici c'est rs.pause(); définissez manuellement la valeur fluide sur false pour qu'elle ne continue pas à être appelée)
  • 2 . Si je n'appelle pas rs.pause(), je continuerai à appeler CV. Voir ce qui se fait dans CV

2.1 Enfin, stream.read() est appelé pour continuer la lecture du fichier ; jusqu'à ce que le fichier soit lu, allez à la fin de l'émission et fermez les événements dans l'ordre

Une brève discussion sur les flux lisibles dans Nodejs. Comment implémenter des flux lisibles ?

Résumé : Donc, par défaut, les données continueront à lire le fichier. fichier jusqu'à ce que le fichier soit lu. Si vous souhaitez que la lecture du fichier soit contrôlable, vous pouvez utiliser rs.pause()

vous-même comme moi. >

Idées de mise en œuvre

继承EventEmitter发布订阅管理我们的事件

const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {

}
module.exports = ReadStream;
Copier après la connexion

数据初始化

constructor(path, options = {}) {
    super();
    //参考fs 写实例需要用到的参数
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding - options.encoding || null;//默认编码格式是buffer
    this.autoClose = options.autoClose || true;//相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close
    this.start = options.start || 0;//数据读取的开始位置
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;//默认一次读取64个字节的数据 
    this.offset = this.start;//fs.read的偏移量
    this.fd = undefined; //初始化fd 用于 open成功后的fd做赋值  供 read里使用
    this.flowing = false;//实现pause和resume备用,设置flag,当监听到data事件的时候 改 flowing为true,
    this.open(); //初始化的时候就要调用open
    this.on("readStreamListener", function (type) {
      // console.log(type)//这里打印就能看到 实例上所有 通过on 绑定的事件名称
      if (type === "data") {
      //监听到data事件的时候 改 flowing为true
        this.flowing = true;
        this.read();
      }
    });
    }
Copier après la connexion

文件读取方法read,pause,resume,open和destroy的实现

open()

 open() {
 // 调用fs.open 读取目标文件 
    fs.open(this.path, this.flags, (err, fd) => { 
      this.fd = fd; //赋值一个fd 供后面的 read()方式使用,文件读取成功,fd是返回一个数字
      this.emit("open", fd);
    });
Copier après la connexion

read()

 read() {
   // console.log("一开始read里的", this.fd); //但是这样依旧拿不到 open后的fd,用 发布订阅 通过on来获取 绑定的事件type
    //这里要做一个容错处理 ,因为open是异步读取文件,read里无法马上拿到open结果
  if (typeof this.fd !== "number") {
      //订阅open,给绑定一个回调事件read 直到this.fd有值
      return this.once("open", () => this.read());
    }
 }
  //fd打开后 调用fs.read
  //实例上的start值是未知number,存在实际剩余的可读的文件大小<highWaterMar的情况 ,用howMuchToRead 替换highWaterMark 去做fs.read的每次读取buffer的大小
    let howMuchToRead = this.end
      ? Math.min(this.end - this.offset + 1, this.highWaterMark)
      : this.highWaterMark;
  //定义一个用户 传进来的highWaterMark 大小的buffer对象
    const buffer = Buffer.alloc(this.highWaterMark);
       //读取文件中的内容fd给buffer 从0位置开始,每次读取howMuchToRead个。插入数据,同时更新偏移量
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead,
      this.offset,
      (err, bytesRead) => {
        if (bytesRead) {
          // 每读完一次,偏移量=已经读到的数量
          this.offset += bytesRead;
          this.emit("data", buffer.slice(0, bytesRead));
          //写到这里实例上的data 已经可以打印出数据了 但是 继续读取 调用this.read() 直到bytesRead不存在 说明数据读取完毕了 走else
          //回调 this.read();时候判断 this.flowing 是否为true
          //pause调用后this.flowing将为false
          if (this.flowing) {
            this.read();
          }
        } else {
          // 执行到这 bytesRead不存在说明  文件数据读取完毕了已经 触发end
          this.emit("end");//emit 实例上绑定的end事件
          //destroy 还没写到 稍等 马上后面就实现...
          this.destroy();
        }
      }
    );
Copier après la connexion

resume()

文件读取不去data事件,会触发对应的回调,不停的触发 所以想要变可控可以手动调用 resume()& pause()

  • pause的实现,调用的时候设置 this.flowing=false,打断 read()
  pause() {
    this.flowing = false;
  }
Copier après la connexion

pause()

  • pause 打断 read()多次读取,可以使用resume 打开 this.flowing=true 并调用read
resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }
Copier après la connexion

destroy()

  • 文件open不成功时候抛错时调用
  • 文件读取完毕后&&this.autoClose===true ,read()里文件读取end的时候 就执行close
  destroy(err) {
    if (err) {
      this.emit("error");
    }
    // 把close放destroy里 并 在read里调用
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
Copier après la connexion

完整代码

  • 实现代码
/**
 *实现简单的可读流
 */

const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    //参考fs 写实例需要用到的参数
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding - options.encoding || null;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.fd = undefined;
    this.offset = this.start;
    this.flowing = false;
    this.open(); 
    this.on("newListener", function (type) {
      if (type === "data") {
        this.flowing = true;
        this.read();
      }
    });
  }
  destroy(err) {
    if (err) {
      this.emit("error");
    }
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        return this.destroy(err);
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }
  resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }
  pause() {
    this.flowing = false;
  }

  read() {
    if (typeof this.fd !== "number") {
      return this.once("open", () => this.read());
    }
    let howMuchToRead = this.end
      ? Math.min(this.end - this.offset + 1, this.highWaterMark)
      : this.highWaterMark;
    const buffer = Buffer.alloc(this.highWaterMark);
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead,
      this.offset,
      (err, bytesRead) => {
        if (bytesRead) {
          this.offset += bytesRead;
          this.emit("data", buffer.slice(0, bytesRead));
          if (this.flowing) {
            this.read();
          }
        } else {
          this.emit("end");
          this.destroy();
        }
      }
    );
  }
}

module.exports = ReadStream;
Copier après la connexion
  • 调用代码
const ReadStream = require("./initReadStream");
let rs = new ReadStream(aPath, {
  flags: "r",
  encoding: null, //默认编码格式是buffer
  autoClose: true, //相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close
  start: 0,
  highWaterMark: 3, //每次读取的个数 默认是64*1024个字节
});
Copier après la connexion

可写流的实现

待续...

更多编程相关知识,请访问:编程视频!!

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:juejin.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal