Aprende a usar transmisiones legibles, de escritura y de transformación con la API de Streams.
La API de Streams te permite acceder de manera programática a flujos de datos recibidos a través de la red o creados por cualquier medio de forma local y procesarlos con JavaScript. La transmisión implica desglosar un recurso que deseas recibir, enviar o transformar en fragmentos pequeños y, luego, procesarlos poco a poco. Si bien la transmisión es algo que los navegadores hacen de todas formas cuando reciben elementos como HTML o videos para mostrar en las páginas web, esta función nunca estuvo disponible para JavaScript antes de que se presentara la fetch
con transmisiones en 2015.
Antes, si querías procesar algún tipo de recurso (ya sea un video, un archivo de texto, etc.), tenías que descargar el archivo completo, esperar a que se deserialice en un formato adecuado y, luego, procesarlo. Como las transmisiones están disponibles para JavaScript, todo esto cambia. Ahora puedes procesar datos sin procesar con JavaScript de manera progresiva en cuanto estén disponibles en el cliente, sin necesidad de generar un búfer, una cadena o un BLOB. Esto desbloquea una serie de casos de uso, algunos de los cuales se enumeran a continuación:
- Efectos de video: Canalización de una transmisión de video legible a través de una transmisión de transformación que aplica efectos en tiempo real
- Descompresión (des)compresión de datos: Canalización de una transmisión de archivos a través de una transmisión de transformación que la descomprime de forma selectiva.
- Decodificación de imágenes: Canalización de una transmisión de respuesta HTTP a través de una transmisión de transformación que decodifica bytes en datos de mapa de bits y, luego, a través de otra transmisión de transformación que traduce mapas de bits a PNG. Si se instala dentro del controlador
fetch
de un service worker, puedes usar polyfills nuevos en formatos de imagen de manera transparente, como AVIF.
Navegadores compatibles
ReadableStream y WritableStream
TransformStream
Conceptos básicos
Antes de entrar en detalles sobre los distintos tipos de transmisiones, presentaré algunos conceptos básicos.
En trozos
Un fragmento es un dato único que se escribe en una transmisión o se lee desde ella. Pueden ser de cualquier tipo; las transmisiones incluso pueden contener fragmentos de diferentes tipos. La mayoría de las veces, un fragmento no será la unidad de datos más atómica de un flujo determinado. Por ejemplo, un flujo de bytes podría contener fragmentos que constan de unidades Uint8Array
de 16 KiB, en lugar de bytes individuales.
Transmisiones legibles
Una transmisión legible representa una fuente de datos desde la que puedes leer. En otras palabras, los datos provienen de un flujo legible. En concreto, una transmisión legible es una instancia de la clase ReadableStream
.
Transmisiones con capacidad de escritura
Una transmisión con capacidad de escritura representa un destino para datos en los que puedes escribir. En otras palabras, los datos entran a un flujo que admite escritura. En concreto, una transmisión que admite escritura es una instancia de la clase WritableStream
.
Transformar transmisiones
Una transmisión de transformación consta de un par de transmisiones: una transmisión que admite escritura, conocida como su lado que admite escritura, y una transmisión legible, conocida como su lado legible.
Una metáfora del mundo real para esto sería un intérprete simultáneo que traduce de un idioma a otro sobre la marcha.
De una manera específica para la transmisión de transformación, escribir en el lado que admite escritura hace que los datos nuevos estén disponibles para leer desde el lado legible. En concreto, cualquier objeto con una propiedad writable
y una propiedad readable
puede servir como un flujo de transformaciones. Sin embargo, la clase TransformStream
estándar facilita la creación de este par que se enreda correctamente.
Cadenas de tuberías
Principalmente, las transmisiones se canalizan entre sí. Una transmisión legible se puede canalizar directamente a una transmisión con escritura mediante el método pipeTo()
de la transmisión legible, o bien se puede canalizar primero a través de una o más transmisiones de transformación con el método pipeThrough()
de la transmisión legible. Un conjunto de transmisiones canalizadas juntas de esta manera se denomina cadena de canalización.
Contrapresión
Una vez que se crea una cadena de tuberías, se propagan indicadores respecto de qué tan rápido deben fluir los fragmentos a través de ella. Si algún paso de la cadena aún no puede aceptar fragmentos, propaga una señal hacia atrás a través de la cadena de tubo, hasta que finalmente se le dice a la fuente original que deje de producir fragmentos tan rápido. Este proceso de normalización del flujo se denomina contrapresión.
Golpe inicial
Se puede agregar una transmisión legible (que lleva su nombre según la forma de una "T" mayúscula) con su método tee()
.
Esto bloqueará la transmisión, es decir, dejará de usarla directamente; sin embargo, creará dos transmisiones nuevas, llamadas ramas, que se pueden consumir de forma independiente.
El inicio de sesión también es importante, ya que las transmisiones no se pueden retroceder ni reiniciar. Se trata de un tema más adelante.
La mecánica de una transmisión legible
Una transmisión legible es una fuente de datos representada en JavaScript por un objeto ReadableStream
que fluye desde una fuente subyacente. El constructor ReadableStream()
crea y muestra un objeto de transmisión legible de los controladores dados. Hay dos tipos de fuentes subyacentes:
- Las fuentes de envío te envían datos constantemente cuando accedes a ellas, y depende de ti iniciar, pausar o cancelar el acceso a la transmisión. Los ejemplos incluyen transmisiones de video en vivo, eventos enviados por el servidor o WebSockets.
- Las fuentes de extracción requieren que solicites datos de forma explícita una vez que estén conectadas. Algunos ejemplos incluyen operaciones HTTP a través de llamadas
fetch()
oXMLHttpRequest
.
Los datos de transmisión se leen de forma secuencial en partes pequeñas llamadas fragmentos. Se dice que los fragmentos que se colocan en una transmisión están en cola. Esto significa que están en una cola lista para leerla. Una cola interna realiza un seguimiento de los fragmentos que aún no se leyeron.
Una estrategia de cola es un objeto que determina cómo una transmisión debe indicar la contrapresión en función del estado de su cola interna. La estrategia de colas asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola con un número especificado, conocido como la marca de agua alta.
Un lector lee los fragmentos dentro de la transmisión. Este lector recupera los datos de a un fragmento a la vez, lo que te permite realizar cualquier tipo de operación que desees en ellos. El lector y el otro código de procesamiento que lo acompaña se denominan consumidor.
La siguiente construcción en este contexto se denomina controlador. Cada transmisión legible tiene un controlador asociado que, como su nombre sugiere, te permite controlarla.
Solo un lector puede leer una transmisión a la vez; cuando se crea un lector y comienza a leer una transmisión (es decir, se convierte en un lector activo), se lo bloquea. Si quieres que otro lector se haga cargo de la lectura de tu transmisión, en general, debes liberar el primer lector antes de hacer cualquier otra cosa (aunque puedes aplicar las transmisiones).
Crea una transmisión legible
Para crear una transmisión legible, llama a su constructor ReadableStream()
.
El constructor tiene un argumento opcional underlyingSource
, que representa un objeto con métodos y propiedades que definen cómo se comportará la instancia de transmisión construida.
underlyingSource
Esto puede usar los siguientes métodos opcionales definidos por el desarrollador:
start(controller)
: Se llama de inmediato cuando se construye el objeto. El método puede acceder a la fuente de la transmisión y realizar cualquier otra cosa necesaria para configurar la funcionalidad de la transmisión. Si este proceso se realiza de forma asíncrona, el método puede mostrar una promesa que indique el éxito o el fracaso. El parámetrocontroller
que se pasa a este método es unReadableStreamDefaultController
.pull(controller)
: Se puede usar para controlar la transmisión a medida que se recuperan más fragmentos. Se llama repetidamente mientras la cola interna de fragmentos de la transmisión no esté llena, hasta que la cola alcance su marca de agua alta. Si el resultado de la llamada apull()
es una promesa, no se volverá a llamar apull()
hasta que se cumpla esa promesa. Si se rechaza la promesa, se producirá un error en la transmisión.cancel(reason)
: Se llama cuando el consumidor de la transmisión la cancela.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
admite los siguientes métodos:
ReadableStreamDefaultController.close()
cierra la transmisión asociada.ReadableStreamDefaultController.enqueue()
coloca en cola un fragmento determinado en la transmisión asociada.ReadableStreamDefaultController.error()
provoca un error en cualquier interacción futura con la transmisión asociada.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
El segundo argumento del constructor ReadableStream()
, también opcional, es queuingStrategy
.
Es un objeto que define de forma opcional una estrategia de fila para la transmisión, que toma dos parámetros:
highWaterMark
: Es un número no negativo que indica la marca de agua más alta del flujo con esta estrategia de fila.size(chunk)
: Es una función que calcula y muestra el tamaño finito no negativo del valor de fragmento dado. El resultado se usa para determinar la contrapresión, que se manifiesta a través de la propiedadReadableStreamDefaultController.desiredSize
adecuada. También controla cuándo se llama al métodopull()
de la fuente subyacente.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Los métodos getReader()
y read()
Para leer de una transmisión legible, necesitas un lector, que será un ReadableStreamDefaultReader
.
El método getReader()
de la interfaz ReadableStream
crea un lector y bloquea la transmisión en él. Mientras la transmisión está bloqueada, no se podrá adquirir otro lector hasta que se lance este.
El método read()
de la interfaz ReadableStreamDefaultReader
muestra una promesa que brinda acceso al siguiente fragmento de la cola interna de la transmisión. Se entrega o rechaza con un resultado según el estado de la transmisión. Las diferentes posibilidades son las siguientes:
- Si hay un fragmento disponible, la promesa se cumplirá con un objeto del formato
{ value: chunk, done: false }
. - Si se cierra la transmisión, la promesa se cumplirá con un objeto con el formato
{ value: undefined, done: true }
. - Si se produce un error en la transmisión, la promesa se rechaza con el error correspondiente.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
La propiedad locked
Puedes verificar si una transmisión legible está bloqueada si accedes a su propiedad ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Muestras de código de transmisión legibles
En la siguiente muestra de código, se muestran todos los pasos en acción. Primero, crea un objeto ReadableStream
que, en su argumento underlyingSource
(es decir, la clase TimestampSource
) defina un método start()
.
Este método le indica al controller
de la transmisión que enqueue()
una marca de tiempo cada segundo durante diez segundos.
Por último, le indica al controlador que close()
la transmisión. Para consumir esta transmisión, crea un lector con el método getReader()
y llama a read()
hasta que la transmisión sea done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iteración asíncrona
Es posible que verificar cada iteración de bucle read()
si la transmisión es done
no sea la API más conveniente.
Afortunadamente, en poco tiempo habrá una mejor manera de hacerlo: la iteración asíncrona.
for await (const chunk of stream) {
console.log(chunk);
}
Una solución alternativa para usar la iteración asíncrona en la actualidad es implementar el comportamiento con un polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Cómo crear un comienzo de una transmisión legible
El método tee()
de la interfaz ReadableStream
agrega la transmisión legible actual y muestra un array de dos elementos que contiene las dos ramas resultantes como instancias de ReadableStream
nuevas. Esto permite que dos lectores lean una transmisión simultáneamente. Puedes hacerlo, por ejemplo, en un service worker si
deseas recuperar una respuesta del servidor y transmitirla al navegador, pero también transmitirla a la
caché del service worker. Como el cuerpo de una respuesta no se puede consumir más de una vez, necesitas dos copias para hacerlo. Para cancelar la transmisión, debes cancelar las dos ramas resultantes. Por lo general, el inicio de una transmisión continua la bloqueará por el tiempo, lo que evita que otros lectores la bloqueen.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Flujos de bytes legibles
En el caso de las transmisiones que representan bytes, se proporciona una versión extendida de la transmisión legible para controlar los bytes de manera eficiente, en particular mediante la minimización de copias. Las transmisiones de bytes permiten adquirir lectores de tipo "trae tu propio búfer" (BYOB). La implementación predeterminada puede proporcionar un rango de resultados diferentes, como strings o búferes de array en el caso de WebSockets, mientras que las transmisiones de bytes garantizan la salida de bytes. Además, los lectores BYOB tienen beneficios de estabilidad. Esto se debe a que, si un búfer se desconecta, puede garantizar que no se escriba dos veces en el mismo búfer, lo que evita condiciones de carrera. Los lectores BYOB pueden reducir la cantidad de veces que el navegador necesita ejecutar la recolección de elementos no utilizados, ya que puede reutilizar búferes.
Cómo crear un flujo de bytes legible
Puedes crear un flujo de bytes legible pasando un parámetro type
adicional al constructor ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
La fuente subyacente de un flujo de bytes legible recibe un ReadableByteStreamController
para manipular. Su método ReadableByteStreamController.enqueue()
toma un argumento chunk
cuyo valor es un ArrayBufferView
. La propiedad ReadableByteStreamController.byobRequest
muestra la solicitud de extracción BYOB actual, o nula si no hay ninguna. Por último, la propiedad ReadableByteStreamController.desiredSize
muestra el tamaño deseado para llenar la cola interna de la transmisión controlada.
queuingStrategy
El segundo argumento del constructor ReadableStream()
, también opcional, es queuingStrategy
.
Es un objeto que define de forma opcional una estrategia de cola para la transmisión, que toma un parámetro:
highWaterMark
: Es una cantidad no negativa de bytes que indica la marca de agua alta del flujo que usa esta estrategia de cola. Se utiliza para determinar la contrapresión, que se manifiesta a través de la propiedadReadableByteStreamController.desiredSize
adecuada. También controla cuándo se llama al métodopull()
de la fuente subyacente.
Los métodos getReader()
y read()
Luego, puedes obtener acceso a un ReadableStreamBYOBReader
si configuras el parámetro mode
según corresponda: ReadableStream.getReader({ mode: "byob" })
. Esto permite un control más preciso sobre la asignación del búfer para evitar copias. Para leer desde el flujo de bytes, debes llamar a ReadableStreamBYOBReader.read(view)
, donde view
es un ArrayBufferView
.
Muestra de código de flujo de bytes legible
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
La siguiente función muestra flujos de bytes legibles que permiten una lectura eficiente de copia cero de un arreglo generado de forma aleatoria. En lugar de usar un tamaño de fragmento predeterminado de 1,024, intenta llenar el búfer proporcionado por el desarrollador, lo que permite un control total.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
La mecánica de una transmisión con escritura
Una transmisión con capacidad de escritura es un destino en el que puedes escribir datos, representado en JavaScript por un objeto WritableStream
. Esto sirve como una abstracción sobre un receptor subyacente, que es un receptor de E/S de nivel inferior en el que se escriben datos sin procesar.
Los datos se escriben en el flujo a través de un escritor, un fragmento a la vez. Un fragmento puede tener muchas formas, como los fragmentos en un lector. Puedes usar el código que desees para producir los fragmentos listos para escribir; el escritor y el código asociado se llaman productor.
Cuando se crea un escritor y comienza a escribir en una transmisión (un escritor activo), se dice que está bloqueado a ella. Solo un escritor a la vez puede escribir en una transmisión con capacidad de escritura. Si quieres que otro escritor comience a escribir en tu transmisión, por lo general, debes liberarlo antes de adjuntarle otro.
Una cola interna realiza un seguimiento de los fragmentos que se escribieron en la transmisión, pero que el receptor subyacente todavía no los procesó.
Una estrategia de cola es un objeto que determina cómo una transmisión debe indicar la contrapresión en función del estado de su cola interna. La estrategia de colas asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola con un número especificado, conocido como la marca de agua alta.
La construcción final se denomina controlador. Cada transmisión con capacidad de escritura tiene un controlador asociado que te permite controlarla (por ejemplo, anularla).
Cómo crear una transmisión que admite escritura
La interfaz WritableStream
de la API de Streams proporciona una abstracción estándar para escribir datos de transmisión en un destino, conocido como un receptor. Este objeto incluye contrapresión y puesta en cola integradas. Para crear una transmisión con capacidad de escritura, llama a su constructor WritableStream()
.
Tiene un parámetro underlyingSink
opcional, que representa un objeto con métodos y propiedades que definen cómo se comportará la instancia de transmisión construida.
underlyingSink
underlyingSink
puede incluir los siguientes métodos opcionales definidos por el desarrollador. El parámetro controller
que se pasa a algunos de los métodos es un WritableStreamDefaultController
.
start(controller)
: Se llama a este método de inmediato cuando se construye el objeto. El contenido de este método debe apuntar a obtener acceso al receptor subyacente. Si este proceso se realiza de forma asíncrona, puede mostrar una promesa que indica el éxito o el fracaso.write(chunk, controller)
: Se llamará a este método cuando un fragmento nuevo de datos (especificado en el parámetrochunk
) esté listo para escribirse en el receptor subyacente. Puede mostrar una promesa para indicar el éxito o la falla de la operación de escritura. Se llamará a este método solo después de que se hayan realizado correctamente las operaciones de escritura anteriores y nunca después de que se cierre o anule la transmisión.close(controller)
: Se llamará a este método si la app indica que terminó de escribir fragmentos en la transmisión. El contenido debe hacer lo que sea necesario para finalizar las operaciones de escritura en el receptor subyacente y liberar acceso a él. Si este proceso es asíncrono, puede mostrar una promesa para indicar el éxito o el fracaso. Solo se llamará a este método después de que todas las escrituras en cola se realicen correctamente.abort(reason)
: Se llamará a este método si la app indica que desea cerrar la transmisión de manera repentina y ponerla en un estado de error. Puede limpiar los recursos retenidos, comoclose()
, pero se llamará aabort()
incluso si las operaciones de escritura están en cola. Esos fragmentos se desecharán. Si este proceso es asíncrono, puede mostrar una promesa para indicar el éxito o el fracaso. El parámetroreason
contiene unDOMString
que describe por qué se anuló la transmisión.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
La interfaz WritableStreamDefaultController
de la API de Streams representa un controlador que permite controlar el estado de un WritableStream
durante la configuración, a medida que se envían más fragmentos para escribir o al final de la escritura. Cuando se construye un WritableStream
, el receptor subyacente recibe una instancia de WritableStreamDefaultController
correspondiente para manipular. WritableStreamDefaultController
tiene un solo método, WritableStreamDefaultController.error()
, que genera un error en cualquier interacción futura con la transmisión asociada.
WritableStreamDefaultController
también admite una propiedad signal
que muestra una instancia de AbortSignal
, lo que permite detener una operación WritableStream
si es necesario.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
El segundo argumento del constructor WritableStream()
, también opcional, es queuingStrategy
.
Es un objeto que define de forma opcional una estrategia de fila para la transmisión, que toma dos parámetros:
highWaterMark
: Es un número no negativo que indica la marca de agua más alta del flujo con esta estrategia de fila.size(chunk)
: Es una función que calcula y muestra el tamaño finito no negativo del valor de fragmento dado. El resultado se usa para determinar la contrapresión, que se manifiesta a través de la propiedadWritableStreamDefaultWriter.desiredSize
adecuada.
Los métodos getWriter()
y write()
Para escribir en una transmisión que admite escritura, necesitas un escritor, que será un WritableStreamDefaultWriter
. El método getWriter()
de la interfaz WritableStream
muestra una instancia nueva de WritableStreamDefaultWriter
y bloquea la transmisión a esa instancia. Mientras la transmisión está bloqueada, no se podrá adquirir otro escritor hasta que se lance el actual.
El método write()
de la interfaz WritableStreamDefaultWriter
escribe un fragmento de datos pasado a un WritableStream
y a su receptor subyacente y, luego, muestra una promesa que se resuelve para indicar el éxito o el fracaso de la operación de escritura. Ten en cuenta que lo que significa “éxito” depende del receptor subyacente; podría indicar que se aceptó el fragmento y no necesariamente que se guardó de forma segura en su destino final.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
La propiedad locked
Para verificar si una transmisión con capacidad de escritura está bloqueada, accede a su propiedad WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Muestra de código de transmisión que admite escritura
En la siguiente muestra de código, se muestran todos los pasos en acción.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Canalizar una transmisión legible a una transmisión con escritura
Se puede canalizar una transmisión legible a través del método pipeTo()
de la transmisión legible.
ReadableStream.pipeTo()
canaliza el ReadableStream
actual a un WritableStream
determinado y muestra una
promesa que se cumple cuando el proceso de canalización se completa correctamente o se rechaza si se encuentran
errores.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Crea una transmisión de transformación
La interfaz TransformStream
de la API de Streams representa un conjunto de datos transformables. Para crear una transmisión de transformación, llama a su constructor TransformStream()
, que crea y muestra un objeto de flujo de transformación de los controladores determinados. El constructor TransformStream()
acepta como primer argumento un objeto de JavaScript opcional que representa el transformer
. Esos objetos pueden contener cualquiera de los siguientes métodos:
transformer
start(controller)
: Se llama a este método de inmediato cuando se construye el objeto. Por lo general, se usa para poner fragmentos de prefijos en cola mediantecontroller.enqueue()
. Esos fragmentos se leerán desde el lado legible, pero no dependerán de ninguna escritura en el lado que admite escritura. Si este proceso inicial es asíncrono, por ejemplo, porque se requiere cierto esfuerzo para adquirir los fragmentos de prefijo, la función puede mostrar una promesa que indique el éxito o el fracaso; una promesa rechazada generará un error en la transmisión. El constructorTransformStream()
volverá a mostrar las excepciones arrojadas.transform(chunk, controller)
: Se llama a este método cuando un fragmento nuevo escrito originalmente en el lado con capacidad de escritura está listo para transformarse. La implementación de transmisión garantiza que se llamará a esta función solo después de que se hayan realizado correctamente las transformaciones anteriores y nunca antes de que se completestart()
o después de que se haya llamado aflush()
. Esta función realiza el trabajo de transformación real del flujo de transformación. Puede poner los resultados en cola concontroller.enqueue()
. Esto permite que un solo fragmento escrito en el lado que admite escritura da como resultado cero o varios fragmentos en el lado legible, según la cantidad de veces que se llame acontroller.enqueue()
. Si el proceso de transformación es asíncrono, esta función puede mostrar una promesa que indica el éxito o el fracaso de la transformación. Una promesa rechazada generará un error en los lados de lectura y escritura del flujo de transformaciones. Si no se proporciona un métodotransform()
, se usa la transformación de identidad, que pone en cola los fragmentos sin cambios del lado que admite escritura al lado de lectura.flush(controller)
: Se llama a este método después de que todos los fragmentos escritos en el lado que admiten escritura se transformaron pasando correctamente portransform()
, y el lado que admite escritura está a punto de cerrarse. Por lo general, se usa para poner fragmentos de sufijo en cola en el lado legible, antes de que se cierre. Si el proceso de limpieza es asíncrono, la función puede mostrar una promesa para indicar el éxito o el fracaso; el resultado se comunicará al llamador destream.writable.write()
. Además, una promesa rechazada generará un error en los lados de lectura y escritura de la transmisión. Arrojar una excepción se trata de la misma manera que mostrar una promesa rechazada.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Las estrategias de cola writableStrategy
y readableStrategy
El segundo y tercer parámetros opcionales del constructor TransformStream()
son estrategias de cola opcionales writableStrategy
y readableStrategy
. Se definen como se describe en las secciones de transmisión legibles y que admiten escritura, respectivamente.
Muestra de código de flujo de transformación
En la siguiente muestra de código, se observa un flujo de transformaciones simple en acción.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Canalización de una transmisión legible a través de una transmisión de transformación
El método pipeThrough()
de la interfaz ReadableStream
proporciona una forma encadenable de canalizar la transmisión actual a través de una transmisión de transformación o cualquier otro par que admita escritura o lectura. Por lo general, la canalización de una transmisión la bloqueará mientras dure la canalización, lo que evita que otros lectores la bloqueen.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
En la siguiente muestra de código (un poco forzada) se indica cómo implementar una versión “gritando” de fetch()
que convierte el texto en mayúsculas consumiendo la promesa de respuesta que se muestra como una transmisión y mayúsculas en fragmento por fragmento. La ventaja de este enfoque es que no necesitas esperar a que se descargue todo el documento, lo que puede suponer una gran diferencia cuando trabajas con archivos grandes.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demostración
En la siguiente demostración, se muestran transmisiones legibles, de escritura y de transformación en acción. También se incluyen ejemplos de cadenas de canalización pipeThrough()
y pipeTo()
, y se muestra tee()
. De manera opcional, puedes ejecutar la demostración en su propia ventana o ver el código fuente.
Transmisiones útiles disponibles en el navegador
Hay una serie de transmisiones útiles integradas en el navegador. Puedes crear un ReadableStream
con facilidad a partir de un BLOB. El método stream() de la interfaz Blob
muestra un ReadableStream
que, al momento de la lectura, muestra los datos contenidos en el BLOB. Recuerda también que un objeto File
es un tipo específico de Blob
y se puede usar en cualquier contexto que pueda usar un BLOB.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Las variantes de transmisión de TextDecoder.decode()
y TextEncoder.encode()
se denominan TextDecoderStream
y TextEncoderStream
, respectivamente.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Comprimir o descomprimir un archivo es fácil con las transmisiones de transformaciones CompressionStream
y DecompressionStream
, respectivamente. En la siguiente muestra de código, se indica cómo descargar la especificación de flujos, comprimirla (gzip) directamente en el navegador y escribir el archivo comprimido directamente en el disco.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Las FileSystemWritableFileStream
de la API de File System Access y las transmisiones de solicitudes fetch()
experimentales son ejemplos de transmisiones en las que se pueden escribir.
La API de Serial hace un uso intensivo de las transmisiones legibles y con capacidad de escritura.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Por último, la API de WebSocketStream
integra transmisiones con la API de WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Recursos útiles
- Especificación de las transmisiones
- Demostraciones complementarias
- Transmite polyfill
- 2016: el año de los flujos web
- Iteradores y generadores asíncronos
- Visualizador de transmisiones
Agradecimientos
Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley y Adam Rice revisaron este artículo. Las entradas de blog de Jake Archibald me ayudaron mucho a comprender los flujos. Algunas de las muestras de código están inspiradas en las exploraciones del usuario de GitHub @bellbind y las partes de la prosa se compilan en gran medida en los documentos web de MDN en transmisiones. Los autores de Streams Standard hicieron un gran trabajo al escribir esta especificación. Hero image de Ryan Lara en Unsplash.