Maison > interface Web > js tutoriel > Explication détaillée de la bibliothèque d'exécution multithread Nexus.js en JavaScript (partie du code ci-joint)

Explication détaillée de la bibliothèque d'exécution multithread Nexus.js en JavaScript (partie du code ci-joint)

亚连
Libérer: 2018-05-18 17:49:14
original
1646 Les gens l'ont consulté

Cet article présente principalement l'expérience d'apprentissage et le partage de code de la bibliothèque d'exécution multithread JavaScript Nexus.js. Les amis dans le besoin peuvent s'y référer et apprendre ensemble.

Boucle d'événement

Aucune boucle d'événement

Possède un pool de threads avec un objet tâche (sans verrouillage)

Chaque fois que setTimeout ou setImmediate est appelé ou qu'une promesse est créée, la tâche est mise en file d'attente dans la file d'attente des tâches.

Chaque fois qu'une tâche est planifiée, le premier thread disponible sélectionnera la tâche et l'exécutera.

Gérez les promesses sur le cœur du processeur. Les appels à Promise.all() résoudront les promesses en parallèle.

ES6

prend en charge async/await, et il est recommandé d'utiliser les supports

pour wait(...)

Supporte la déstructuration

Supporte le try/catch/finally asynchrone

Module

Ne prend pas en charge CommonJS . (require(...) et module.exports)

Tous les modules utilisent la syntaxe d'importation/exportation d'ES6

Prend en charge l'importation dynamique via import('file-or-packge').then( . ..)

Prend en charge import.meta, tel que : import.meta.filename et import.meta.dirname, etc.

Fonctionnalités supplémentaires : prend en charge l'importation directement à partir d'une URL, telle que :

import { h } from 'https://unpkg.com/preact/dist/preact.esm.js';
Copier après la connexion

EventEmitter

Nexus implémente la classe EventEmitter basée sur Promise

Les gestionnaires d'événements sont ordonnés sur tous les threads et seront exécutés en parallèle.

La valeur de retour de EventEmitter.emit(...) est une promesse, qui peut être analysée dans un tableau de valeurs de retour dans le gestionnaire d'événements.

Par exemple :

class EmitterTest extends Nexus.EventEmitter {
 constructor() {
  super();
  for(let i = 0; i < 4; i++)
   this.on(&#39;test&#39;, value => { console.log(`fired test ${i}!`); console.inspect(value); });
  for(let i = 0; i < 4; i++)
   this.on(&#39;returns-a-value&#39;, v => `${v + i}`);
 }
}
const test = new EmitterTest();
async function start() {
 await test.emit(&#39;test&#39;, { payload: &#39;test 1&#39; });
 console.log(&#39;first test done!&#39;);
 await test.emit(&#39;test&#39;, { payload: &#39;test 2&#39; });
 console.log(&#39;second test done!&#39;);
 const values = await test.emit(&#39;returns-a-value&#39;, 10);
 console.log(&#39;third test done, returned values are:&#39;); console.inspect(values);
}
start().catch(console.error);
Copier après la connexion

E/S

Toutes les entrées/sorties se font via trois primitives : Périphérique , Filtrer et diffuser.

Toutes les primitives d'entrée/sortie implémentent la classe EventEmitter

Pour utiliser Device, vous devez créer un ReadableStream ou WritableStream au-dessus de Device

Pour manipuler les données, vous pouvez filtrer sont ajoutés à ReadableStream ou WritableStream.

Enfin, utilisez source.pipe(...destinationStreams) et attendez que source.resume() traite les données.

Toutes les opérations d'entrée/sortie sont effectuées à l'aide d'objets ArrayBuffer.

Filter a essayé la méthode process(buffer) pour traiter les données.

Exemple : Convertissez UTF-8 en UTF6 en utilisant 2 fichiers de sortie distincts.

const startTime = Date.now();
 try {
  const device = new Nexus.IO.FilePushDevice(&#39;enwik8&#39;);
  const stream = new Nexus.IO.ReadableStream(device);
  stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE"));
  const wstreams = [0,1,2,3]
   .map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice(&#39;enwik16-&#39; + i)));
  console.log(&#39;piping...&#39;);
  stream.pipe(...wstreams);
  console.log(&#39;streaming...&#39;);
  await stream.resume();
  await stream.close();
  await Promise.all(wstreams.map(stream => stream.close()));
  console.log(`finished in ${(Date.now() * startTime) / 1000} seconds!`);
 } catch (e) {
  console.error(&#39;An error occurred: &#39;, e);
 }
}
start().catch(console.error);
Copier après la connexion

TCP/UDP

Nexus.js fournit une classe Acceptor, qui est responsable de la liaison des adresses IP/ports et de la surveillance des connexions

Chaque fois qu'une demande de connexion est reçue, l'événement de connexion sera déclenché et un périphérique Socket sera fourni.

Chaque instance de Socket est un périphérique d'E/S en duplex intégral.

Vous pouvez utiliser ReadableStream et WritableStream pour faire fonctionner Socket.

L'exemple le plus basique : (Envoyer "Hello World" au client)

const acceptor = new Nexus.Net.TCP.Acceptor();
let count = 0;
acceptor.on(&#39;connection&#39;, (socket, endpoint) => {
 const connId = count++;
 console.log(`connection #${connId} from ${endpoint.address}:${endpoint.port}`);
 const rstream = new Nexus.IO.ReadableStream(socket);
 const wstream = new Nexus.IO.WritableStream(socket);
 const buffer = new Uint8Array(13);
 const message = &#39;Hello World!\n&#39;;
 for(let i = 0; i < 13; i++)
  buffer[i] = message.charCodeAt(i);
 rstream.pushFilter(new Nexus.IO.UTF8StringFilter());
 rstream.on(&#39;data&#39;, buffer => console.log(`got message: ${buffer}`));
 rstream.resume().catch(e => console.log(`client #${connId} at ${endpoint.address}:${endpoint.port} disconnected!`));
 console.log(`sending greeting to #${connId}!`);
 wstream.write(buffer);
});
acceptor.bind(&#39;127.0.0.1&#39;, 10000);
acceptor.listen();
console.log(&#39;server ready&#39;);
Copier après la connexion

Http

Nexus Fournit une classe Nexus.Net.HTTP.Server, qui hérite essentiellement de TCPAcceptor

Quelques interfaces de base

Lorsque le serveur termine l'analyse de l'en-tête Http de base de la connexion entrante/Lors de la vérification, le l'événement de connexion sera déclenché en utilisant la connexion et les mêmes informations

Chaque instance de connexion a une requête et un objet de réponse. Ce sont des périphériques d'entrée/sortie.

Vous pouvez construire ReadableStream et WritableStream pour manipuler la requête/réponse.

Si vous vous connectez à un objet Response via un tube, le flux d'entrée utilisera le mode de codage fragmenté. Sinon, vous pouvez utiliser Response.write() pour écrire une chaîne normale.

Exemple complexe : (serveur HTTP de base avec encodage par blocs, détails omis)

....
/**
 * Creates an input stream from a path.
 * @param path
 * @returns {Promise<ReadableStream>}
 */
async function createInputStream(path) {
 if (path.startsWith(&#39;/&#39;)) // If it starts with &#39;/&#39;, omit it.
  path = path.substr(1);
 if (path.startsWith(&#39;.&#39;)) // If it starts with &#39;.&#39;, reject it.
  throw new NotFoundError(path);
 if (path === &#39;/&#39; || !path) // If it&#39;s empty, set to index.html.
  path = &#39;index.html&#39;;
 /**
  * `import.meta.dirname` and `import.meta.filename` replace the old CommonJS `__dirname` and `__filename`.
  */
 const filePath = Nexus.FileSystem.join(import.meta.dirname, &#39;server_root&#39;, path);
 try {
  // Stat the target path.
  const {type} = await Nexus.FileSystem.stat(filePath);
  if (type === Nexus.FileSystem.FileType.Directory) // If it&#39;s a directory, return its &#39;index.html&#39;
   return createInputStream(Nexus.FileSystem.join(filePath, &#39;index.html&#39;));
  else if (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound)
   // If it&#39;s not found, throw NotFound.
   throw new NotFoundError(path);
 } catch(e) {
  if (e.code)
   throw e;
  throw new NotFoundError(path);
 }
 try {
  // First, we create a device.
  const fileDevice = new Nexus.IO.FilePushDevice(filePath);
  // Then we return a new ReadableStream created using our source device.
  return new Nexus.IO.ReadableStream(fileDevice);
 } catch(e) {
  throw new InternalServerError(e.message);
 }
}
/**
 * Connections counter.
 */
let connections = 0;
/**
 * Create a new HTTP server.
 * @type {Nexus.Net.HTTP.Server}
 */
const server = new Nexus.Net.HTTP.Server();
// A server error means an error occurred while the server was listening to connections.
// We can mostly ignore such errors, we display them anyway.
server.on(&#39;error&#39;, e => {
 console.error(FgRed + Bright + &#39;Server Error: &#39; + e.message + &#39;\n&#39; + e.stack, Reset);
});
/**
 * Listen to connections.
 */
server.on(&#39;connection&#39;, async (connection, peer) => {
 // Start with a connection ID of 0, increment with every new connection.
 const connId = connections++;
 // Record the start time for this connection.
 const startTime = Date.now();
 // Destructuring is supported, why not use it?
 const { request, response } = connection;
 // Parse the URL parts.
 const { path } = parseURL(request.url);
 // Here we&#39;ll store any errors that occur during the connection.
 const errors = [];
 // inStream is our ReadableStream file source, outStream is our response (device) wrapped in a WritableStream.
 let inStream, outStream;
 try {
  // Log the request.
  console.log(`> #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset);
  // Set the &#39;Server&#39; header.
  response.set(&#39;Server&#39;, `nexus.js/0.1.1`);
  // Create our input stream.
  inStream = await createInputStream(path);
  // Create our output stream.
  outStream = new Nexus.IO.WritableStream(response);
  // Hook all `error` events, add any errors to our `errors` array.
  inStream.on(&#39;error&#39;, e => { errors.push(e); });
  request.on(&#39;error&#39;, e => { errors.push(e); });
  response.on(&#39;error&#39;, e => { errors.push(e); });
  outStream.on(&#39;error&#39;, e => { errors.push(e); });
  // Set content type and request status.
  response
   .set(&#39;Content-Type&#39;, mimeType(path))
   .status(200);
  // Hook input to output(s).
  const disconnect = inStream.pipe(outStream);
  try {
   // Resume our file stream, this causes the stream to switch to HTTP chunked encoding.
   // This will return a promise that will only resolve after the last byte (HTTP chunk) is written.
   await inStream.resume();
  } catch (e) {
   // Capture any errors that happen during the streaming.
   errors.push(e);
  }
  // Disconnect all the callbacks created by `.pipe()`.
  return disconnect();
 } catch(e) {
  // If an error occurred, push it to the array.
  errors.push(e);
  // Set the content type, status, and write a basic message.
  response
   .set(&#39;Content-Type&#39;, &#39;text/plain&#39;)
   .status(e.code || 500)
   .send(e.message || &#39;An error has occurred.&#39;);
 } finally {
  // Close the streams manually. This is important because we may run out of file handles otherwise.
  if (inStream)
   await inStream.close();
  if (outStream)
   await outStream.close();
  // Close the connection, has no real effect with keep-alive connections.
  await connection.close();
  // Grab the response&#39;s status.
  let status = response.status();
  // Determine what colour to output to the terminal.
  const statusColors = {
   &#39;200&#39;: Bright + FgGreen, // Green for 200 (OK),
   &#39;404&#39;: Bright + FgYellow, // Yellow for 404 (Not Found)
   &#39;500&#39;: Bright + FgRed // Red for 500 (Internal Server Error)
  };
  let statusColor = statusColors[status];
  if (statusColor)
   status = statusColor + status + Reset;
  // Log the connection (and time to complete) to the console.
  console.log(`< #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}" ${status} ${(Date.now() * startTime)}ms` +
   (errors.length ? " " + FgRed + Bright + errors.map(error => error.message).join(&#39;, &#39;) + Reset : Reset));
 }
});
/**
 * IP and port to listen on.
 */
const ip = &#39;0.0.0.0&#39;, port = 3000;
/**
 * Whether or not to set the `reuse` flag. (optional, default=false)
 */
const portReuse = true;
/**
 * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific)
 * @type {number}
 */
const maxConcurrentConnections = 1000;
/**
 * Bind the selected address and port.
 */
server.bind(ip, port, portReuse);
/**
 * Start listening to requests.
 */
server.listen(maxConcurrentConnections);
/**
 * Happy streaming!
 */
console.log(FgGreen + `Nexus.js HTTP server listening at ${ip}:${port}` + Reset);
Copier après la connexion

Benchmarks

Je pense avoir couvert tout ce qui a été mis en œuvre jusqu'à présent. Parlons maintenant des performances.

Voici les benchmarks actuels du serveur HTTP Appeal, avec 100 connexions simultanées et un total de 10 000 requêtes :

This is ApacheBench, Version 2.3 <$Revision: 1796539 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient).....done
Server Software:    nexus.js/0.1.1
Server Hostname:    localhost
Server Port:      3000
Document Path:     /
Document Length:    8673 bytes
Concurrency Level:   100
Time taken for tests:  9.991 seconds
Complete requests:   10000
Failed requests:    0
Total transferred:   87880000 bytes
HTML transferred:    86730000 bytes
Requests per second:  1000.94 [#/sec] (mean)
Time per request:    99.906 [ms] (mean)
Time per request:    0.999 [ms] (mean, across all concurrent requests)
Transfer rate:     8590.14 [Kbytes/sec] received
Connection Times (ms)
       min mean[+/-sd] median  max
Connect:    0  0  0.1   0    1
Processing:   6  99 36.6   84   464
Waiting:    5  99 36.4   84   463
Total:     6 100 36.6   84   464
Percentage of the requests served within a certain time (ms)
 50%   84
 66%   97
 75%  105
 80%  112
 90%  134
 95%  188
 98%  233
 99%  238
 100%  464 (longest request)
Copier après la connexion

1000 requêtes par seconde. Sur un ancien i7, il exécute le logiciel de référence, un IDE qui occupe 5 Go de mémoire, et le serveur lui-même.

voodooattack@voodooattack:~$ cat /proc/cpuinfo 
processor  : 0
vendor_id  : GenuineIntel
cpu family : 6
model    : 60
model name : Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
stepping  : 3
microcode  : 0x22
cpu MHz   : 3392.093
cache size : 8192 KB
physical id : 0
siblings  : 8
core id   : 0
cpu cores  : 4
apicid   : 0
initial apicid : 0
fpu   : yes
fpu_exception  : yes
cpuid level : 13
wp   : yes
flags    : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts
bugs    :
bogomips  : 6784.18
clflush size  : 64
cache_alignment : 64
address sizes  : 39 bits physical, 48 bits virtual
power management:
Copier après la connexion

J'ai essayé 1 000 requêtes simultanées mais APacheBench expire en raison de l'ouverture de nombreux sockets. J'ai essayé httperf et voici les résultats :

voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --rate=1000
httperf --client=0/1 --server=localhost --port=3000 --uri=/ --rate=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-calls=1
httperf: warning: open file limit > FD_SETSIZE; limiting max. # of open files to FD_SETSIZE
Maximum connect burst length: 262
Total: connections 9779 requests 9779 replies 9779 test-duration 10.029 s
Connection rate: 975.1 conn/s (1.0 ms/conn, <=1022 concurrent connections)
Connection time [ms]: min 0.5 avg 337.9 max 7191.8 median 79.5 stddev 848.1
Connection time [ms]: connect 207.3
Connection length [replies/conn]: 1.000
Request rate: 975.1 req/s (1.0 ms/req)
Request size [B]: 62.0
Reply rate [replies/s]: min 903.5 avg 974.6 max 1045.7 stddev 100.5 (2 samples)
Reply time [ms]: response 129.5 transfer 1.1
Reply size [B]: header 89.0 content 8660.0 footer 2.0 (total 8751.0)
Reply status: 1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0
CPU time [s]: user 0.35 system 9.67 (user 3.5% system 96.4% total 99.9%)
Net I/O: 8389.9 KB/s (68.7*10^6 bps)
Errors: total 221 client-timo 0 socket-timo 0 connrefused 0 connreset 0
Errors: fd-unavail 221 addrunavail 0 ftab-full 0 other 0
Copier après la connexion

Comme vous pouvez le constater, cela fonctionne toujours. Bien que certaines connexions expirent en raison du stress. Nous recherchons toujours la cause de ce problème.

Ce qui précède est ce que j'ai compilé pour vous. J'espère que cela vous sera utile à l'avenir.

Articles connexes :

Résumé de la méthode de traversée de l'arbre DOM de l'opération JS

Implémentation JS de l'algorithme de déduplication de tableau

JS pour obtenir le plus petit commun multiple et le plus grand commun diviseur

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal