Aprende a usar flujos de lectura, escritura y transformación con la API de Streams.
La API de Streams te permite acceder de forma programática a flujos de datos recibidos a través de la red o creados por cualquier medio de forma local y procesarlos con JavaScript. La transmisión implica desglosar un recurso que deseas recibir, enviar o transformar en fragmentos pequeños y, luego, procesarlos bit a bit. Si bien la transmisión es algo que los navegadores hacen de todos modos cuando reciben recursos como HTML o videos para mostrar en páginas web, esta función nunca estuvo disponible para JavaScript antes de que se presentara fetch
con transmisiones en 2015.
Anteriormente, si querías procesar algún tipo de recurso (ya sea un video, un archivo de texto, etc.), debías descargar todo el archivo, esperar a que se deserializara en un formato adecuado y, luego, procesarlo. Con las transmisiones disponibles para JavaScript, todo cambia. Ahora puedes procesar datos sin procesar con JavaScript de forma progresiva apenas estén disponibles en el cliente, sin necesidad de generar un búfer, una cadena ni un objeto blob. Esto permite varios casos de uso, algunos de los cuales menciono a continuación:
- Efectos de video: Envía una transmisión de video legible a través de una transmisión de transformación que aplica efectos en tiempo real.
- (Des)compresión de datos: Envía una transmisión de archivos a través de una transmisión de transformación que la (des)comprime de forma selectiva.
- Decodificación de imágenes: Envía un flujo de respuesta HTTP a través de un flujo de transformación que decodifica bytes en datos de mapa de bits y, luego, a través de otro flujo de transformación que traduce los mapas de bits a PNG. Si se instala dentro del controlador
fetch
de un trabajador de servicio, te permite realizar una polyfillación transparente de nuevos formatos de imagen, como AVIF.
Navegadores compatibles
ReadableStream y WritableStream
TransformStream
Conceptos básicos
Antes de explicar los diferentes tipos de transmisiones, permíteme presentarte algunos conceptos básicos.
Fragmentos
Un fragmento es un dato único que se escribe o se lee en un flujo. Puede ser de cualquier tipo. Los flujos incluso pueden contener fragmentos de diferentes tipos. La mayoría de las veces, un fragmento no será la unidad de datos más atómica para un flujo determinado. Por ejemplo, un flujo de bytes puede contener fragmentos de 16 unidades de Uint8Array
de KiB, en lugar de bytes individuales.
Transmisiones legibles
Un flujo legible representa una fuente de datos de la que puedes leer. En otras palabras, los datos salen de un flujo legible. Específicamente, un flujo legible es una instancia de la clase ReadableStream
.
Transmisiones con capacidad de escritura
Un flujo de escritura representa un destino para los datos en los que puedes escribir. En otras palabras, los datos
entran en un flujo de escritura. Específicamente, un flujo de escritura es una instancia de la clase WritableStream
.
Transforma transmisiones
Una transmisión de transformación consta de un par de transmisiones: una transmisión de escritura, conocida como su lado de escritura, y una transmisión de lectura, conocida como su lado de lectura.
Una metáfora del mundo real para esto sería un intérprete simultáneo que traduce de un idioma a otro sobre la marcha.
De manera específica para el flujo de transformación, escribir
en el lado de escritura hace que los datos nuevos estén disponibles para leer desde el
lado de lectura. Específicamente, cualquier objeto con una propiedad writable
y una propiedad readable
puede servir como flujo de transformación. Sin embargo, la clase TransformStream
estándar facilita la creación de un par de este tipo que esté correctamente entrelazado.
Cadenas para tuberías
Los flujos se usan principalmente encadenando unos con otros. Un flujo legible se puede canalizar directamente a un flujo escribible con el método pipeTo()
del flujo legible o puede canalizarse a través de uno o más flujos de transformación primero con el método pipeThrough()
del flujo legible. Un conjunto de flujos unidos de esta manera se conoce como cadena de canalizaciones.
Contrapresión
Una vez que se construye una cadena de canalizaciones, propagará indicadores sobre la rapidez con la que los fragmentos deben fluir a través de ella. Si algún paso de la cadena aún no puede aceptar fragmentos, propaga una señal hacia atrás a través de la cadena de canalizaciones hasta que, finalmente, se le indique a la fuente original que deje de producir fragmentos tan rápido. Este proceso de normalización del flujo se denomina contrapresión.
Teeing
Un flujo legible se puede dividir (se le llama así por la forma de una "T" mayúscula) con su método tee()
.
Esto bloqueará el flujo, es decir, hará que ya no se pueda usar directamente. Sin embargo, creará dos flujos nuevos, llamados ramas, que se pueden consumir de forma independiente.
El posicionamiento de la pelota en el tee también es importante porque las transmisiones no se pueden rebobinar ni reiniciar. Más información sobre esto más adelante.
Mecánica de un flujo legible
Un flujo legible es una fuente de datos representada en JavaScript por un objeto ReadableStream
que fluye desde una fuente subyacente. El constructor ReadableStream()
crea y muestra un objeto de flujo legible a partir de los controladores determinados. Existen dos tipos de fuente subyacente:
- Las fuentes push te envían datos constantemente cuando accedes a ellas, y depende de ti iniciar, pausar o cancelar el acceso a la transmisión. Los ejemplos incluyen transmisiones de video en vivo, eventos enviados por el servidor o WebSockets.
- Las fuentes de extracción requieren que solicites datos de ellas de forma explícita una vez que te conectes a ellas. Los ejemplos incluyen operaciones HTTP a través de llamadas
fetch()
oXMLHttpRequest
.
Los datos de transmisión se leen de forma secuencial en partes pequeñas llamadas fragmentos. Se dice que los fragmentos que se colocan en un flujo están en cola. Esto significa que están esperando en una fila para que se lean. Una cola interna realiza un seguimiento de los fragmentos que aún no se leyeron.
Una estrategia de cola es un objeto que determina cómo una transmisión debe indicar la contrapresión en función del estado de su cola interna. La estrategia de colas asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola con un número especificado, conocido como marca de agua.
Un lector lee los fragmentos dentro del flujo. Este lector recupera los datos por fragmento a la vez, lo que te permite realizar cualquier tipo de operación que desees. El lector y el otro código de procesamiento que lo acompaña se denominan consumidor.
La siguiente construcción en este contexto se denomina controlador. Cada flujo legible tiene un controlador asociado que, como su nombre lo indica, te permite controlarlo.
Solo un lector puede leer un flujo a la vez. Cuando se crea un lector y comienza a leer un flujo (es decir, se convierte en un lector activo), se bloquea. Si quieres que otro lector se haga cargo de la lectura de tu transmisión, por lo general, debes liberar el primer lector antes de hacer cualquier otra acción (aunque puedes dividir las transmisiones).
Cómo crear un flujo legible
Para crear un flujo legible, llama a su constructor ReadableStream()
.
El constructor tiene un argumento opcional underlyingSource
, que representa un objeto con métodos y propiedades que definen cómo se comportará la instancia de flujo creada.
underlyingSource
Para ello, se pueden usar los siguientes métodos opcionales definidos por el desarrollador:
start(controller)
: Se llama de inmediato cuando se construye el objeto. El método puede acceder a la fuente de la transmisión y hacer cualquier otra acción necesaria para configurar la funcionalidad de transmisión. Si este proceso se debe realizar de forma asíncrona, el método puede mostrar una promesa para indicar el éxito o el fracaso. El parámetrocontroller
que se pasa a este método es unReadableStreamDefaultController
.pull(controller)
: Se puede usar para controlar la transmisión a medida que se recuperan más fragmentos. Se llama de forma reiterada, siempre y cuando la cola interna de fragmentos del flujo no esté completa, hasta que la cola alcance su límite máximo. Si el resultado de llamar apull()
es una promesa, no se volverá a llamar apull()
hasta que se cumpla dicha promesa. Si la promesa se rechaza, la transmisión tendrá errores.cancel(reason)
: Se lo llama cuando el consumidor de la transmisión la cancela.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
admite los siguientes métodos:
ReadableStreamDefaultController.close()
cierra el flujo asociado.ReadableStreamDefaultController.enqueue()
pone en cola un fragmento determinado en la transmisión asociada.ReadableStreamDefaultController.error()
provoca que se produzcan errores en las interacciones futuras con la transmisión asociada.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
El segundo argumento, también opcional, del constructor ReadableStream()
es queuingStrategy
.
Es un objeto que, de manera opcional, define una estrategia de cola para la transmisión, que toma dos parámetros:
highWaterMark
: Es un número no negativo que indica el límite superior de la transmisión con esta estrategia de cola.size(chunk)
: Es una función que calcula y muestra el tamaño finito no negativo del valor de fragmento determinado. El resultado se usa para determinar la contrapresión, que se manifiesta a través de la propiedadReadableStreamDefaultController.desiredSize
adecuada. También controla cuándo se llama al métodopull()
de la fuente subyacente.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Los métodos getReader()
y read()
Para leer de un flujo legible, necesitas un lector, que será un ReadableStreamDefaultReader
.
El método getReader()
de la interfaz ReadableStream
crea un lector y bloquea la transmisión en él. Mientras el flujo esté bloqueado, no se podrá adquirir ningún otro lector hasta que se libere.
El método read()
de la interfaz ReadableStreamDefaultReader
muestra una promesa que proporciona acceso al siguiente fragmento de la cola interna del flujo. Se completa o rechaza con un resultado según el estado del flujo. Las diferentes posibilidades son las siguientes:
- Si hay un fragmento disponible, la promesa se cumplirá con un objeto del formulario
{ value: chunk, done: false }
. - Si la transmisión se cierra, la promesa se cumplirá con un objeto del formulario
{ value: undefined, done: true }
. - Si la transmisión tiene errores, la promesa se rechazará con el error correspondiente.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
La propiedad locked
Para verificar si una transmisión legible está bloqueada, accede a su propiedad ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Muestras de código de flujo legibles
En el siguiente ejemplo de código, se muestran todos los pasos en acción. Primero, creas un ReadableStream
que, en su argumento underlyingSource
(es decir, la clase TimestampSource
), define un método start()
.
Este método le indica al controller
de la transmisión que enqueue()
una marca de tiempo cada segundo durante diez segundos.
Por último, le indica al controlador que close()
la transmisión. Para consumir esta transmisión, crea un lector a través del método getReader()
y llama a read()
hasta que la transmisión sea done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iteración asíncrona
Comprobar en cada iteración del bucle read()
si la transmisión es done
puede no ser la API más conveniente.
Por suerte, pronto habrá una mejor manera de hacerlo: la iteración asíncrona.
for await (const chunk of stream) {
console.log(chunk);
}
Una solución alternativa para usar la iteración asíncrona en la actualidad es implementar el comportamiento con un polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Cómo crear un flujo legible
El método tee()
de la interfaz ReadableStream
deriva el flujo actual que se puede leer y muestra un array de dos elementos que contiene las dos ramas resultantes como instancias nuevas de ReadableStream
. Esto permite que dos lectores lean un flujo de forma simultánea. Puedes hacer esto, por ejemplo, en un trabajador de servicio si quieres recuperar una respuesta del servidor y transmitirla al navegador, pero también transmitirla a la caché del trabajador de servicio. Como un cuerpo de respuesta no se puede consumir más de una vez, necesitas dos copias para hacerlo. Para cancelar la transmisión, debes cancelar ambas ramas resultantes. Por lo general, si se establece un flujo, este se bloqueará durante el tiempo que dure, lo que evitará que otros lectores lo bloqueen.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Transmisiones de bytes legibles
Para los flujos que representan bytes, se proporciona una versión extendida del flujo legible para controlar los bytes de manera eficiente, en particular, minimizando las copias. Los flujos de bytes permiten adquirir lectores de trae tu propio búfer (BYOB). La implementación predeterminada puede proporcionar un rango de resultados diferentes, como cadenas o búferes de array en el caso de WebSockets, mientras que los flujos de bytes garantizan la salida de bytes. Además, los lectores de BYOB tienen beneficios de estabilidad. Esto se debe a que, si se desconecta un búfer, se puede garantizar que no se escriba en el mismo búfer dos veces, lo que evita las condiciones de carrera. Los lectores de BYOB pueden reducir la cantidad de veces que el navegador debe ejecutar la recolección de elementos no utilizados, ya que puede reutilizar los búferes.
Cómo crear un flujo de bytes legible
Para crear un flujo de bytes legible, pasa un parámetro type
adicional al constructor ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
La fuente subyacente de un flujo de bytes legible recibe un ReadableByteStreamController
para manipular. Su método ReadableByteStreamController.enqueue()
toma un argumento chunk
cuyo valor es un ArrayBufferView
. La propiedad ReadableByteStreamController.byobRequest
muestra la solicitud de extracción de BYOB actual o nulo si no hay ninguna. Por último, la propiedad ReadableByteStreamController.desiredSize
muestra el tamaño deseado para completar la cola interna de la transmisión controlada.
queuingStrategy
El segundo argumento, también opcional, del constructor ReadableStream()
es queuingStrategy
.
Es un objeto que, de manera opcional, define una estrategia de cola para la transmisión, que toma un
parámetro:
highWaterMark
: Es una cantidad no negativa de bytes que indica el límite superior de la transmisión con esta estrategia de cola. Esto se usa para determinar la contrapresión, que se manifiesta a través de la propiedadReadableByteStreamController.desiredSize
adecuada. También controla cuándo se llama al métodopull()
de la fuente subyacente.
Los métodos getReader()
y read()
Luego, puedes obtener acceso a un ReadableStreamBYOBReader
configurando el parámetro mode
de la siguiente manera: ReadableStream.getReader({ mode: "byob" })
. Esto permite un control más preciso sobre la asignación de búferes para evitar copias. Para leer desde el flujo de bytes, debes llamar a ReadableStreamBYOBReader.read(view)
, donde view
es un ArrayBufferView
.
Muestra de código de flujo de bytes legible
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
La siguiente función muestra flujos de bytes legibles que permiten una lectura eficiente sin copia de un arreglo generado de forma aleatoria. En lugar de usar un tamaño de fragmento predeterminado de 1,024, intenta completar el búfer proporcionado por el desarrollador, lo que permite un control total.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
La mecánica de un flujo de escritura
Un flujo de escritura es un destino en el que puedes escribir datos, representados en JavaScript por un objeto WritableStream
. Esto sirve como abstracción sobre un sumidero subyacente, un sumidero de E/S de nivel inferior en el que se escriben los datos sin procesar.
Los datos se escriben en el flujo a través de un escritor, un fragmento a la vez. Un fragmento puede adoptar una gran variedad de formas, al igual que los fragmentos de un lector. Puedes usar el código que quieras para producir los fragmentos listos para escribir. El escritor más el código asociado se denomina productor.
Cuando se crea un escritor y comienza a escribir en un flujo (un escritor activo), se dice que está bloqueado. Solo un escritor puede escribir en un flujo de escritura a la vez. Si quieres que otro escritor comience a escribir en tu flujo, por lo general, debes liberarlo antes de conectar otro escritor.
Una cola interna realiza un seguimiento de los fragmentos que se escribieron en el flujo, pero que el sink subyacente aún no procesó.
Una estrategia de cola es un objeto que determina cómo una transmisión debe indicar la contrapresión en función del estado de su cola interna. La estrategia de colas asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola con un número especificado, conocido como marca de agua.
La construcción final se denomina controlador. Cada flujo de escritura tiene un controlador asociado que te permite controlarlo (por ejemplo, para abortarlo).
Cómo crear un flujo de escritura
La interfaz WritableStream
de la API de Streams proporciona una abstracción estándar para escribir datos de transmisión en un destino, conocido como receptor. Este objeto incluye una presión de retorno y una cola integradas. Para crear un flujo de escritura, llama a su constructor WritableStream()
.
Tiene un parámetro underlyingSink
opcional, que representa un objeto con métodos y propiedades que definen cómo se comportará la instancia de flujo creada.
underlyingSink
underlyingSink
puede incluir los siguientes métodos opcionales definidos por el desarrollador. El parámetro controller
que se pasa a algunos de los métodos es un WritableStreamDefaultController
.
start(controller)
: Se llama a este método inmediatamente cuando se construye el objeto. El contenido de este método debe tener como objetivo obtener acceso al sumidero subyacente. Si este proceso se debe realizar de forma asíncrona, puede mostrar una promesa para indicar el éxito o el fracaso.write(chunk, controller)
: Se llamará a este método cuando un nuevo fragmento de datos (especificado en el parámetrochunk
) esté listo para escribirse en el sumidero subyacente. Puede mostrar una promesa para indicar el éxito o el fracaso de la operación de escritura. Se llamará a este método solo después de que las operaciones de escritura anteriores se hayan realizado correctamente y nunca después de que se cierre o se cancele la transmisión.close(controller)
: Se llamará a este método si la app indica que terminó de escribir fragmentos en la transmisión. El contenido debe hacer lo que sea necesario para finalizar las operaciones de escritura en el sink subyacente y liberar el acceso a él. Si este proceso es asíncrono, puede mostrar una promesa para indicar si se realizó correctamente o no. Se llamará a este método solo después de que se hayan realizado correctamente todas las operaciones de escritura en fila.abort(reason)
: Se llamará a este método si la app indica que desea cerrar abruptamente la transmisión y colocarla en un estado con errores. Puede limpiar cualquier recurso retenido, al igual queclose()
, pero se llamará aabort()
incluso si las operaciones de escritura están en cola. Esos fragmentos se descartarán. Si este proceso es asíncrono, puede mostrar una promesa para indicar el éxito o el fracaso. El parámetroreason
contiene unDOMString
que describe por qué se canceló la transmisión.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
La interfaz WritableStreamDefaultController
de la API de Streams representa un controlador que permite controlar el estado de un WritableStream
durante la configuración, a medida que se envían más fragmentos para escribir o al final de la escritura. Cuando se construye un WritableStream
, al sumidero subyacente se le asigna una instancia WritableStreamDefaultController
correspondiente para manipular. WritableStreamDefaultController
solo tiene un método: WritableStreamDefaultController.error()
, que hace que cualquier interacción futura con la transmisión asociada genere un error.
WritableStreamDefaultController
también admite una propiedad signal
que muestra una instancia de AbortSignal
, lo que permite detener una operación WritableStream
si es necesario.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
El segundo argumento, también opcional, del constructor WritableStream()
es queuingStrategy
.
Es un objeto que, de manera opcional, define una estrategia de cola para la transmisión, que toma dos parámetros:
highWaterMark
: Es un número no negativo que indica el límite superior de la transmisión con esta estrategia de cola.size(chunk)
: Es una función que calcula y muestra el tamaño finito no negativo del valor de fragmento determinado. El resultado se usa para determinar la contrapresión, que se manifiesta a través de la propiedadWritableStreamDefaultWriter.desiredSize
adecuada.
Los métodos getWriter()
y write()
Para escribir en un flujo de escritura, necesitas un escritor, que será un WritableStreamDefaultWriter
. El método getWriter()
de la interfaz WritableStream
muestra una instancia nueva de WritableStreamDefaultWriter
y bloquea el flujo en esa instancia. Mientras el flujo está bloqueado, no se puede adquirir ningún otro escritor hasta que se libere el actual.
El método write()
de la interfaz WritableStreamDefaultWriter
escribe un fragmento de datos pasado a un WritableStream
y su sink subyacente y, luego, muestra una promesa que se resuelve para indicar el éxito o el fracaso de la operación de escritura. Ten en cuenta que el significado de "correcto" depende del sink subyacente. Puede indicar que se aceptó el fragmento, pero no necesariamente que se guardó de forma segura en su destino final.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
La propiedad locked
Para verificar si un flujo de escritura está bloqueado, accede a su propiedad WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Ejemplo de código de transmisión que se puede escribir
En el siguiente ejemplo de código, se muestran todos los pasos en acción.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Cómo canalizar una transmisión legible a una transmisión de escritura
Un flujo legible se puede canalizar a un flujo de escritura a través del método pipeTo()
del flujo legible.
ReadableStream.pipeTo()
canaliza la ReadableStream
actual a una WritableStream
determinada y muestra una promesa que se cumple cuando el proceso de canalización se completa correctamente o se rechaza si se encontraron errores.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Cómo crear una transmisión de transformación
La interfaz TransformStream
de la API de Streams representa un conjunto de datos transformables. Para crear un flujo de transformación, llama a su constructor TransformStream()
, que crea y muestra un objeto de flujo de transformación a partir de los controladores determinados. El constructor TransformStream()
acepta como primer argumento un objeto JavaScript opcional que representa el transformer
. Estos objetos pueden contener cualquiera de los siguientes métodos:
transformer
start(controller)
: Se llama a este método inmediatamente cuando se construye el objeto. Por lo general, se usa para poner en cola fragmentos de prefijos concontroller.enqueue()
. Esos fragmentos se leerán desde el lado de lectura, pero no dependen de ninguna escritura en el lado de escritura. Si este proceso inicial es asíncrono, por ejemplo, porque requiere cierto esfuerzo adquirir los fragmentos de prefijo, la función puede mostrar una promesa para indicar el éxito o el fracaso. Una promesa rechazada generará un error en la transmisión. El constructorTransformStream()
volverá a arrojar cualquier excepción.transform(chunk, controller)
: Se llama a este método cuando un fragmento nuevo escrito originalmente en el lado de escritura está listo para transformarse. La implementación del flujo garantiza que se llamará a esta función solo después de que se hayan realizado correctamente las transformaciones anteriores y nunca antes de que se completestart()
o después de que se llame aflush()
. Esta función realiza el trabajo de transformación real del flujo de transformación. Puede poner en cola los resultados concontroller.enqueue()
. Esto permite que un solo fragmento escrito en el lado de escritura genere cero o varios fragmentos en el lado de lectura, según la cantidad de veces que se llame acontroller.enqueue()
. Si el proceso de transformación es asíncrono, esta función puede mostrar una promesa para indicar el éxito o el fracaso de la transformación. Una promesa rechazada generará errores en los lados legibles y escribibles del flujo de transformación. Si no se proporciona un métodotransform()
, se usa la transformación de identidad, que pone en cola fragmentos sin cambios del lado de escritura al lado de lectura.flush(controller)
: Se llama a este método después de que todos los fragmentos escritos en el lado de escritura se hayan transformado pasando correctamente portransform()
, y el lado de escritura está a punto de cerrarse. Por lo general, se usa para poner en cola fragmentos de sufijos en el lado legible, antes de que también se cierre. Si el proceso de limpieza es asíncrono, la función puede mostrar una promesa para indicar si se realizó correctamente o no. El resultado se comunicará al llamador destream.writable.write()
. Además, una promesa rechazada generará errores en los lados legibles y escribibles del flujo. Lanzar una excepción se trata de la misma manera que mostrar una promesa rechazada.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Estrategias de colas de writableStrategy
y readableStrategy
El segundo y tercer parámetro opcional del constructor TransformStream()
son estrategias de cola opcionales writableStrategy
y readableStrategy
. Se definen como se describe en las secciones de flujo legible y escribible, respectivamente.
Ejemplo de código de transmisión de Transform
En el siguiente ejemplo de código, se muestra una transmisión de transformación simple en acción.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Cómo canalizar una transmisión legible a través de una transmisión de transformación
El método pipeThrough()
de la interfaz ReadableStream
proporciona una forma encadenable de canalizar la transmisión actual a través de una transmisión de transformación o cualquier otro par de escritura y lectura. Por lo general, la canalización de un flujo lo bloqueará durante el tiempo que dure la canalización, lo que evitará que otros lectores lo bloqueen.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
En la siguiente muestra de código (un poco artificial), se muestra cómo podrías implementar una versión "en mayúsculas" de fetch()
que convierte todo el texto en mayúsculas consumiendo la promesa de respuesta que se muestra como un flujo y convirtiendo en mayúsculas cada fragmento. La ventaja de este enfoque es que no necesitas esperar a que se descargue todo el documento, lo que puede marcar una gran diferencia cuando se trata de archivos grandes.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demostración
En la siguiente demostración, se muestran flujos de lectura, escritura y transformación en acción. También incluye ejemplos de cadenas de tuberías pipeThrough()
y pipeTo()
, y también se muestra tee()
. De manera opcional, puedes ejecutar la demo en su propia ventana o ver el código fuente.
Transmisiones útiles disponibles en el navegador
Hay varias transmisiones útiles integradas directamente en el navegador. Puedes crear fácilmente un ReadableStream
a partir de un blob. El método stream() de la interfaz Blob
muestra un ReadableStream
que, cuando se lee, muestra los datos contenidos en el blob. Además, recuerda que un objeto File
es un tipo específico de Blob
y se puede usar en cualquier contexto en el que se pueda usar un blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Las variantes de transmisión de TextDecoder.decode()
y TextEncoder.encode()
se denominan TextDecoderStream
y TextEncoderStream
, respectivamente.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Comprimir o descomprimir un archivo es fácil con las transmisiones de transformación CompressionStream
y DecompressionStream
, respectivamente. En la siguiente muestra de código, se muestra cómo puedes descargar la especificación de Streams, comprimirla (gzip) directamente en el navegador y escribir el archivo comprimido directamente en el disco.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
FileSystemWritableFileStream
de la API de File System Access y los flujos de solicitudes fetch()
experimentales son ejemplos de flujos de escritura en uso.
La API de Serial hace un uso intensivo de flujos legibles y escribibles.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Por último, la API de WebSocketStream
integra transmisiones con la API de WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Recursos útiles
- Especificación de flujos
- Demostraciones complementarias
- Polyfill de transmisiones
- 2016: el año de los flujos web
- Generadores y iteradores asíncronos
- Visualizador de transmisiones
Agradecimientos
Este artículo fue revisado por Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley y Adam Rice. Las entradas de blog de Jake Archibald me ayudaron mucho a comprender las transmisiones. Algunas de las muestras de código se inspiran en las exploraciones del usuario de GitHub @bellbind, y partes del texto se basan en gran medida en los Documentos web de MDN sobre flujos. Los autores del estándar de flujos hicieron un gran trabajo al escribir esta especificación. Imagen hero de Ryan Lara en Unsplash.