Heim > Web-Frontend > js-Tutorial > Hauptteil

Erfahren Sie in einem Artikel mehr über das Multiprozessmodell in Node.js

青灯夜游
Freigeben: 2021-10-20 10:00:26
nach vorne
1671 Leute haben es durchsucht

Dieser Artikel stellt Ihnen den Multiprozess in Node.js vor und erfährt mehr über das Cluster-Modul und das Egg.js-Multiprozessmodell. Ich hoffe, er wird Ihnen hilfreich sein!

Erfahren Sie in einem Artikel mehr über das Multiprozessmodell in Node.js

Wie wir alle wissen, wird JS in einem einzelnen Thread ausgeführt und die gesamte Asynchronisierung wird durch die Ereignisschleife abgeschlossen. Wenn ein Webdienst nur einen Thread hat, wie können die inaktiven Ressourcen der Maschine vollständig genutzt werden? oder Behälter? Wenn der Code abstürzt und keine Ausnahme abgefangen wird, wird der Thread gleichzeitig beendet. Wie stellt der auf Node.js basierende Webdienst die Robustheit der gesamten Anwendung sicher?

Cluster-Modul

Node.js bietet das Cluster-Modul zur Lösung der oben genannten Probleme. Mit diesem Modul können Entwickler einen Cluster erstellen, um die Ressourcen der Maschine oder des Containers vollständig zu nutzen Gleichzeitig ermöglicht dieses Modul mehrere Unterprozesse, die denselben Port abhören. [Empfohlenes Lernen: „nodejs-Tutorial“]

Beispiel

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on(&#39;exit&#39;, function(worker, code, signal) {
    console.log(&#39;worker &#39; + worker.process.pid + &#39; died&#39;);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}
Nach dem Login kopieren

Der Prozess der Erstellung eines untergeordneten Prozesses durch Code-Analyse

Beginnen Sie zunächst mit const cluster = require('cluster')</ Code> Ab , importiert diese Codezeile das Cluster-Modul von Node, aber innerhalb von Node sind die vom Master-Prozess und vom Worker-Prozess eingeführten Dateien unterschiedlich. Weitere Informationen finden Sie im folgenden Code: <code>const cluster = require(&#39;cluster&#39;) 说起,这行代码导入了 Node 的 Cluster 模块,而在 Node 内部,Master 进程与 Worker 进程引入的文件却不一样,详情见如下代码:

&#39;use strict&#39;;

const childOrPrimary = &#39;NODE_UNIQUE_ID&#39; in process.env ? &#39;child&#39; : &#39;master&#39;;
module.exports = require(`internal/cluster/${childOrPrimary}`);
Nach dem Login kopieren

不同的文件意味着两种进程在执行中的表现也不一样,例如:

// internal/cluster/master.js
cluster.isWorker = false;
cluster.isMaster = true;

// internal/cluster/child.js
cluster.isWorker = true;
cluster.isMaster = false;
Nach dem Login kopieren

这也是为什么 Cluster 模块到处的变量能区分不同类型进程的原因,接下来让我们分别从主、子进程两个方向去了解具体的过程

主进程

在上述代码里,Master 进程并没有做太多事情,只是根据 CPU 数量去 fork 子进程,那么我们深入到源代码里大致来看一下,相关描述均在代码的注释内

// lib/internal/cluster/master.js

// 初始化cluster
const cluster = new EventEmitter();
// 创建监听地址与server对应的map
const handles = new SafeMap();
// 初始化
cluster.isWorker = false;
cluster.isMaster = true;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.

// 自增的子进程id
let ids = 0;

// 向cluster添加fork方法
cluster.fork = function(env) {
  // 初始化cluster.settings
  cluster.setupMaster();
  // 为当前fork的子进程生成当前cluster内的唯一id
  const id = ++ids;
  // 创建子进程
  const workerProcess = createWorkerProcess(id, env);
  // 创建对应的worker实例
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  
  // 省略一些worker的事件监听....

  // 监听内部消息事件,并交由onmessage处理
  worker.process.on(&#39;internalMessage&#39;, internal(worker, onmessage));
  // cluster发出fork事件
  process.nextTick(emitForkNT, worker);
  // 将worker实例放在cluster.workers中维护
  cluster.workers[worker.id] = worker;
  // 返回worker
  return worker;
};

// 创建子进程函数
function createWorkerProcess(id, env) {
  // 将主进程的env、调用cluster.fork时传入的env以及NODE_UNIQUE_ID env构建成一个env对象
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // 执行参数
  const execArgv = [...cluster.settings.execArgv];

  // 省略debug模式相关逻辑...

  // 调用child_process模块的fork函数创建子进程并返回,至此子进程实例创建完成
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

// 内部消息事件处理函数
function onmessage(message, handle) {
  const worker = this;

  if (message.act === &#39;online&#39;)
    online(worker);
  // 当子进程向主进程发出queryServer消息后,执行queryServer函数,创建server
  else if (message.act === &#39;queryServer&#39;)
    queryServer(worker, message);
  else if (message.act === &#39;listening&#39;)
    listening(worker, message);
  else if (message.act === &#39;exitedAfterDisconnect&#39;)
    exitedAfterDisconnect(worker, message);
  else if (message.act === &#39;close&#39;)
    close(worker, message);
}

// 获取server
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  // 创建当前子进程监听地址信息的key
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  // 在handles map中查询是否有已经创建好的该监听地址的server
  let handle = handles.get(key);

  // 没有对应的server则进行创建
  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === &#39;string&#39; &&
        process.platform !== &#39;win32&#39;) {

      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    // 主、子进程处理连接的方式,默认为轮询
    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it&#39;s connectionless. There is nothing to send to
    // the workers except raw datagrams and that&#39;s pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === &#39;udp4&#39; ||
        message.addressType === &#39;udp6&#39;) {
      constructor = SharedHandle;
    }

    // 将监听地址信息传入构造函数创建监听实例
    handle = new constructor(key, address, message);
    // 缓存监听实例
    handles.set(key, handle);
  }

  // 向server添加自定义信息,用于server发出listening事件后透传到worker
  if (!handle.data)
    handle.data = message.data;

  // 添加server发出listening事件后的回调函数通知子进程
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}
Nach dem Login kopieren
// lib/internal/cluster/round_robin_handle.js

// 构造函数,参数为server对应的key,ip地址(对于http(s)来说),监听相关信息
function RoundRobinHandle(key, address, { port, fd, flags }) {
  // 初始化handle
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);

  // 监听文件描述符,不讨论
  if (fd >= 0)
    this.server.listen({ fd });
  // 监听ip:port
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  // 监听UNIX socket,不讨论
  } else
    this.server.listen(address);  // UNIX socket path.

  // 注册server发出listening事件的回调函数
  this.server.once(&#39;listening&#39;, () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

// 添加worker,server发出listening事件后调用master.js中传入的回调函数
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  if (this.server === null)
    return done();

  // Still busy binding.
  this.server.once(&#39;listening&#39;, done);
  this.server.once(&#39;error&#39;, (err) => {
    send(err.errno, null);
  });
};

// 删除worker,轮询时不再分配给该worker
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  this.handle.close();
  this.handle = null;
  return true;
};

// 轮询调度函数
RoundRobinHandle.prototype.distribute = function(err, handle) {
  ArrayPrototypePush(this.handles, handle);
  const [ workerEntry ] = this.free; // this.free is a SafeMap

  if (ArrayIsArray(workerEntry)) {
    const { 0: workerId, 1: worker } = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};

// 将handle交给worker
RoundRobinHandle.prototype.handoff = function(worker) {
  if (!this.all.has(worker.id)) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = ArrayPrototypeShift(this.handles);

  if (handle === undefined) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }

  // 向该worker发出newconn事件
  const message = { act: &#39;newconn&#39;, key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};
Nach dem Login kopieren

子进程

在每个子进程中,我们都创建了一个 HTTP Server,然后执行 listen 函数监听 8000 端口,而 HTTP Server 实例是由 Net Server 原型链继承得到的,listen 函数即为 Net Server 原型上的 listen 函数,具体如下:

// lib/_http_server.js

function Server(options, requestListener) {
  ....
}

ObjectSetPrototypeOf(Server.prototype, net.Server.prototype);
ObjectSetPrototypeOf(Server, net.Server);
Nach dem Login kopieren
// lib/net.js

Server.prototype.listen = function(...args) {

  // 由于篇幅原因,省略一些参数nomolize和其他监听的处理
  
  // 经过这段逻辑中,会调用listenInCluster函数去真正的监听端口
  if (typeof options.port === &#39;number&#39; || typeof options.port === &#39;string&#39;) {
    validatePort(options.port, &#39;options.port&#39;);
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }
  
  // 省略...
};

// 集群监听函数
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require(&#39;cluster&#39;);

  // 判断是否是master,单进程中cluster.isMaster默认为true,然后进行监听并返回
  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  // 在子进程中,会将监听地址信息传入cluster实例中的_getServer函数从而获取一个faux handle
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master&#39;s server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  // 获取net server回调函数,拿到faux handle之后,调用_listen2函数,即setupListenHandle函数
  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, &#39;bind&#39;, address, port);
      return server.emit(&#39;error&#39;, ex);
    }

    // Reuse master&#39;s server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

// 启用监听handle
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug(&#39;setupListenHandle&#39;, address, port, addressType, backlog, fd);

  // 如同英文注释所说的那样,如果没有监听句柄,则创建,有监听句柄则跳过
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don&#39;t need to do this.
  if (this._handle) {
    debug(&#39;setupListenHandle: have a handle already&#39;);
  } else {
    debug(&#39;setupListenHandle: create a handle&#39;);

    let rval = null;
    
    // 篇幅原因,创建监听句柄的代码...
    
    this._handle = rval;
  }
  
  // 在this上设置的faux handle上设置onconnection函数用于监听连接进入
  this._handle.onconnection = onconnection;
}
Nach dem Login kopieren

同时,在开始解析的时候我们说过,在引入 Cluster 模块的时候,会根据当前进程的env中是否包含NODE_UNIQUE_ID去判断是否为子进程,若为子进程,则执行 child.js 文件

Tips:IPC 通信中发送的message.cmd的值如果以NODE为前缀,它将响应一个内部事件internalMessage

// lib/internal/cluster/child.js

// 初始化
const cluster = new EventEmitter();
// 存储生成的 faux handle
const handles = new SafeMap();
// 存储监听地址与监听地址index的对应关系
const indexes = new SafeMap();
cluster.isWorker = true;
cluster.isMaster = false;
cluster.worker = null;
cluster.Worker = Worker;

// 子进程启动时会执行该函数,进行初始化,同时在执行完毕后,会删除 env 中的 NODE_UNIQUE_ID 环境变量
// 详细代码见 lib/internal/bootstrap/pre_excution.js 中的 initializeClusterIPC 函数
cluster._setupWorker = function() {
  // 初始化worker实例
  const worker = new Worker({
    id: +process.env.NODE_UNIQUE_ID | 0,
    process: process,
    state: &#39;online&#39;
  });

  cluster.worker = worker;

  // 处理断开连接事件
  process.once(&#39;disconnect&#39;, () => {
    worker.emit(&#39;disconnect&#39;);

    if (!worker.exitedAfterDisconnect) {
      // Unexpected disconnect, master exited, or some such nastiness, so
      // worker exits immediately.
      process.exit(0);
    }
  });

  // IPC 内部通信事件监听
  process.on(&#39;internalMessage&#39;, internal(worker, onmessage));
  send({ act: &#39;online&#39; });

  function onmessage(message, handle) {
    // 如果为新连接,则执行 onconnection 函数将得到的句柄传入子进程中启动的HTTP Server
    if (message.act === &#39;newconn&#39;)
      onconnection(message, handle);
    else if (message.act === &#39;disconnect&#39;)
      ReflectApply(_disconnect, worker, [true]);
  }
};

// 添加获取server函数,会在net server监听端口时被执行
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === &#39;string&#39; &&
      process.platform !== &#39;win32&#39;)
    address = path.resolve(address);

  // 生成地址信息的的key
  const indexesKey = ArrayPrototypeJoin(
    [
      address,
      options.port,
      options.addressType,
      options.fd,
    ], &#39;:&#39;);

  // 检查是否缓存了indexedKey,如果没有,则表明是新的监听地址,在 master.js 中会生成新的net server
  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  // 设置 indexesKey 与 index的对应关系
  indexes.set(indexesKey, index);

  // 传递地址信息及index
  const message = {
    act: &#39;queryServer&#39;,
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  // 向主进程发送queryServer消息
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === &#39;function&#39;)
      obj._setServerData(reply.data);

    // 根据相应负载均衡handle添加worker时的处理,执行相应的负载均衡代码,并执行 cb 函数
    // 轮询是没有传递handle的,对应代码在 RoundRobinHandle.prototype.add 内
    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  obj.once(&#39;listening&#39;, () => {
    cluster.worker.state = &#39;listening&#39;;
    const address = obj.address();
    message.act = &#39;listening&#39;;
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

// 创建 faux handle,并保存其对应关系
// Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: &#39;close&#39;, key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // 创建Faux handle
  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it&#39;s backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  // 保存faux handle
  handles.set(key, handle);
  // 执行 net 模块调用 cluster._getServer 函数传进来的回调函数
  cb(0, handle);
}

// 处理请求
// Round-robin connection.
function onconnection(message, handle) {
  // 获取faux handle的key
  const key = message.key;
  // 获取faux hadle
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  // 调用在 net 模块中 setupListenHandle 函数里为该 faux handle 设置的连接处理函数处理请求
  if (accepted)
    server.onconnection(0, handle);
}
Nach dem Login kopieren

至此,所有的内容都联系起来了。

为什么多个子进程可以监听同一个端口

在之前的代码分析中我们可以知道,Cluster 集群会在 Master 进程中创建 Net Server,在 Worker 进程运行创建 HTTP Server 的时候,会将监听地址的信息传入 cluster._getServer 函数创建一个 faux handle 并设置到子进程的 Net Server 上,在 Worker 进程初始化的时候会注册 IPC 通信回调函数,在回调函数内 ,调用在子进程中 Net Server 模块初始化后的 {faux handle}.onconnection 函数,并将传过来的连接的 handle 传入完成请求响应。

如何保证集群工作的健壮性

我们可以在 Master 进程中监听 Worker 进程的 errordisconntectexit 事件,在这些事件中去做对应的处理,例如清理退出的进程并重新 fork,或者使用已经封装好的 npm 包,例如 cfork

Egg.js 多进程模型

在 Egg.js 的多进程模型中,多了另外一个进程类型,即 Agent 进程,该进程主要用于处理多进程不好处理的一些事情还有减少长链接的数量,具体关系如下:

+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|
Nach dem Login kopieren

egg-cluster 包内,使用了 cfork 包去保证 Worker 进程挂掉后自动重启

问题记录

在我们的一个 Egg 应用内,日志系统并没有使用 Egg 原生的日志,使用了一个内部基于 log4jsrrreee

Unterschiedliche Dateien bedeuten die Leistung der beiden Prozesse in der Ausführung sind auch unterschiedlich, zum Beispiel: 🎜rrreee🎜Deshalb können die Variablen überall im Cluster-Modul verschiedene Arten von Prozessen unterscheiden. Lassen Sie uns als Nächstes den spezifischen Prozess vom Haupt- bzw. Unterprozess verstehen.🎜

🎜Hauptprozess🎜

🎜Im obigen Code macht der Master-Prozess nicht viel. Er teilt den untergeordneten Prozess nur entsprechend der Anzahl der CPUs auf Gehen Sie in den Quellcode und werfen Sie einen groben Blick darauf. Relevante Beschreibungen finden Sie in den Kommentaren des Codes und dann ausführen Die Funktion listen überwacht Port 8000 und die HTTP-Server-Instanz wird von der Net Server-Prototypkette geerbt. Die Funktion listen ist die Funktion listenCode> Funktion auf dem Net Server-Prototyp. Die Details lauten wie folgt: 🎜rrreeerrreee🎜Zur gleichen Zeit, als wir mit dem Parsen begannen, sagten wir, dass bei der Einführung des Cluster-Moduls beurteilt wird, ob es sich um einen untergeordneten Prozess handelt ob die Umgebung des aktuellen Prozesses NODE_UNIQUE_ID enthält. Wenn es sich um einen untergeordneten Prozess handelt, <code>child.js-Datei🎜🎜🎜Tipps: Wenn dem in der IPC-Kommunikation gesendeten Wert von message.cmd das Präfix NODE vorangestellt ist, ist dies der Fall wird auf ein internes Ereignis reagieren internalMessage🎜🎜rrreee🎜An diesem Punkt sind alle Inhalte verbunden. 🎜

🎜Warum mehrere untergeordnete Prozesse denselben Port abhören können🎜🎜🎜In der vorherigen Codeanalyse können wir wissen, dass der Cluster-Cluster im Master-Prozess einen Net Server erstellt ein Net-Server im Worker-Prozess Bei der Ausführung zum Erstellen eines HTTP-Servers werden die Informationen zur Abhöradresse an die Funktion cluster._getServer übergeben, um ein Faux-Handle zu erstellen und festzulegen an den Net-Server des untergeordneten Prozesses. Während der Initialisierung wird die IPC-Kommunikations-Callback-Funktion nach der Net-Initialisierung in der Callback-Funktion registriert Das Servermodul im untergeordneten Prozess wird aufgerufen und das übergebene Verbindungshandle wird übergeben. Geben Sie die Antwort auf die Abschlussanforderung ein. 🎜

🎜So stellen Sie die Robustheit der Clusterarbeit sicher🎜🎜🎜Wir können den error und disconntect des Worker-Prozesses überwachen Führen Sie im Master-Prozess > exit-Ereignisse eine entsprechende Verarbeitung in diesen Ereignissen durch, z. B. das Bereinigen des beendeten Prozesses und einen erneuten fork oder die Verwendung gekapselter npm-Pakete, z. B. cfork

🎜

🎜Egg.js Multiprozessmodell🎜🎜🎜Im Multiprozessmodell von Egg.js gibt es einen weiteren Prozesstyp, den Agenten Dieser Prozess wird hauptsächlich verwendet, um einige Dinge zu verarbeiten, die mit mehreren Prozessen schwer zu handhaben sind, und um die Anzahl langer Links zu reduzieren. Die spezifische Beziehung ist wie folgt: 🎜rrreee🎜Im egg-cluster Paket, cfork Paket, um sicherzustellen, dass der Worker-Prozess automatisch neu gestartet wird, nachdem er hängen bleibt🎜<h3 data-id="heading-8">🎜Problemaufzeichnung🎜🎜🎜🎜In einer unserer Egg-Anwendungen Das Protokollsystem verwendet nicht die nativen Protokolle von Egg, sondern verwendet eine interne Protokollbibliothek, die auf dem Paket <code>log4js basiert. Bei der Verwendung wird der erforderliche Logger auf das Anwendungsobjekt erweitert, sodass jeder Worker-Prozess erstellt wird ein neuer Logger, das heißt, es treten Probleme auf, wenn mehrere Prozesse Protokolle schreiben, aber es gibt kein Fehlerproblem, wenn mehrere Prozesse Protokolle schreiben

Bei der Verfolgung des Quellcodes haben wir festgestellt, dass log4js zwar den Cluster-Modus bereitstellt, der Cluster-Modus von log4js jedoch im Paket der oberen Ebene nicht aktiviert ist der Appender jedes Loggers. Sie alle verwenden flag a, um einen Schreibstream zu öffnen, aber ich habe bisher noch keine Antwort erhaltenlog4js 虽然提供了 Cluster 模式,但是在上层封装中并没有开启 log4js 的 Cluster 模式,所以每个 Logger 的 appender 都使用 flag a 打开一个写入流,到这里并没有得到答案

后来在 CNode 中找到了答案,在 unix 下使用 flag a 打开的可写流对应的 libuv 文件池实现是 UV_FS_O_APPEND,即 O_APPEND,而 O_APPEND

Später CNode

gefunden. Die libuv-Dateipoolimplementierung, die dem beschreibbaren Stream entspricht, der mit flag a unter Unix geöffnet wurde, ist ​​UV_FS_O_APPEND, d. h. O_APPEND und O_APPEND selbst sind im Man-Handbuch als atomare Operation definiert. Der Kernel stellt das gleichzeitige Schreiben von sicher Dieser beschreibbare Stream ist sicher und erfordert keine zusätzlichen Sperren auf der Anwendungsebene (außer dass das gleichzeitige Schreiben auf NFS-ähnlichen Dateisystemen zum Verlust oder zur Beschädigung von Dateiinformationen führt). NFS-ähnliche netzwerkgebundene Dateisysteme basieren hauptsächlich auf der Simulation Die zugrunde liegende API ermöglicht die Erzielung lokaler Vorgänge, die unter Wettbewerbsbedingungen offensichtlich nicht ausgeführt werden können. Wenn Ihre Protokolle also lokal auf eine OSS-Cloud-Festplatte geschrieben werden, ist dies nicht möglich Wenn Sie in mehreren Prozessen schreiben, müssen Sie es manuell auf der Anwendungsebene sperren Weitere Kenntnisse zum Thema Programmierung finden Sie unter:

Einführung in die Programmierung

! !

Das obige ist der detaillierte Inhalt vonErfahren Sie in einem Artikel mehr über das Multiprozessmodell in Node.js. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:juejin.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage
Über uns Haftungsausschluss Sitemap
Chinesische PHP-Website:Online-PHP-Schulung für das Gemeinwohl,Helfen Sie PHP-Lernenden, sich schnell weiterzuentwickeln!