In diesem Artikel wird hauptsächlich die Lernerfahrung und Codefreigabe der JavaScript-Multithread-Laufzeitbibliothek Nexus.js vorgestellt. Freunde in Not können darauf zurückgreifen und gemeinsam lernen.
Ereignisschleife
Keine Ereignisschleife
Verfügt über einen Thread-Pool mit einem (sperrlosen) Aufgabenobjekt
Jedes Mal, wenn setTimeout oder setImmediate aufgerufen oder ein Promise erstellt wird, wird die Aufgabe in die Aufgabenwarteschlange eingereiht.
Immer wenn eine Aufgabe geplant ist, wählt der erste verfügbare Thread die Aufgabe aus und führt sie aus.
Behandeln Sie Promises auf dem CPU-Kern. Aufrufe von Promise.all() lösen Promises parallel auf.
ES6
unterstützt Async/Await, und es wird empfohlen,
Unterstützungen für Wait(...) zu verwenden.
Unterstützt Destrukturierung
Unterstützt asynchrones Try/Catch/finally
Modul
Unterstützt CommonJS nicht . (require(...) und module.exports)
Alle Module verwenden die Import-/Exportsyntax von ES6
Unterstützt den dynamischen Import über import('file-or-packge').then( . ..)
Unterstützt import.meta, wie zum Beispiel: import.meta.filename und import.meta.dirname usw.
Zusätzliche Funktionen: Unterstützt den direkten Import von URL, wie zum Beispiel:
import { h } from 'https://unpkg.com/preact/dist/preact.esm.js';
EventEmitter
Nexus implementiert die Promise-basierte EventEmitter-Klasse
Event-Handler sind in allen Threads angeordnet und werden parallel ausgeführt.
Der Rückgabewert von EventEmitter.emit(...) ist ein Promise, das im Event-Handler in ein Array von Rückgabewerten analysiert werden kann.
Zum Beispiel:
class EmitterTest extends Nexus.EventEmitter { constructor() { super(); for(let i = 0; i < 4; i++) this.on('test', value => { console.log(`fired test ${i}!`); console.inspect(value); }); for(let i = 0; i < 4; i++) this.on('returns-a-value', v => `${v + i}`); } } const test = new EmitterTest(); async function start() { await test.emit('test', { payload: 'test 1' }); console.log('first test done!'); await test.emit('test', { payload: 'test 2' }); console.log('second test done!'); const values = await test.emit('returns-a-value', 10); console.log('third test done, returned values are:'); console.inspect(values); } start().catch(console.error);
I/O
Alle Eingaben/Ausgaben erfolgen über drei Grundelemente: Gerät, Filter und Stream.
Alle Eingabe-/Ausgabeprimitive implementieren die EventEmitter-Klasse
Um Device zu verwenden, müssen Sie einen ReadableStream oder WritableStream auf Device erstellen
Um Daten zu manipulieren, können Sie Filter verwenden werden zu ReadableStream oder WritableStream hinzugefügt.
Verwenden Sie abschließend source.pipe(...destinationStreams) und warten Sie, bis source.resume() die Daten verarbeitet.
Alle Eingabe-/Ausgabevorgänge werden mithilfe von ArrayBuffer-Objekten durchgeführt.
Filter hat versucht, die Daten mit der Methode „process(buffer)“ zu verarbeiten.
Beispiel: Konvertieren Sie UTF-8 in UTF6 unter Verwendung von 2 separaten Ausgabedateien.
const startTime = Date.now(); try { const device = new Nexus.IO.FilePushDevice('enwik8'); const stream = new Nexus.IO.ReadableStream(device); stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE")); const wstreams = [0,1,2,3] .map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice('enwik16-' + i))); console.log('piping...'); stream.pipe(...wstreams); console.log('streaming...'); await stream.resume(); await stream.close(); await Promise.all(wstreams.map(stream => stream.close())); console.log(`finished in ${(Date.now() * startTime) / 1000} seconds!`); } catch (e) { console.error('An error occurred: ', e); } } start().catch(console.error);
TCP/UDP
Nexus.js stellt eine Acceptor-Klasse bereit, die für die Bindung von IP-Adressen/Ports und die Überwachung von Verbindungen verantwortlich ist
Jedes Mal, wenn eine Verbindungsanfrage empfangen wird, wird das Verbindungsereignis ausgelöst und ein Socket-Gerät bereitgestellt.
Jede Socket-Instanz ist ein Vollduplex-E/A-Gerät.
Sie können ReadableStream und WritableStream verwenden, um Socket zu betreiben.
Das einfachste Beispiel: (Senden Sie „Hello World“ an den Kunden)
const acceptor = new Nexus.Net.TCP.Acceptor(); let count = 0; acceptor.on('connection', (socket, endpoint) => { const connId = count++; console.log(`connection #${connId} from ${endpoint.address}:${endpoint.port}`); const rstream = new Nexus.IO.ReadableStream(socket); const wstream = new Nexus.IO.WritableStream(socket); const buffer = new Uint8Array(13); const message = 'Hello World!\n'; for(let i = 0; i < 13; i++) buffer[i] = message.charCodeAt(i); rstream.pushFilter(new Nexus.IO.UTF8StringFilter()); rstream.on('data', buffer => console.log(`got message: ${buffer}`)); rstream.resume().catch(e => console.log(`client #${connId} at ${endpoint.address}:${endpoint.port} disconnected!`)); console.log(`sending greeting to #${connId}!`); wstream.write(buffer); }); acceptor.bind('127.0.0.1', 10000); acceptor.listen(); console.log('server ready');
Http
Nexus bietet A Nexus.Net.HTTP.Server-Klasse, die im Wesentlichen einige grundlegende Schnittstellen von TCPAcceptor erbt
Wenn der Server die Analyse/Korrektur des grundlegenden HTTP-Headers der eingehenden Verbindung abschließt. Bei der Überprüfung wird die Das Verbindungsereignis wird mithilfe der Verbindung und derselben Informationen ausgelöst.
Jede Verbindungsinstanz verfügt über ein Anforderungs- und ein Antwortobjekt. Dies sind Ein-/Ausgabegeräte.
Sie können ReadableStream und WritableStream erstellen, um Anfrage/Antwort zu manipulieren.
Wenn Sie über eine Pipe eine Verbindung zu einem Antwortobjekt herstellen, verwendet der Eingabestream den Chunked-Codierungsmodus. Andernfalls können Sie mit „response.write()“ einen regulären String schreiben.
Komplexes Beispiel: (einfacher HTTP-Server mit Chunk-Kodierung, Details weggelassen)
.... /** * Creates an input stream from a path. * @param path * @returns {Promise<ReadableStream>} */ async function createInputStream(path) { if (path.startsWith('/')) // If it starts with '/', omit it. path = path.substr(1); if (path.startsWith('.')) // If it starts with '.', reject it. throw new NotFoundError(path); if (path === '/' || !path) // If it's empty, set to index.html. path = 'index.html'; /** * `import.meta.dirname` and `import.meta.filename` replace the old CommonJS `__dirname` and `__filename`. */ const filePath = Nexus.FileSystem.join(import.meta.dirname, 'server_root', path); try { // Stat the target path. const {type} = await Nexus.FileSystem.stat(filePath); if (type === Nexus.FileSystem.FileType.Directory) // If it's a directory, return its 'index.html' return createInputStream(Nexus.FileSystem.join(filePath, 'index.html')); else if (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound) // If it's not found, throw NotFound. throw new NotFoundError(path); } catch(e) { if (e.code) throw e; throw new NotFoundError(path); } try { // First, we create a device. const fileDevice = new Nexus.IO.FilePushDevice(filePath); // Then we return a new ReadableStream created using our source device. return new Nexus.IO.ReadableStream(fileDevice); } catch(e) { throw new InternalServerError(e.message); } } /** * Connections counter. */ let connections = 0; /** * Create a new HTTP server. * @type {Nexus.Net.HTTP.Server} */ const server = new Nexus.Net.HTTP.Server(); // A server error means an error occurred while the server was listening to connections. // We can mostly ignore such errors, we display them anyway. server.on('error', e => { console.error(FgRed + Bright + 'Server Error: ' + e.message + '\n' + e.stack, Reset); }); /** * Listen to connections. */ server.on('connection', async (connection, peer) => { // Start with a connection ID of 0, increment with every new connection. const connId = connections++; // Record the start time for this connection. const startTime = Date.now(); // Destructuring is supported, why not use it? const { request, response } = connection; // Parse the URL parts. const { path } = parseURL(request.url); // Here we'll store any errors that occur during the connection. const errors = []; // inStream is our ReadableStream file source, outStream is our response (device) wrapped in a WritableStream. let inStream, outStream; try { // Log the request. console.log(`> #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${ FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset); // Set the 'Server' header. response.set('Server', `nexus.js/0.1.1`); // Create our input stream. inStream = await createInputStream(path); // Create our output stream. outStream = new Nexus.IO.WritableStream(response); // Hook all `error` events, add any errors to our `errors` array. inStream.on('error', e => { errors.push(e); }); request.on('error', e => { errors.push(e); }); response.on('error', e => { errors.push(e); }); outStream.on('error', e => { errors.push(e); }); // Set content type and request status. response .set('Content-Type', mimeType(path)) .status(200); // Hook input to output(s). const disconnect = inStream.pipe(outStream); try { // Resume our file stream, this causes the stream to switch to HTTP chunked encoding. // This will return a promise that will only resolve after the last byte (HTTP chunk) is written. await inStream.resume(); } catch (e) { // Capture any errors that happen during the streaming. errors.push(e); } // Disconnect all the callbacks created by `.pipe()`. return disconnect(); } catch(e) { // If an error occurred, push it to the array. errors.push(e); // Set the content type, status, and write a basic message. response .set('Content-Type', 'text/plain') .status(e.code || 500) .send(e.message || 'An error has occurred.'); } finally { // Close the streams manually. This is important because we may run out of file handles otherwise. if (inStream) await inStream.close(); if (outStream) await outStream.close(); // Close the connection, has no real effect with keep-alive connections. await connection.close(); // Grab the response's status. let status = response.status(); // Determine what colour to output to the terminal. const statusColors = { '200': Bright + FgGreen, // Green for 200 (OK), '404': Bright + FgYellow, // Yellow for 404 (Not Found) '500': Bright + FgRed // Red for 500 (Internal Server Error) }; let statusColor = statusColors[status]; if (statusColor) status = statusColor + status + Reset; // Log the connection (and time to complete) to the console. console.log(`< #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${ FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}" ${status} ${(Date.now() * startTime)}ms` + (errors.length ? " " + FgRed + Bright + errors.map(error => error.message).join(', ') + Reset : Reset)); } }); /** * IP and port to listen on. */ const ip = '0.0.0.0', port = 3000; /** * Whether or not to set the `reuse` flag. (optional, default=false) */ const portReuse = true; /** * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific) * @type {number} */ const maxConcurrentConnections = 1000; /** * Bind the selected address and port. */ server.bind(ip, port, portReuse); /** * Start listening to requests. */ server.listen(maxConcurrentConnections); /** * Happy streaming! */ console.log(FgGreen + `Nexus.js HTTP server listening at ${ip}:${port}` + Reset);
Benchmarks
Ich glaube, ich habe alles abgedeckt, was ich bisher implementiert habe. Lassen Sie uns nun über die Leistung sprechen.
Hier sind die aktuellen Benchmarks für den Appeal-HTTP-Server mit 100 gleichzeitigen Verbindungen und insgesamt 10.000 Anfragen:
This is ApacheBench, Version 2.3 <$Revision: 1796539 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/ Benchmarking localhost (be patient).....done Server Software: nexus.js/0.1.1 Server Hostname: localhost Server Port: 3000 Document Path: / Document Length: 8673 bytes Concurrency Level: 100 Time taken for tests: 9.991 seconds Complete requests: 10000 Failed requests: 0 Total transferred: 87880000 bytes HTML transferred: 86730000 bytes Requests per second: 1000.94 [#/sec] (mean) Time per request: 99.906 [ms] (mean) Time per request: 0.999 [ms] (mean, across all concurrent requests) Transfer rate: 8590.14 [Kbytes/sec] received Connection Times (ms) min mean[+/-sd] median max Connect: 0 0 0.1 0 1 Processing: 6 99 36.6 84 464 Waiting: 5 99 36.4 84 463 Total: 6 100 36.6 84 464 Percentage of the requests served within a certain time (ms) 50% 84 66% 97 75% 105 80% 112 90% 134 95% 188 98% 233 99% 238 100% 464 (longest request)
1000 Anfragen pro Sekunde. Auf einem alten i7 läuft die Benchmark-Software, eine IDE, die 5 GB Speicher beansprucht, und der Server selbst.
voodooattack@voodooattack:~$ cat /proc/cpuinfo processor : 0 vendor_id : GenuineIntel cpu family : 6 model : 60 model name : Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz stepping : 3 microcode : 0x22 cpu MHz : 3392.093 cache size : 8192 KB physical id : 0 siblings : 8 core id : 0 cpu cores : 4 apicid : 0 initial apicid : 0 fpu : yes fpu_exception : yes cpuid level : 13 wp : yes flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts bugs : bogomips : 6784.18 clflush size : 64 cache_alignment : 64 address sizes : 39 bits physical, 48 bits virtual power management:
Ich habe 1000 gleichzeitige Anfragen ausprobiert, aber bei APacheBench kommt es zu einer Zeitüberschreitung, da viele Sockets geöffnet sind. Ich habe httperf ausprobiert und hier sind die Ergebnisse:
voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --rate=1000 httperf --client=0/1 --server=localhost --port=3000 --uri=/ --rate=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-calls=1 httperf: warning: open file limit > FD_SETSIZE; limiting max. # of open files to FD_SETSIZE Maximum connect burst length: 262 Total: connections 9779 requests 9779 replies 9779 test-duration 10.029 s Connection rate: 975.1 conn/s (1.0 ms/conn, <=1022 concurrent connections) Connection time [ms]: min 0.5 avg 337.9 max 7191.8 median 79.5 stddev 848.1 Connection time [ms]: connect 207.3 Connection length [replies/conn]: 1.000 Request rate: 975.1 req/s (1.0 ms/req) Request size [B]: 62.0 Reply rate [replies/s]: min 903.5 avg 974.6 max 1045.7 stddev 100.5 (2 samples) Reply time [ms]: response 129.5 transfer 1.1 Reply size [B]: header 89.0 content 8660.0 footer 2.0 (total 8751.0) Reply status: 1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0 CPU time [s]: user 0.35 system 9.67 (user 3.5% system 96.4% total 99.9%) Net I/O: 8389.9 KB/s (68.7*10^6 bps) Errors: total 221 client-timo 0 socket-timo 0 connrefused 0 connreset 0 Errors: fd-unavail 221 addrunavail 0 ftab-full 0 other 0
Wie Sie sehen können, funktioniert es immer noch. Allerdings kommt es bei einigen Verbindungen aufgrund von Stress zu Zeitausfällen. Wir untersuchen immer noch die Ursache dieses Problems.
Das Obige habe ich für Sie zusammengestellt und hoffe, dass es Ihnen in Zukunft hilfreich sein wird.
Verwandte Artikel:
Zusammenfassung der JS-Operation DOM-Baum-Traversal-Methode
JS-Implementierung des Array-Deduplizierungsalgorithmus
JS, um das kleinste gemeinsame Vielfache und den größten gemeinsamen Teiler zu erhalten
Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung der Multithread-Laufzeitbibliothek Nexus.js in JavaScript (Teil des angehängten Codes). Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!