Datenströme mit der Web Streams API in Node.js navigieren
Ethan Miller
Product Engineer · Leapcell

Einleitung
In der heutigen datengesteuerten Welt ist die effiziente Handhabung großer Informationsmengen für den Aufbau leistungsfähiger und skalierbarer Anwendungen von größter Bedeutung. Herkömmliche Ansätze, die oft das Laden ganzer Datensätze in den Speicher vor der Verarbeitung beinhalten, werden schnell zu Engpässen, wenn mit Multigigabyte-Dateien, Echtzeit-Datenfeeds oder sequenziellen Operationen gearbeitet wird. Hier kommt die Leistung des Datenstreamings ins Spiel. Durch die Verarbeitung von Daten in kleineren Blöcken statt auf einmal können Anwendungen einen geringen Speicherbedarf aufrechterhalten, die Latenz reduzieren und die Reaktionsfähigkeit verbessern. Während Node.js seit langem sein eigenes stream
-Modul anbietet, hat das Aufkommen der Web Streams API, die ursprünglich für Browser entwickelt wurde, eine standardisierte und leistungsstarke Alternative für die konsistente Verarbeitung von Streaming-Daten über verschiedene JavaScript-Umgebungen hinweg gebracht. Dieser Artikel befasst sich damit, wie wir die Web Streams API effektiv in Node.js nutzen können, um eine hocheffiziente Datenverarbeitung zu erreichen.
Verständnis von Web Streams
Bevor wir uns mit den Implementierungsdetails befassen, lassen Sie uns einige Kernkonzepte der Web Streams API klären. Im Wesentlichen definiert die Web Streams API Schnittstellen, die die Erstellung, Zusammensetzung und den Verbrauch von Datenströmen ermöglichen. Die drei grundlegenden Streamtypen sind:
ReadableStream
: Repräsentiert eine Datenquelle, aus der Daten sequenziell gelesen werden können. Stellen Sie es sich wie einen Wasserhahnen vor, den Sie öffnen können, um einen kontinuierlichen Wasserfluss zu erhalten.WritableStream
: Repräsentiert ein Datenziel, in das Daten sequenziell geschrieben werden können. Dies ist wie ein Abfluss, in den Sie Wasser gießen können.TransformStream
: Agiert sowohl alsWritableStream
als auch alsReadableStream
. Es verarbeitet Daten, während sie durchfließen, und wandelt sie von einem Format in ein anderes um. Stellen Sie sich einen Filter vor, der Wasser reinigt, während es durchfließt.
Diese Streamtypen arbeiten mit "Chunks" von Daten, den Grundeinheiten der zu verarbeitenden Informationen. Ein wesentlicher Vorteil der Web Streams API ist ihre flüssige, Promise-basierte Benutzeroberfläche, die sich nahtlos in moderne asynchrone JavaScript-Muster, insbesondere async/await
, integriert.
Warum Web Streams in Node.js?
Obwohl Node.js über sein natives stream
-Modul verfügt, bietet die Web Streams API mehrere überzeugende Vorteile:
- Konsistenz: Bietet eine einheitliche API für die Datenübertragung in Browser- und Node.js-Umgebungen und vereinfacht die Entwicklung für Full-Stack-JavaScript-Entwickler.
- Komponierbarkeit: Entwickelt für einfaches Verketten und Umleiten von Streams, wodurch komplexe Datenverarbeitungspipelines mit minimalem Aufwand ermöglicht werden.
- Backpressure Handling: Integrierte Mechanismen zur Verwaltung von Backpressure, die sicherstellen, dass Produzenten keine Konsumenten überlasten, was für eine stabile und effiziente Leistung entscheidend ist.
- Async Iterators:
ReadableStream
s sind nativasync
iterierbar, was den eleganten Verbrauch mitfor await...of
-Schleifen ermöglicht.
Implementierung von Web Streams in Node.js
Lassen Sie uns mit praktischen Beispielen untersuchen, wie jeder Stream-Typ im Node.js-Kontext verwendet wird.
1. Erstellung eines ReadableStream
Ein ReadableStream
kann mit einem ReadableStreamDefaultController
konstruiert werden, der das Hinzufügen von Daten-Chunks ermöglicht.
import { ReadableStream } from 'node:stream/web'; // Import von 'node:stream/web' async function createNumberStream(limit) { let counter = 0; return new ReadableStream({ async start(controller) { console.log('ReadableStream gestartet'); while (counter < limit) { // Asynchrone Operation simulieren await new Promise(resolve => setTimeout(resolve, 50)); controller.enqueue(counter++); console.log(`Eingereiht: ${counter - 1}`); } controller.close(); console.log('ReadableStream geschlossen'); }, pull(controller) { // Diese Methode wird aufgerufen, wenn der Stream weitere Daten benötigt. // In diesem Beispiel fügen wir Daten proaktiv in `start` ein. //console.log('Pull angefordert'); }, cancel() { console.log('ReadableStream abgebrochen'); } }); } // Verbrauch des ReadableStream async function consumeStream() { const numberStream = await createNumberStream(5); console.log('--- Nummer-Stream verbrauchen ---'); for await (const chunk of numberStream) { console.log(`Empfangen: ${chunk}`); } console.log('--- Verbrauch abgeschlossen ---'); } consumeStream();
In diesem Beispiel generiert createNumberStream
eine Zahlenfolge. Die Methode start
ist der Ort, an dem sich unsere Datenproduktionslogik befindet, und wir verwenden controller.enqueue()
, um Daten einzufügen. Die Schleife for await...of
bietet eine saubere und idiomatische Möglichkeit, den Stream zu verbrauchen.
2. Erstellung eines WritableStream
Ein WritableStream
ermöglicht das Schreiben von Daten in ein Ziel.
import { WritableStream } from 'node:stream/web'; async function createLoggingWritableStream() { return new WritableStream({ async start(controller) { console.log('WritableStream gestartet'); }, async write(chunk, controller) { // Asynchrone Schreiboperation simulieren await new Promise(resolve => setTimeout(resolve, 20)); console.log(`Schreibe: ${chunk}`); }, async close() { console.log('WritableStream geschlossen'); }, async abort(reason) { console.error('WritableStream abgebrochen', reason); } }); } // Verwendung des WritableStream async function writeToStream() { const loggingStream = await createLoggingWritableStream(); const writer = loggingStream.getWriter(); console.log('--- Daten schreiben ---'); for (let i = 0; i < 5; i++) { await writer.write(`Daten-Chunk ${i}`); } await writer.close(); console.log('--- Schreiben abgeschlossen ---'); } writeToStream();
Hier protokolliert createLoggingWritableStream
einfach jeden empfangenen Chunk. Das von getWriter()
erhaltene Objekt writer
wird verwendet, um Daten in den WritableStream
zu schreiben.
3. Erstellung eines TransformStream
TransformStream
ist das Arbeitspferd für die Datenmodifikation in einer Pipeline.
import { ReadableStream, WritableStream, TransformStream } from 'node:stream/web'; // Ein TransformStream, um Zahlen in ihren quadrierten Wert umzuwandeln function createDoublerTransformStream() { return new TransformStream({ start(controller) { console.log('TransformStream gestartet'); }, transform(chunk, controller) { console.log(`Transformiere: ${chunk} -> ${chunk * 2}`); controller.enqueue(chunk * 2); }, flush(controller) { console.log('TransformStream geleert (alle Eingaben verarbeitet)'); } }); } // Streams verketten: Rohe Zahlen -> Verdoppelte Zahlen -> Protokolliert async function chainStreams() { const numberStream = new ReadableStream({ start(controller) { for (let i = 1; i <= 3; i++) { controller.enqueue(i); } controller.close(); } }); const doubledStream = createDoublerTransformStream(); const loggingStream = new WritableStream({ write(chunk) { console.log(`Endgültig empfangen: ${chunk}`); }, }); console.log('--- Streams verketten ---'); await numberStream .pipeThrough(doubledStream) // Durch den Transform-Stream leiten .pipeTo(loggingStream); // Zum Schreib-Stream leiten console.log('--- Verkettung abgeschlossen ---'); } chainStreams();
Dieses Beispiel demonstriert pipeThrough()
und pipeTo()
, wichtige Methoden zum Komponieren von Stream-Pipelines. pipeThrough()
verbindet einen TransformStream
, während pipeTo()
den endgültigen ReadableStream
mit einem WritableStream
verbindet.
Anwendungsfälle
Die Web Streams API in Node.js ist besonders leistungsfähig für:
- Dateiverarbeitung: Lesen und Schreiben großer Dateien, Chunk für Chunk, wie die Verarbeitung von CSVs, Logs oder Multigigabyte-Archiven, ohne den Speicher zu erschöpfen.
- Netzwerk-Proxys/Load Balancer: Effizientes Weiterleiten von Daten zwischen Client und Server, ohne die gesamte Anfrage/Antwort zu puffern.
- Echtzeit-Datenverarbeitung: Verarbeiten von eingehenden Daten von WebSockets oder Message Queues, Durchführen von Transformationen und anschließendes Weiterleiten an nachgelagerte Dienste.
- Datenkomprimierung/Verschlüsselung: Implementierung von Stream-basierter Komprimierung (z.B. Gzip) oder Verschlüsselung direkt in der Datenpipeline.
- API-Pipelining: Verketten mehrerer API-Aufrufe, bei denen die Ausgabe der einen als Eingabe für die nächste dient, während Daten gestreamt werden.
Betrachten Sie zum Beispiel ein Szenario, in dem Sie eine große CSV-Datei herunterladen, deren Inhalt filtern und dann die gefilterten Daten an einen anderen Dienst hochladen. Ein Stream-basierter Ansatz würde wie folgt aussehen:
import { ReadableStream, WritableStream, TransformStream } from 'node:stream/web'; import { createReadStream, createWriteStream } from 'node:fs'; import { pipeline } from 'node:stream/promises'; // Für native Node.js Streams // Beispiel: Zeilen aus einer großen Datei filtern class LineFilterTransform extends TransformStream { constructor(keyword) { let buffer = ''; super({ transform(chunk, controller) { buffer += new TextDecoder().decode(chunk); const lines = buffer.split('\n'); buffer = lines.pop(); // Behält die letzte (unvollständige) Zeile for (const line of lines) { if (line.includes(keyword)) { controller.enqueue(new TextEncoder().encode(line + '\n')); } } }, flush(controller) { if (buffer.length > 0 && buffer.includes(keyword)) { controller.enqueue(new TextEncoder().encode(buffer + '\n')); } } }); } } // Eine Dummy-Großdatei zur Demonstration erstellen // fs.writeFileSync('large_data.txt', Array(1000).fill('Hello world\nAnother line with Node.js\n').join('')); async function processLargeFile() { const inputFile = createReadStream('large_data.txt'); // Natives Node.js Readable Stream const outputFile = createWriteStream('filtered_data.txt'); // Natives Node.js Writable Stream const webReadable = ReadableStream.from(inputFile); // Nativ zu Web Stream konvertieren const webWritable = new WritableStream({ async write(chunk) { outputFile.write(chunk); }, close() { outputFile.end(); }, abort(reason) { outputFile.destroy(reason); } }); const filterStream = new LineFilterTransform('Node.js'); console.log('--- Große Datei verarbeiten ---'); await webReadable .pipeThrough(filterStream) .pipeTo(webWritable); console.log('--- Dateiverarbeitung abgeschlossen ---'); console.log('Gefilterte Daten wurden in filtered_data.txt geschrieben'); } // Stellen Sie sicher, dass 'large_data.txt' mit einigen Inhalten vorhanden ist, bevor Sie ausführen // Zum Testen können Sie es erstellen mit: // require('fs').writeFileSync('large_data.txt', Array(1000).fill('Hello world\nAnother line with Node.js\n').join('')); processLargeFile().catch(console.error);
Dieses Beispiel zeigt, wie native Node.js Streams mit Web Streams mithilfe von ReadableStream.from()
und einem benutzerdefinierten WritableStream
-Adapter überbrückt werden können, was leistungsstarke Dateiverarbeitungspipelines ermöglicht.
Fazit
Die Web Streams API bietet einen modernen, effizienten und standardisierten Ansatz zur Handhabung von Datenströmen in JavaScript und bringt erhebliche Vorteile für Node.js-Anwendungen. Durch die Nutzung von ReadableStream
, WritableStream
und TransformStream
können Entwickler robuste, speichereffiziente und hochgradig zusammensetzbare Datenpipelines erstellen, die den Informationsfluss elegant verwalten. Die Nutzung der Web Streams API eröffnet ein neues Niveau an Leistung und architektonischer Klarheit für die asynchrone Datenverarbeitung in Node.js.