Lesen Sie, wie Sie lesbare, beschreibbare und transformierte Streams mit der Streams API verwenden.
Mit der Streams API können Sie programmatisch auf Datenstreams zugreifen, die über das Netzwerk empfangen wurden.
oder auf irgendeine Weise lokal
erstellt werden,
mit JavaScript verarbeiten können. Beim Streaming wird eine Ressource aufgegliedert, die Sie empfangen, senden oder transformieren möchten.
in kleine Blöcke und verarbeitet sie dann Stück für Stück. Beim Streaming geht es darum,
wenn sie Assets wie HTML oder Videos empfangen, die auf Webseiten angezeigt werden sollen,
Die Funktion war für JavaScript erst verfügbar, nachdem fetch
mit Streams 2015 eingeführt wurde.
Wenn Sie früher eine Ressource (z. B. ein Video, eine Textdatei usw.) verarbeiten wollten, müssten Sie die gesamte Datei herunterladen, warten, bis sie in ein geeignetes Format deserialisiert wurde, und dann zu verarbeiten. Streams sind für alle JavaScript ausführen, ändert sich alles. Sie können Rohdaten mit JavaScript schrittweise verarbeiten, sobald sie auf dem Client verfügbar sind, ohne einen Puffer, einen String oder ein Blob generieren zu müssen. Dadurch ergeben sich für Sie eine Reihe von Anwendungsfällen, von denen ich einige unten auflistee:
- Videoeffekte:Wird ein lesbarer Videostream durch einen Transformationsstream geleitet, bei dem Effekte angewendet werden. in Echtzeit.
- Daten- bzw. De-komprimierung:Weiterleitung eines Dateistreams durch einen Transformationsstream, der selektiv (de)komprimiert.
- Bilddecodierung:Weiterleitung eines HTTP-Antwortstreams durch einen Transformationsstream, der Byte decodiert
in Bitmapdaten und dann über einen weiteren Transformationsstream, der Bitmaps in PNGs umwandelt. Wenn
die im
fetch
-Handler eines Service Workers installiert sind, können Sie neue Bildformate wie AVIF.
Unterstützte Browser
ReadableStream und WritableStream
Unterstützte Browser
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
TransformStream
Unterstützte Browser
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
Wichtige Konzepte
Bevor ich die verschiedenen Arten von Streams näher beschreibe, möchte ich einige Kernkonzepte erläutern.
Klötze
Ein Chunk ist ein einzelnes Datenelement, das in einen Stream geschrieben oder aus diesem gelesen wird. Dabei kann es sich um
Typ; Streams können sogar Blöcke verschiedener Typen enthalten. Meistens ist ein Chunk nicht besonders atomar
Dateneinheit für einen bestimmten Stream. Ein Bytestream kann beispielsweise Blöcke enthalten, die aus 16
KiB-Uint8Array
-Einheiten anstelle von einzelnen Byte.
Lesbare Streams
Ein lesbarer Stream stellt eine Datenquelle dar, aus der gelesen werden kann. Mit anderen Worten, Daten kommen
aus einem lesbaren Stream. Konkret ist ein lesbarer Stream eine Instanz von ReadableStream
.
.
Beschreibbare Streams
Ein beschreibbarer Stream stellt ein Ziel für Daten dar, in das geschrieben werden kann. Mit anderen Worten, Daten
wird in einen schreibbaren Stream geladen. Ein beschreibbarer Stream
ist eine Instanz des
Klasse WritableStream
.
Streams transformieren
Ein Transformationsstream besteht aus einem Paar von Streams: einem beschreibbaren Stream, der auch als beschreibbare Seite bezeichnet wird.
und einem lesbaren Stream, der als lesbare Seite bezeichnet wird.
Eine Metapher hierfür wäre
Simultandolmetscher
der spontan von einer Sprache in eine andere übersetzt.
In einer für den Transformationsstream spezifischen Weise
führt dazu, dass neue Daten zum Lesen
Seite gut lesbar ist. Genauer gesagt kann jedes Objekt mit einer writable
- und readable
-Eigenschaft
als Transformationsstream. Mit der Standardklasse TransformStream
ist es jedoch einfacher,
ein sogenanntes Paar.
Rohrketten
Streams werden in erster Linie verwendet, indem sie über Pipes aneinander gesendet werden. An einen lesbaren Stream kann direkt weitergeleitet werden
mit der Methode pipeTo()
des lesbaren Streams in einen schreibbaren Stream übertragen oder durch eine Pipeline an einen schreibbaren Stream geleitet werden.
oder mehr Transformationsstreams zuerst mit der Methode pipeThrough()
des lesbaren Streams. Eine Reihe von
auf diese Weise miteinander verbundene Ströme, wird als Rohrkette bezeichnet.
Rückdruck
Sobald eine Pipe-Kette erstellt ist, leitet sie Signale weiter, die angeben, wie schnell Blöcke fließen sollen. durch sie hindurch. Wenn ein Schritt in der Kette noch keine Blöcke akzeptieren kann, wird das Signal rückwärts weitergeleitet. durch die Rohrkette verlaufen, bis die ursprüngliche Quelle schließlich aufgefordert wird, die Produktion von Stücken einzustellen, schnell. Dieser Prozess der Normalisierung des Flusses wird als Rückdruck bezeichnet.
Abschlag
Ein lesbarer Stream kann mithilfe der Methode tee()
über ein T-Shirt verknüpft werden, das nach der Form eines Großbuchstabens „T“ benannt ist.
Dadurch wird der Stream gesperrt, er kann also nicht mehr direkt verwendet werden. Es werden jedoch zwei neue
Streams, sogenannte Zweige, die unabhängig verarbeitet werden können.
Teeing ist auch wichtig, weil Streams nicht zurückgespult oder neu gestartet werden können. Mehr dazu später.
Funktionsweise eines lesbaren Streams
Ein lesbarer Stream ist eine Datenquelle, die in JavaScript durch ein
ReadableStream
-Objekt, das
aus einer zugrunde liegenden Quelle. Die
ReadableStream()
-Konstruktor erstellt und gibt ein lesbares Streamobjekt aus den angegebenen Handlern zurück. Es gibt zwei
Typen der zugrunde liegenden Quelle:
- Push-Quellen senden ständig Daten an Sie, wenn Sie darauf zugreifen, und es liegt an Ihnen, dies zu tun den Zugriff auf den Stream starten, pausieren oder abbrechen. Beispiele hierfür sind Live-Videostreams, vom Server gesendete Ereignisse, oder WebSockets.
- Bei Pull-Quellen müssen Sie explizit Daten von ihnen anfordern, sobald eine Verbindung besteht. Beispiele
HTTP-Vorgänge über
fetch()
- oderXMLHttpRequest
-Aufrufe einschließen.
Streamdaten werden sequenziell in kleinen Abschnitten gelesen, die als Blöcke bezeichnet werden. Die in einem Stream platzierten Blöcke gelten als in die Warteschlange. Das bedeutet, dass sie in einer Warteschlange warten. und kann gelesen werden. Eine interne Warteschlange verfolgt die Blöcke, die noch nicht gelesen wurden.
Eine Warteschlangenstrategie ist ein Objekt, das festlegt, wie ein Stream den Gegendruck auf der Grundlage den Status seiner internen Warteschlange. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht Gesamtgröße aller Blöcke in der Warteschlange auf eine angegebene Zahl, die als Hochwassermarkierung bezeichnet wird.
Die Blöcke im Stream werden von einem Leser gelesen. Dieser Reader ruft die Daten Stück für Stück sodass Sie genau die Art von Operation ausführen können, die Sie damit durchführen möchten. Der Leser und die anderen zugehörigen Verarbeitungscode wird als Consumer bezeichnet.
Das nächste Konstrukt wird in diesem Zusammenhang als Controller bezeichnet. Jedem lesbaren Stream ist ein Controller, mit dem du, wie der Name schon sagt, den Stream steuern kannst.
Ein Stream kann jeweils nur von einem Leser gelesen werden. Ein Leser wird erstellt und beginnt, einen Stream zu lesen. also zu einem aktiven Leser wird, ist er gesperrt. Wenn Sie möchten, dass ein anderer Leser Wenn Sie Ihren Stream lesen, müssen Sie in der Regel das erste Lesegerät freigeben, bevor Sie etwas anderes tun. (Du kannst aber einen Tee mit Stream starten).
Lesbaren Stream erstellen
Sie erstellen einen lesbaren Stream, indem Sie dessen Konstruktor aufrufen.
ReadableStream()
Der Konstruktor hat das optionale Argument underlyingSource
, das ein Objekt darstellt.
mit Methoden und Attributen, die das Verhalten der erstellten Streaminstanz definieren.
underlyingSource
Dazu können die folgenden optionalen, vom Entwickler definierten Methoden verwendet werden:
start(controller)
: Wird sofort bei der Erstellung des Objekts aufgerufen. Die kann auf die Streamquelle zugreifen und weitere Aktionen ausführen. die für die Einrichtung der Streamfunktion erforderlich sind. Wenn dieser Vorgang asynchron durchgeführt werden soll, kann die Methode ein Versprechen zurückgeben, das Erfolg oder Misserfolg signalisiert. Der an diese Methode übergebenecontroller
-Parameter ist eineReadableStreamDefaultController
pull(controller)
: Kann verwendet werden, um den Stream zu steuern, wenn mehr Blöcke abgerufen werden. Es wird wiederholt aufgerufen, solange die interne Warteschlange des Streams mit Chunks nicht voll ist, bis die Warteschlange erreicht sein Hochwasser. Wenn das Ergebnis des Aufrufs vonpull()
ein Promise ist,pull()
wird erst dann wieder angerufen, wenn das Versprechen erfüllt ist. Wenn das Versprechen abgelehnt wird, wird für den Stream ein Fehler ausgegeben.cancel(reason)
: Wird aufgerufen, wenn der Stream-Nutzer den Stream abbricht.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
unterstützt die folgenden Methoden:
ReadableStreamDefaultController.close()
schließt den zugehörigen Stream.ReadableStreamDefaultController.enqueue()
stellt einen bestimmten Block in den zugehörigen Stream ein.ReadableStreamDefaultController.error()
führt dazu, dass zukünftige Interaktionen mit dem zugehörigen Stream fehlerhaft sind.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Das zweite, ebenfalls optionale Argument des ReadableStream()
-Konstruktors ist queuingStrategy
.
Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert, die zwei
Parameter:
highWaterMark
: Eine nicht negative Zahl, die das Hochwasser des Flusses angibt, für das diese Warteschlangenstrategie verwendet wird.size(chunk)
: Eine Funktion, die die endliche nicht negative Größe des gegebenen Chunk-Werts berechnet und zurückgibt. Das Ergebnis wird verwendet, um den Rückstand zu bestimmen, der sich über das entsprechendeReadableStreamDefaultController.desiredSize
-Attribut zeigt. Sie bestimmt auch, wann die Methodepull()
der zugrunde liegenden Quelle aufgerufen wird.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Die Methoden getReader()
und read()
Zum Lesen aus einem lesbaren Stream benötigen Sie einen Reader.
ReadableStreamDefaultReader
Die Methode getReader()
der ReadableStream
-Schnittstelle erstellt einen Leser und sperrt den Stream für
. Solange der Stream gesperrt ist, kann kein weiterer Leser erworben werden, bis dieser übertragen wird.
Die read()
der ReadableStreamDefaultReader
-Schnittstelle ein Promise zurückgibt, das Zugriff auf das nächste
in die interne Warteschlange des Streams ein. Sie erfüllt oder lehnt ein Ergebnis ab, je nachdem,
in den Stream aufnehmen. Es gibt folgende Möglichkeiten:
- Wenn ein Chunk verfügbar ist, wird das Versprechen mit einem Objekt der Form erfüllt
{ value: chunk, done: false }
. - Wenn der Stream geschlossen wird, wird das Versprechen mit einem Objekt im folgenden Format erfüllt:
{ value: undefined, done: true }
. - Wenn beim Stream ein Fehler auftritt, wird das Promise mit dem entsprechenden Fehler abgelehnt.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Das Attribut locked
Sie können überprüfen, ob ein lesbarer Stream gesperrt ist, indem Sie auf die
ReadableStream.locked
Property.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Lesbare Stream-Codebeispiele
Das Codebeispiel unten zeigt alle Schritte in Aktion. Sie erstellen zuerst ein ReadableStream
, das in seiner
Argument underlyingSource
(d. h. die Klasse TimestampSource
) definiert eine start()
-Methode.
Mit dieser Methode wird dem controller
des Streams mitgeteilt,
enqueue()
während zehn Sekunden pro Sekunde einen Zeitstempel.
Schließlich wird der Controller angewiesen, den Stream close()
. Sie konsumieren das
indem du mit der Methode getReader()
einen Leser erstellst und read()
aufrufst, bis der Stream
done
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Asynchrone Iteration
Die Prüfung bei jeder read()
-Schleifeniteration, wenn der Stream done
ist, ist möglicherweise nicht die bequemste API.
Glücklicherweise gibt es dazu bald eine bessere Methode: die asynchrone Iteration.
for await (const chunk of stream) {
console.log(chunk);
}
Eine Problemumgehung für die aktuelle Verwendung der asynchronen Iteration besteht darin, das Verhalten mit einem Polyfill zu implementieren.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Teeing eines lesbaren Streams
Die Methode tee()
des
Die ReadableStream
-Schnittstelle verknüpft den aktuell lesbaren Stream und gibt ein Array aus zwei Elementen zurück.
die die beiden resultierenden Zweige als neue ReadableStream
-Instanzen enthält. Dadurch können Sie
zwei Lesern, um einen Stream gleichzeitig zu lesen. Sie können dies beispielsweise
in einem Service Worker tun,
eine Antwort vom Server abrufen und an den Browser streamen, aber auch an den
Service Worker-Cache. Da ein Antworttext nicht mehr als einmal verarbeitet werden kann, benötigen Sie zwei Kopien
um dies zu tun. Zum Abbrechen des Streams müssen Sie dann beide resultierenden Zweige abbrechen. Stream starten
wird sie in der Regel für die Dauer gesperrt, sodass andere Leser sie nicht sperren können.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Lesbare Bytestreams
Für Streams, die Byte darstellen, wird eine erweiterte Version des lesbaren Streams bereitgestellt, um Bytes effizient reduzieren, insbesondere durch das Minimieren von Kopien. Byte-Streams ermöglichen Zwischenspeichern (Bring-Your-Own-Zwischenspeicher) BYOB-Leser zu gewinnen. Die Standardimplementierung kann eine Reihe verschiedener Ausgaben liefern, z. B. im Fall von WebSockets als Zeichenfolgen oder Array-Zwischenspeicher, wohingegen Byte-Streams die Byte-Ausgabe garantieren. Darüber hinaus bieten BYOB-Lesegeräte Stabilitätsvorteile. Dies ist denn wenn ein Zwischenspeicher getrennt wird, kann er nicht zweimal in denselben Zwischenspeicher schreiben. wodurch Race-Bedingungen vermieden werden. BYOB-Lesegeräte können die Ausführungshäufigkeit des Browsers reduzieren automatische Speicherbereinigung, da Puffer wiederverwendet werden können.
Einen lesbaren Bytestream erstellen
Sie können einen lesbaren Bytestream erstellen, indem Sie einen zusätzlichen type
-Parameter an den
ReadableStream()
-Konstruktor.
new ReadableStream({ type: 'bytes' });
underlyingSource
Die zugrunde liegende Quelle eines lesbaren Bytestreams wird mit einem ReadableByteStreamController
zu manipulieren. Die ReadableByteStreamController.enqueue()
-Methode verwendet ein chunk
-Argument, dessen Wert
ist ein ArrayBufferView
. Das Attribut ReadableByteStreamController.byobRequest
gibt den aktuellen Wert
BYOB-Pull-Anfrage oder null, wenn keine vorhanden ist. Schließlich wird der ReadableByteStreamController.desiredSize
gibt die gewünschte Größe zurück, um die interne Warteschlange des kontrollierten Streams zu füllen.
queuingStrategy
Das zweite, ebenfalls optionale Argument des ReadableStream()
-Konstruktors ist queuingStrategy
.
Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert, wofür eine
Parameter:
highWaterMark
: Eine nicht negative Anzahl von Byte, die das Hochwasser des Streams mit dieser Warteschlangenstrategie angibt. Er wird verwendet, um den Rückstand zu bestimmen, der sich über das entsprechendeReadableByteStreamController.desiredSize
-Attribut auswirkt. Sie bestimmt auch, wann die Methodepull()
der zugrunde liegenden Quelle aufgerufen wird.
Die Methoden getReader()
und read()
Sie können dann Zugriff auf ein ReadableStreamBYOBReader
erhalten, indem Sie den mode
-Parameter entsprechend festlegen:
ReadableStream.getReader({ mode: "byob" })
Dies ermöglicht eine präzisere Kontrolle über den Zwischenspeicher.
um Kopien zu vermeiden. Um aus dem Bytestream zu lesen, müssen Sie
ReadableStreamBYOBReader.read(view)
, wobei view
ein
ArrayBufferView
Lesbares Bytestream-Codebeispiel
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Die folgende Funktion gibt lesbare Bytestreams zurück, die ein effizientes Lesen eines ein zufällig generiertes Array. Anstatt eine vorgegebene Chunk-Größe von 1.024 zu verwenden, des vom Entwickler bereitgestellten Zwischenspeichers und ermöglicht so volle Kontrolle.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Mechanismen eines beschreibbaren Streams
Ein beschreibbarer Stream ist ein Ziel, in das Sie Daten schreiben können. Diese werden in JavaScript durch einen
WritableStream
-Objekt. Dieses
dient als Abstraktion über einer zugrunde liegenden Senke – einer untergeordneten E/A-Senke, in die
Rohdaten geschrieben werden.
Die Daten werden Block für Block über einen writer in den Stream geschrieben. Ein Chunk kann Formen, wie die Blöcke in einem Reader. Sie können jeden beliebigen Code verwenden, die Blöcke bereit zum Schreiben; Der Autor und der zugehörige Code werden als Producer bezeichnet.
Wenn ein Autor erstellt wird und anfängt, in einen Stream zu schreiben (ein aktiver Autor), gilt dies als nicht darauf zugreifen. Es kann jeweils nur ein Schreiber in einen beschreibbaren Stream schreiben. Wenn Sie eine weitere Writer, um in Ihren Stream zu schreiben, müssen Sie ihn in der Regel freigeben, bevor Sie ihn anhängen. mit einem anderen Autor.
Eine interne Warteschlange verfolgt die Blöcke, die in den Stream geschrieben wurden, aber noch nicht. von der zugrunde liegenden Senke verarbeitet.
Eine Warteschlangenstrategie ist ein Objekt, das festlegt, wie ein Stream den Gegendruck auf der Grundlage den Status seiner internen Warteschlange. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht Gesamtgröße aller Blöcke in der Warteschlange auf eine angegebene Zahl, die als Hochwassermarkierung bezeichnet wird.
Das endgültige Konstrukt wird als Controller bezeichnet. Jedem beschreibbaren Stream ist ein Controller zugeordnet, der ermöglicht dir das Steuern des Streams (z. B., um ihn abzubrechen).
Beschreibbaren Stream erstellen
Die WritableStream
-Oberfläche von
bietet die Streams API eine Standardabstraktion zum Schreiben von Streamingdaten in ein Ziel.
als Senke. Dieses Objekt verfügt über einen integrierten Rückdruck und eine Wiedergabeliste. Sie erstellen einen beschreibbaren Stream, indem Sie
durch Aufrufen seines Konstruktors
WritableStream()
Sie hat einen optionalen underlyingSink
-Parameter, der ein Objekt darstellt.
mit Methoden und Attributen, die das Verhalten der erstellten Streaminstanz definieren.
underlyingSink
underlyingSink
kann die folgenden optionalen, vom Entwickler definierten Methoden enthalten. Das controller
-Parameter, der an einige der Methoden übergeben wird,
WritableStreamDefaultController
start(controller)
: Diese Methode wird sofort aufgerufen, wenn das Objekt erstellt wird. Die Der Inhalt dieser Methode sollte darauf abzielen, Zugriff auf die zugrunde liegende Senke zu erhalten. Wenn dieser Prozess asynchron ausgeführt wird, kann ein Promise zurückgegeben werden, das Erfolg oder Misserfolg signalisiert.write(chunk, controller)
: Diese Methode wird aufgerufen, wenn ein neuer Datenblock (angegeben in denchunk
) kann in die zugrunde liegende Senke geschrieben werden. Sie kann ein Versprechen zurückgeben, signalisiert den Erfolg oder Misserfolg des Schreibvorgangs. Diese Methode wird erst nach der vorherigen Schreibvorgänge waren erfolgreich und nie, nachdem der Stream geschlossen oder abgebrochen wurde.close(controller)
: Diese Methode wird aufgerufen, wenn die App signalisiert, dass der Schreibvorgang abgeschlossen ist. an den Stream zu senden. Der Inhalt sollte alles tun, was notwendig ist, um Schreibvorgänge in den der zugrunde liegenden Senke und geben den Zugriff darauf frei. Wenn dieser Prozess asynchron ist, kann er einen Erfolg oder Misserfolg zu signalisieren. Diese Methode wird erst aufgerufen, nachdem alle Schreibvorgänge in der Warteschlange erfolgreich waren.abort(reason)
: Diese Methode wird aufgerufen, wenn die App signalisiert, dass sie abrupt geschlossen werden soll. in einen fehlerhaften Zustand versetzt werden. Damit können alle zurückgehaltenen Ressourcen bereinigt werden,close()
, aberabort()
wird aufgerufen, auch wenn Schreibvorgänge in die Warteschlange gestellt werden. Diese Blöcke werden dann weg sind. Wenn dieser Prozess asynchron ist, kann er ein Versprechen zurückgeben, um Erfolg oder Misserfolg zu signalisieren. Die Der Parameterreason
enthält einenDOMString
-Wert, der beschreibt, warum der Stream abgebrochen wurde.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Die
WritableStreamDefaultController
Schnittstelle der Streams API stellt einen Controller dar, über den der Status einer WritableStream
gesteuert werden kann
während der Einrichtung, da weitere Blöcke zum Schreiben eingereicht werden, oder am Ende des Textes. Beim Erstellen
eine WritableStream
ist, erhält die zugrunde liegende Senke eine entsprechende WritableStreamDefaultController
Instanz, die bearbeitet werden soll. WritableStreamDefaultController
hat nur eine Methode:
WritableStreamDefaultController.error()
,
Dadurch werden zukünftige
Interaktionen mit dem zugehörigen Stream fehlerhaft.
WritableStreamDefaultController
unterstützt auch ein signal
-Attribut, das eine Instanz von
AbortSignal
,
sodass ein WritableStream
-Vorgang bei Bedarf beendet werden kann.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Das zweite, ebenfalls optionale Argument des WritableStream()
-Konstruktors ist queuingStrategy
.
Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert, die zwei
Parameter:
highWaterMark
: Eine nicht negative Zahl, die das Hochwasser des Flusses angibt, für das diese Warteschlangenstrategie verwendet wird.size(chunk)
: Eine Funktion, die die endliche nicht negative Größe des gegebenen Chunk-Werts berechnet und zurückgibt. Das Ergebnis wird verwendet, um den Rückstand zu bestimmen, der sich über das entsprechendeWritableStreamDefaultWriter.desiredSize
-Attribut zeigt.
Die Methoden getWriter()
und write()
Zum Schreiben in einen beschreibbaren Stream benötigen Sie einen Writer. Dieser ist ein
WritableStreamDefaultWriter
Die Methode getWriter()
der WritableStream
-Schnittstelle gibt ein
neue Instanz von WritableStreamDefaultWriter
hinzu und sperrt den Stream auf diese Instanz. Während die
Stream gesperrt ist, kann kein anderer Writer übernommen werden, bis der aktuelle Stream veröffentlicht wird.
Die write()
der Methode
WritableStreamDefaultWriter
schreibt einen übergebenen Datenblock in einen WritableStream
und seine zugrunde liegende Senke und gibt dann
Ein Promise, das aufgelöst wird, um den Erfolg oder Misserfolg des Schreibvorgangs anzuzeigen. Beachten Sie, dass
„Erfolg“ liegt bei der zugrunde liegenden Senke; wird möglicherweise angezeigt,
dass der Block akzeptiert wurde.
und nicht unbedingt an ihrem endgültigen Ziel gespeichert sind.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Das Attribut locked
Sie können überprüfen, ob ein beschreibbarer Stream gesperrt ist, indem Sie auf seine
WritableStream.locked
Property.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Codebeispiel für beschreibbaren Stream
Das Codebeispiel unten zeigt alle Schritte in Aktion.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Einen lesbaren Stream in einen beschreibbaren Stream einbinden
Ein lesbarer Stream kann über die Pipeline
pipeTo()
-Methode.
ReadableStream.pipeTo()
leitet den aktuellen ReadableStream
an eine gegebene WritableStream
weiter und gibt einen
Versprechen, das sich erfüllt, wenn der Pipe-Prozess erfolgreich abgeschlossen wird, oder es ablehnt, wenn Fehler
gefunden wurden.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Transformationsstream erstellen
Die TransformStream
-Schnittstelle der Streams API stellt eine Reihe transformierbarer Daten dar. Ich
Erstellen Sie einen Transformationsstream, indem Sie seinen Konstruktor TransformStream()
aufrufen. Dieser erstellt dann
Ein Transformationsstreamobjekt aus den angegebenen Handlern. Der TransformStream()
-Konstruktor akzeptiert als
Das erste Argument ist ein optionales JavaScript-Objekt, das die transformer
darstellt. Solche Objekte können
eine der folgenden Methoden enthalten:
transformer
start(controller)
: Diese Methode wird sofort aufgerufen, wenn das Objekt erstellt wird. Normalerweise wird verwendet, um Präfixblöcke mitcontroller.enqueue()
in die Warteschlange zu stellen. Diese Blöcke werden gelesen lesbar sein, sind jedoch nicht von Schreibvorgängen auf die beschreibbare Seite abhängig. Wenn diese Initiale ist asynchron, z. B. weil das Abrufen der Präfix-Blöcke Die Funktion kann ein Versprechen zurückgeben, um Erfolg oder Misserfolg zu signalisieren. wird ein abgelehntes Promise den Fehler . Alle ausgelösten Ausnahmen werden vomTransformStream()
-Konstruktor noch einmal ausgelöst.transform(chunk, controller)
: Diese Methode wird aufgerufen, wenn ein neuer Block geschrieben wurde, der ursprünglich in den beschreibbar ist, kann transformiert werden. Die Stream-Implementierung garantiert, dass diese Funktion wird erst aufgerufen, nachdem vorherige Transformationen erfolgreich waren, und nie vorstart()
abgeschlossen wurde oder nachdemflush()
aufgerufen wurde. Diese Funktion führt die eigentliche Transformation des Transformationsstreams. Die Ergebnisse können mitcontroller.enqueue()
in die Warteschlange eingereiht werden. Dieses ermöglicht es einem einzelnen Block, der auf die beschreibbare Seite geschrieben wird, zu null oder mehreren Blöcken auf der Seite lesbar ist, je nachdem, wie oftcontroller.enqueue()
aufgerufen wird. Wenn das Verfahren erfolgt die Transformation asynchron, kann diese Funktion ein Versprechen zurückgeben, der Transformation. Ein abgelehntes Promise führt sowohl auf der lesbaren als auch auf der beschreibbaren Seite des Transform-Stream. Ist die Methodetransform()
nicht angegeben, wird die Identitätstransformation verwendet, die stellt Blöcke unverändert von der beschreibbaren Seite zur lesbaren Seite in die Warteschlange.flush(controller)
: Diese Methode wird aufgerufen, nachdem alle auf die beschreibbaren Seite geschriebenen Blöcke abgeschlossen sind. durch erfolgreich durchlaufenetransform()
transformiert. Die beschreibbare Seite wird gleich geschlossen. In der Regel wird dies verwendet, um Suffix-Chunks auf die lesbare Seite zu stellen, bevor diese ebenfalls in die Warteschlange gestellt wird geschlossen wird. Wenn der Leerungsprozess asynchron ist, kann die Funktion ein Promise an Erfolg oder Misserfolg signalisieren; wird das Ergebnis an den Aufruferstream.writable.write()
Außerdem führt ein abgelehntes Promise beschreibbaren Seiten des Streams. Das Auslösen einer Ausnahme wird wie das Zurückgeben einer abgelehnten versprochen.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Die Warteschlangenstrategien writableStrategy
und readableStrategy
Der zweite und dritte optionale Parameter des TransformStream()
-Konstruktors sind optional
Warteschlangenstrategien writableStrategy
und readableStrategy
. Sie sind definiert wie in der
readable und writable
-Abschnitte.
Codebeispiel für Transformationsstream
Das folgende Codebeispiel zeigt einen einfachen Transformationsstream in Aktion.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Einen lesbaren Stream durch einen Transformationsstream leiten
Die pipeThrough()
-Methode der ReadableStream
-Schnittstelle bietet eine verkettbare Methode für das Piping des aktuellen Streams.
Transformationsstream oder ein anderes beschreibbar/lesbares Paar. Das Piepton eines Streams wird in der Regel gesperrt
für die Dauer der Pipe, sodass andere Lesegeräte sie nicht sperren können.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Im nächsten, etwas konstruierten Codebeispiel, Version von fetch()
der den gesamten Text großgeschrieben, indem das zurückgegebene Antwortversprechen verwendet wird.
als Stream
und schreiben Sie Block für Block in Großbuchstaben. Der Vorteil dieses Ansatzes ist, dass Sie nicht auf
das gesamte Dokument herunterzuladen, was bei großen Dateien einen großen Unterschied machen kann.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
Die folgende Demo zeigt lesbare, beschreibbare und transformierbare Streams in Aktion. Sie enthält auch Beispiele
pipeThrough()
und pipeTo()
und zeigt ebenfalls tee()
. Sie können optional
Demo in einem eigenen Fenster öffnen oder
Quellcode.
Nützliche Streams im Browser
Zahlreiche nützliche Streams sind direkt in den Browser integriert. Sie können ganz einfach ein
ReadableStream
aus einem Blob. Die Blob
stream() der Schnittstelle gibt
Einen ReadableStream
, der nach dem Lesen die im Blob enthaltenen Daten zurückgibt. Denken Sie auch daran,
File
-Objekt ist eine bestimmte Art von
Blob
und können in jedem Kontext verwendet werden, den ein Blob hat.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Die Streamingvarianten von TextDecoder.decode()
und TextEncoder.encode()
werden aufgerufen
TextDecoderStream
und
TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Mit der Funktion
CompressionStream
und
DecompressionStream
-Transformationsstreams
. Das folgende Codebeispiel zeigt, wie Sie die Streams-Spezifikation herunterladen und mit gzip komprimieren können.
direkt im Browser und schreiben die komprimierte Datei direkt auf die Festplatte.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Die File System Access API
FileSystemWritableFileStream
Die experimentellen fetch()
-Anfragestreams sind
Beispiele für beschreibbare Ströme in freier Wildbahn.
Die Serial API nutzt sowohl gut lesbare als auch schreibbare Streams intensiv.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Schließlich integriert die WebSocketStream
API Streams in die WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Nützliches Infomaterial
- Streams-Spezifikation
- Begleitende Demos
- Polyfill für Streams
- 2016 – das Jahr der Webstreams
- Asynchrone Iteratoren und Generatoren
- Stream Visualizer
Danksagungen
Artikel wurde geprüft von Jake Archibald François Beaufort Sam Dutton Mattias Buelens Surma, Joe Medley und Adam Rice. Die Blogposts von Jake Archibald helfen mir sehr dabei, Streams. Einige Codebeispiele sind vom GitHub-Nutzer inspiriert. @bellbind explorative Datenanalysen und Teile des Textes auf dem MDN-Webdokumente auf Streams Die Streams Standard Autoren haben bei der Erstellung und diese Spezifikation schreiben. Hero-Image von Ryan Lara auf Entspritzen.