Transmisiones: la guía definitiva

Aprende a usar flujos de lectura, escritura y transformación con la API de Streams.

La API de Streams te permite acceder de forma programática a flujos de datos recibidos a través de la red o creados por cualquier medio de forma local y procesarlos con JavaScript. La transmisión implica desglosar un recurso que deseas recibir, enviar o transformar en fragmentos pequeños y, luego, procesarlos bit a bit. Si bien la transmisión es algo que los navegadores hacen de todos modos cuando reciben recursos como HTML o videos para mostrar en páginas web, esta función nunca estuvo disponible para JavaScript antes de que se presentara fetch con transmisiones en 2015.

Anteriormente, si querías procesar algún tipo de recurso (ya sea un video, un archivo de texto, etc.), debías descargar todo el archivo, esperar a que se deserializara en un formato adecuado y, luego, procesarlo. Con las transmisiones disponibles para JavaScript, todo cambia. Ahora puedes procesar datos sin procesar con JavaScript de forma progresiva apenas estén disponibles en el cliente, sin necesidad de generar un búfer, una cadena ni un objeto blob. Esto permite varios casos de uso, algunos de los cuales menciono a continuación:

  • Efectos de video: Envía una transmisión de video legible a través de una transmisión de transformación que aplica efectos en tiempo real.
  • (Des)compresión de datos: Envía una transmisión de archivos a través de una transmisión de transformación que la (des)comprime de forma selectiva.
  • Decodificación de imágenes: Envía un flujo de respuesta HTTP a través de un flujo de transformación que decodifica bytes en datos de mapa de bits y, luego, a través de otro flujo de transformación que traduce los mapas de bits a PNG. Si se instala dentro del controlador fetch de un trabajador de servicio, te permite realizar una polyfillación transparente de nuevos formatos de imagen, como AVIF.

Navegadores compatibles

ReadableStream y WritableStream

Navegadores compatibles

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Origen

TransformStream

Navegadores compatibles

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Origen

Conceptos básicos

Antes de explicar los diferentes tipos de transmisiones, permíteme presentarte algunos conceptos básicos.

Fragmentos

Un fragmento es un dato único que se escribe o se lee en un flujo. Puede ser de cualquier tipo. Los flujos incluso pueden contener fragmentos de diferentes tipos. La mayoría de las veces, un fragmento no será la unidad de datos más atómica para un flujo determinado. Por ejemplo, un flujo de bytes puede contener fragmentos de 16 unidades de Uint8Array de KiB, en lugar de bytes individuales.

Transmisiones legibles

Un flujo legible representa una fuente de datos de la que puedes leer. En otras palabras, los datos salen de un flujo legible. Específicamente, un flujo legible es una instancia de la clase ReadableStream.

Transmisiones con capacidad de escritura

Un flujo de escritura representa un destino para los datos en los que puedes escribir. En otras palabras, los datos entran en un flujo de escritura. Específicamente, un flujo de escritura es una instancia de la clase WritableStream.

Transforma transmisiones

Una transmisión de transformación consta de un par de transmisiones: una transmisión de escritura, conocida como su lado de escritura, y una transmisión de lectura, conocida como su lado de lectura. Una metáfora del mundo real para esto sería un intérprete simultáneo que traduce de un idioma a otro sobre la marcha. De manera específica para el flujo de transformación, escribir en el lado de escritura hace que los datos nuevos estén disponibles para leer desde el lado de lectura. Específicamente, cualquier objeto con una propiedad writable y una propiedad readable puede servir como flujo de transformación. Sin embargo, la clase TransformStream estándar facilita la creación de un par de este tipo que esté correctamente entrelazado.

Cadenas para tuberías

Los flujos se usan principalmente encadenando unos con otros. Un flujo legible se puede canalizar directamente a un flujo escribible con el método pipeTo() del flujo legible o puede canalizarse a través de uno o más flujos de transformación primero con el método pipeThrough() del flujo legible. Un conjunto de flujos unidos de esta manera se conoce como cadena de canalizaciones.

Contrapresión

Una vez que se construye una cadena de canalizaciones, propagará indicadores sobre la rapidez con la que los fragmentos deben fluir a través de ella. Si algún paso de la cadena aún no puede aceptar fragmentos, propaga una señal hacia atrás a través de la cadena de canalizaciones hasta que, finalmente, se le indique a la fuente original que deje de producir fragmentos tan rápido. Este proceso de normalización del flujo se denomina contrapresión.

Teeing

Un flujo legible se puede dividir (se le llama así por la forma de una "T" mayúscula) con su método tee(). Esto bloqueará el flujo, es decir, hará que ya no se pueda usar directamente. Sin embargo, creará dos flujos nuevos, llamados ramas, que se pueden consumir de forma independiente. El posicionamiento de la pelota en el tee también es importante porque las transmisiones no se pueden rebobinar ni reiniciar. Más información sobre esto más adelante.

Diagrama de una cadena de canalizaciones que consta de un flujo legible que proviene de una llamada a la API de recuperación que, luego, se canaliza a través de un flujo de transformación cuyo resultado se divide y, luego, se envía al navegador para el primer flujo legible resultante y a la caché del trabajador del servicio para el segundo flujo legible resultante.
Una cadena de tuberías.

Mecánica de un flujo legible

Un flujo legible es una fuente de datos representada en JavaScript por un objeto ReadableStream que fluye desde una fuente subyacente. El constructor ReadableStream() crea y muestra un objeto de flujo legible a partir de los controladores determinados. Existen dos tipos de fuente subyacente:

  • Las fuentes push te envían datos constantemente cuando accedes a ellas, y depende de ti iniciar, pausar o cancelar el acceso a la transmisión. Los ejemplos incluyen transmisiones de video en vivo, eventos enviados por el servidor o WebSockets.
  • Las fuentes de extracción requieren que solicites datos de ellas de forma explícita una vez que te conectes a ellas. Los ejemplos incluyen operaciones HTTP a través de llamadas fetch() o XMLHttpRequest.

Los datos de transmisión se leen de forma secuencial en partes pequeñas llamadas fragmentos. Se dice que los fragmentos que se colocan en un flujo están en cola. Esto significa que están esperando en una fila para que se lean. Una cola interna realiza un seguimiento de los fragmentos que aún no se leyeron.

Una estrategia de cola es un objeto que determina cómo una transmisión debe indicar la contrapresión en función del estado de su cola interna. La estrategia de colas asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola con un número especificado, conocido como marca de agua.

Un lector lee los fragmentos dentro del flujo. Este lector recupera los datos por fragmento a la vez, lo que te permite realizar cualquier tipo de operación que desees. El lector y el otro código de procesamiento que lo acompaña se denominan consumidor.

La siguiente construcción en este contexto se denomina controlador. Cada flujo legible tiene un controlador asociado que, como su nombre lo indica, te permite controlarlo.

Solo un lector puede leer un flujo a la vez. Cuando se crea un lector y comienza a leer un flujo (es decir, se convierte en un lector activo), se bloquea. Si quieres que otro lector se haga cargo de la lectura de tu transmisión, por lo general, debes liberar el primer lector antes de hacer cualquier otra acción (aunque puedes dividir las transmisiones).

Cómo crear un flujo legible

Para crear un flujo legible, llama a su constructor ReadableStream(). El constructor tiene un argumento opcional underlyingSource, que representa un objeto con métodos y propiedades que definen cómo se comportará la instancia de flujo creada.

underlyingSource

Para ello, se pueden usar los siguientes métodos opcionales definidos por el desarrollador:

  • start(controller): Se llama de inmediato cuando se construye el objeto. El método puede acceder a la fuente de la transmisión y hacer cualquier otra acción necesaria para configurar la funcionalidad de transmisión. Si este proceso se debe realizar de forma asíncrona, el método puede mostrar una promesa para indicar el éxito o el fracaso. El parámetro controller que se pasa a este método es un ReadableStreamDefaultController.
  • pull(controller): Se puede usar para controlar la transmisión a medida que se recuperan más fragmentos. Se llama de forma reiterada, siempre y cuando la cola interna de fragmentos del flujo no esté completa, hasta que la cola alcance su límite máximo. Si el resultado de llamar a pull() es una promesa, no se volverá a llamar a pull() hasta que se cumpla dicha promesa. Si la promesa se rechaza, la transmisión tendrá errores.
  • cancel(reason): Se lo llama cuando el consumidor de la transmisión la cancela.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController admite los siguientes métodos:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

El segundo argumento, también opcional, del constructor ReadableStream() es queuingStrategy. Es un objeto que, de manera opcional, define una estrategia de cola para la transmisión, que toma dos parámetros:

  • highWaterMark: Es un número no negativo que indica el límite superior de la transmisión con esta estrategia de cola.
  • size(chunk): Es una función que calcula y muestra el tamaño finito no negativo del valor de fragmento determinado. El resultado se usa para determinar la contrapresión, que se manifiesta a través de la propiedad ReadableStreamDefaultController.desiredSize adecuada. También controla cuándo se llama al método pull() de la fuente subyacente.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Los métodos getReader() y read()

Para leer de un flujo legible, necesitas un lector, que será un ReadableStreamDefaultReader. El método getReader() de la interfaz ReadableStream crea un lector y bloquea la transmisión en él. Mientras el flujo esté bloqueado, no se podrá adquirir ningún otro lector hasta que se libere.

El método read() de la interfaz ReadableStreamDefaultReader muestra una promesa que proporciona acceso al siguiente fragmento de la cola interna del flujo. Se completa o rechaza con un resultado según el estado del flujo. Las diferentes posibilidades son las siguientes:

  • Si hay un fragmento disponible, la promesa se cumplirá con un objeto del formulario
    { value: chunk, done: false }.
  • Si la transmisión se cierra, la promesa se cumplirá con un objeto del formulario
    { value: undefined, done: true }.
  • Si la transmisión tiene errores, la promesa se rechazará con el error correspondiente.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

La propiedad locked

Para verificar si una transmisión legible está bloqueada, accede a su propiedad ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Muestras de código de flujo legibles

En el siguiente ejemplo de código, se muestran todos los pasos en acción. Primero, creas un ReadableStream que, en su argumento underlyingSource (es decir, la clase TimestampSource), define un método start(). Este método le indica al controller de la transmisión que enqueue() una marca de tiempo cada segundo durante diez segundos. Por último, le indica al controlador que close() la transmisión. Para consumir esta transmisión, crea un lector a través del método getReader() y llama a read() hasta que la transmisión sea done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iteración asíncrona

Comprobar en cada iteración del bucle read() si la transmisión es done puede no ser la API más conveniente. Por suerte, pronto habrá una mejor manera de hacerlo: la iteración asíncrona.

for await (const chunk of stream) {
  console.log(chunk);
}

Una solución alternativa para usar la iteración asíncrona en la actualidad es implementar el comportamiento con un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Cómo crear un flujo legible

El método tee() de la interfaz ReadableStream deriva el flujo actual que se puede leer y muestra un array de dos elementos que contiene las dos ramas resultantes como instancias nuevas de ReadableStream. Esto permite que dos lectores lean un flujo de forma simultánea. Puedes hacer esto, por ejemplo, en un trabajador de servicio si quieres recuperar una respuesta del servidor y transmitirla al navegador, pero también transmitirla a la caché del trabajador de servicio. Como un cuerpo de respuesta no se puede consumir más de una vez, necesitas dos copias para hacerlo. Para cancelar la transmisión, debes cancelar ambas ramas resultantes. Por lo general, si se establece un flujo, este se bloqueará durante el tiempo que dure, lo que evitará que otros lectores lo bloqueen.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Transmisiones de bytes legibles

Para los flujos que representan bytes, se proporciona una versión extendida del flujo legible para controlar los bytes de manera eficiente, en particular, minimizando las copias. Los flujos de bytes permiten adquirir lectores de trae tu propio búfer (BYOB). La implementación predeterminada puede proporcionar un rango de resultados diferentes, como cadenas o búferes de array en el caso de WebSockets, mientras que los flujos de bytes garantizan la salida de bytes. Además, los lectores de BYOB tienen beneficios de estabilidad. Esto se debe a que, si se desconecta un búfer, se puede garantizar que no se escriba en el mismo búfer dos veces, lo que evita las condiciones de carrera. Los lectores de BYOB pueden reducir la cantidad de veces que el navegador debe ejecutar la recolección de elementos no utilizados, ya que puede reutilizar los búferes.

Cómo crear un flujo de bytes legible

Para crear un flujo de bytes legible, pasa un parámetro type adicional al constructor ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

La fuente subyacente de un flujo de bytes legible recibe un ReadableByteStreamController para manipular. Su método ReadableByteStreamController.enqueue() toma un argumento chunk cuyo valor es un ArrayBufferView. La propiedad ReadableByteStreamController.byobRequest muestra la solicitud de extracción de BYOB actual o nulo si no hay ninguna. Por último, la propiedad ReadableByteStreamController.desiredSize muestra el tamaño deseado para completar la cola interna de la transmisión controlada.

queuingStrategy

El segundo argumento, también opcional, del constructor ReadableStream() es queuingStrategy. Es un objeto que, de manera opcional, define una estrategia de cola para la transmisión, que toma un parámetro:

  • highWaterMark: Es una cantidad no negativa de bytes que indica el límite superior de la transmisión con esta estrategia de cola. Esto se usa para determinar la contrapresión, que se manifiesta a través de la propiedad ReadableByteStreamController.desiredSize adecuada. También controla cuándo se llama al método pull() de la fuente subyacente.

Los métodos getReader() y read()

Luego, puedes obtener acceso a un ReadableStreamBYOBReader configurando el parámetro mode de la siguiente manera: ReadableStream.getReader({ mode: "byob" }). Esto permite un control más preciso sobre la asignación de búferes para evitar copias. Para leer desde el flujo de bytes, debes llamar a ReadableStreamBYOBReader.read(view), donde view es un ArrayBufferView.

Muestra de código de flujo de bytes legible

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La siguiente función muestra flujos de bytes legibles que permiten una lectura eficiente sin copia de un arreglo generado de forma aleatoria. En lugar de usar un tamaño de fragmento predeterminado de 1,024, intenta completar el búfer proporcionado por el desarrollador, lo que permite un control total.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

La mecánica de un flujo de escritura

Un flujo de escritura es un destino en el que puedes escribir datos, representados en JavaScript por un objeto WritableStream. Esto sirve como abstracción sobre un sumidero subyacente, un sumidero de E/S de nivel inferior en el que se escriben los datos sin procesar.

Los datos se escriben en el flujo a través de un escritor, un fragmento a la vez. Un fragmento puede adoptar una gran variedad de formas, al igual que los fragmentos de un lector. Puedes usar el código que quieras para producir los fragmentos listos para escribir. El escritor más el código asociado se denomina productor.

Cuando se crea un escritor y comienza a escribir en un flujo (un escritor activo), se dice que está bloqueado. Solo un escritor puede escribir en un flujo de escritura a la vez. Si quieres que otro escritor comience a escribir en tu flujo, por lo general, debes liberarlo antes de conectar otro escritor.

Una cola interna realiza un seguimiento de los fragmentos que se escribieron en el flujo, pero que el sink subyacente aún no procesó.

Una estrategia de cola es un objeto que determina cómo una transmisión debe indicar la contrapresión en función del estado de su cola interna. La estrategia de colas asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola con un número especificado, conocido como marca de agua.

La construcción final se denomina controlador. Cada flujo de escritura tiene un controlador asociado que te permite controlarlo (por ejemplo, para abortarlo).

Cómo crear un flujo de escritura

La interfaz WritableStream de la API de Streams proporciona una abstracción estándar para escribir datos de transmisión en un destino, conocido como receptor. Este objeto incluye una presión de retorno y una cola integradas. Para crear un flujo de escritura, llama a su constructor WritableStream(). Tiene un parámetro underlyingSink opcional, que representa un objeto con métodos y propiedades que definen cómo se comportará la instancia de flujo creada.

underlyingSink

underlyingSink puede incluir los siguientes métodos opcionales definidos por el desarrollador. El parámetro controller que se pasa a algunos de los métodos es un WritableStreamDefaultController.

  • start(controller): Se llama a este método inmediatamente cuando se construye el objeto. El contenido de este método debe tener como objetivo obtener acceso al sumidero subyacente. Si este proceso se debe realizar de forma asíncrona, puede mostrar una promesa para indicar el éxito o el fracaso.
  • write(chunk, controller): Se llamará a este método cuando un nuevo fragmento de datos (especificado en el parámetro chunk) esté listo para escribirse en el sumidero subyacente. Puede mostrar una promesa para indicar el éxito o el fracaso de la operación de escritura. Se llamará a este método solo después de que las operaciones de escritura anteriores se hayan realizado correctamente y nunca después de que se cierre o se cancele la transmisión.
  • close(controller): Se llamará a este método si la app indica que terminó de escribir fragmentos en la transmisión. El contenido debe hacer lo que sea necesario para finalizar las operaciones de escritura en el sink subyacente y liberar el acceso a él. Si este proceso es asíncrono, puede mostrar una promesa para indicar si se realizó correctamente o no. Se llamará a este método solo después de que se hayan realizado correctamente todas las operaciones de escritura en fila.
  • abort(reason): Se llamará a este método si la app indica que desea cerrar abruptamente la transmisión y colocarla en un estado con errores. Puede limpiar cualquier recurso retenido, al igual que close(), pero se llamará a abort() incluso si las operaciones de escritura están en cola. Esos fragmentos se descartarán. Si este proceso es asíncrono, puede mostrar una promesa para indicar el éxito o el fracaso. El parámetro reason contiene un DOMString que describe por qué se canceló la transmisión.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

La interfaz WritableStreamDefaultController de la API de Streams representa un controlador que permite controlar el estado de un WritableStream durante la configuración, a medida que se envían más fragmentos para escribir o al final de la escritura. Cuando se construye un WritableStream, al sumidero subyacente se le asigna una instancia WritableStreamDefaultController correspondiente para manipular. WritableStreamDefaultController solo tiene un método: WritableStreamDefaultController.error(), que hace que cualquier interacción futura con la transmisión asociada genere un error. WritableStreamDefaultController también admite una propiedad signal que muestra una instancia de AbortSignal, lo que permite detener una operación WritableStream si es necesario.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

El segundo argumento, también opcional, del constructor WritableStream() es queuingStrategy. Es un objeto que, de manera opcional, define una estrategia de cola para la transmisión, que toma dos parámetros:

  • highWaterMark: Es un número no negativo que indica el límite superior de la transmisión con esta estrategia de cola.
  • size(chunk): Es una función que calcula y muestra el tamaño finito no negativo del valor de fragmento determinado. El resultado se usa para determinar la contrapresión, que se manifiesta a través de la propiedad WritableStreamDefaultWriter.desiredSize adecuada.

Los métodos getWriter() y write()

Para escribir en un flujo de escritura, necesitas un escritor, que será un WritableStreamDefaultWriter. El método getWriter() de la interfaz WritableStream muestra una instancia nueva de WritableStreamDefaultWriter y bloquea el flujo en esa instancia. Mientras el flujo está bloqueado, no se puede adquirir ningún otro escritor hasta que se libere el actual.

El método write() de la interfaz WritableStreamDefaultWriter escribe un fragmento de datos pasado a un WritableStream y su sink subyacente y, luego, muestra una promesa que se resuelve para indicar el éxito o el fracaso de la operación de escritura. Ten en cuenta que el significado de "correcto" depende del sink subyacente. Puede indicar que se aceptó el fragmento, pero no necesariamente que se guardó de forma segura en su destino final.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

La propiedad locked

Para verificar si un flujo de escritura está bloqueado, accede a su propiedad WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Ejemplo de código de transmisión que se puede escribir

En el siguiente ejemplo de código, se muestran todos los pasos en acción.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Cómo canalizar una transmisión legible a una transmisión de escritura

Un flujo legible se puede canalizar a un flujo de escritura a través del método pipeTo() del flujo legible. ReadableStream.pipeTo() canaliza la ReadableStream actual a una WritableStream determinada y muestra una promesa que se cumple cuando el proceso de canalización se completa correctamente o se rechaza si se encontraron errores.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Cómo crear una transmisión de transformación

La interfaz TransformStream de la API de Streams representa un conjunto de datos transformables. Para crear un flujo de transformación, llama a su constructor TransformStream(), que crea y muestra un objeto de flujo de transformación a partir de los controladores determinados. El constructor TransformStream() acepta como primer argumento un objeto JavaScript opcional que representa el transformer. Estos objetos pueden contener cualquiera de los siguientes métodos:

transformer

  • start(controller): Se llama a este método inmediatamente cuando se construye el objeto. Por lo general, se usa para poner en cola fragmentos de prefijos con controller.enqueue(). Esos fragmentos se leerán desde el lado de lectura, pero no dependen de ninguna escritura en el lado de escritura. Si este proceso inicial es asíncrono, por ejemplo, porque requiere cierto esfuerzo adquirir los fragmentos de prefijo, la función puede mostrar una promesa para indicar el éxito o el fracaso. Una promesa rechazada generará un error en la transmisión. El constructor TransformStream() volverá a arrojar cualquier excepción.
  • transform(chunk, controller): Se llama a este método cuando un fragmento nuevo escrito originalmente en el lado de escritura está listo para transformarse. La implementación del flujo garantiza que se llamará a esta función solo después de que se hayan realizado correctamente las transformaciones anteriores y nunca antes de que se complete start() o después de que se llame a flush(). Esta función realiza el trabajo de transformación real del flujo de transformación. Puede poner en cola los resultados con controller.enqueue(). Esto permite que un solo fragmento escrito en el lado de escritura genere cero o varios fragmentos en el lado de lectura, según la cantidad de veces que se llame a controller.enqueue(). Si el proceso de transformación es asíncrono, esta función puede mostrar una promesa para indicar el éxito o el fracaso de la transformación. Una promesa rechazada generará errores en los lados legibles y escribibles del flujo de transformación. Si no se proporciona un método transform(), se usa la transformación de identidad, que pone en cola fragmentos sin cambios del lado de escritura al lado de lectura.
  • flush(controller): Se llama a este método después de que todos los fragmentos escritos en el lado de escritura se hayan transformado pasando correctamente por transform(), y el lado de escritura está a punto de cerrarse. Por lo general, se usa para poner en cola fragmentos de sufijos en el lado legible, antes de que también se cierre. Si el proceso de limpieza es asíncrono, la función puede mostrar una promesa para indicar si se realizó correctamente o no. El resultado se comunicará al llamador de stream.writable.write(). Además, una promesa rechazada generará errores en los lados legibles y escribibles del flujo. Lanzar una excepción se trata de la misma manera que mostrar una promesa rechazada.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Estrategias de colas de writableStrategy y readableStrategy

El segundo y tercer parámetro opcional del constructor TransformStream() son estrategias de cola opcionales writableStrategy y readableStrategy. Se definen como se describe en las secciones de flujo legible y escribible, respectivamente.

Ejemplo de código de transmisión de Transform

En el siguiente ejemplo de código, se muestra una transmisión de transformación simple en acción.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Cómo canalizar una transmisión legible a través de una transmisión de transformación

El método pipeThrough() de la interfaz ReadableStream proporciona una forma encadenable de canalizar la transmisión actual a través de una transmisión de transformación o cualquier otro par de escritura y lectura. Por lo general, la canalización de un flujo lo bloqueará durante el tiempo que dure la canalización, lo que evitará que otros lectores lo bloqueen.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

En la siguiente muestra de código (un poco artificial), se muestra cómo podrías implementar una versión "en mayúsculas" de fetch() que convierte todo el texto en mayúsculas consumiendo la promesa de respuesta que se muestra como un flujo y convirtiendo en mayúsculas cada fragmento. La ventaja de este enfoque es que no necesitas esperar a que se descargue todo el documento, lo que puede marcar una gran diferencia cuando se trata de archivos grandes.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demostración

En la siguiente demostración, se muestran flujos de lectura, escritura y transformación en acción. También incluye ejemplos de cadenas de tuberías pipeThrough() y pipeTo(), y también se muestra tee(). De manera opcional, puedes ejecutar la demo en su propia ventana o ver el código fuente.

Transmisiones útiles disponibles en el navegador

Hay varias transmisiones útiles integradas directamente en el navegador. Puedes crear fácilmente un ReadableStream a partir de un blob. El método stream() de la interfaz Blob muestra un ReadableStream que, cuando se lee, muestra los datos contenidos en el blob. Además, recuerda que un objeto File es un tipo específico de Blob y se puede usar en cualquier contexto en el que se pueda usar un blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Las variantes de transmisión de TextDecoder.decode() y TextEncoder.encode() se denominan TextDecoderStream y TextEncoderStream, respectivamente.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Comprimir o descomprimir un archivo es fácil con las transmisiones de transformación CompressionStream y DecompressionStream, respectivamente. En la siguiente muestra de código, se muestra cómo puedes descargar la especificación de Streams, comprimirla (gzip) directamente en el navegador y escribir el archivo comprimido directamente en el disco.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream de la API de File System Access y los flujos de solicitudes fetch() experimentales son ejemplos de flujos de escritura en uso.

La API de Serial hace un uso intensivo de flujos legibles y escribibles.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Por último, la API de WebSocketStream integra transmisiones con la API de WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Recursos útiles

Agradecimientos

Este artículo fue revisado por Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley y Adam Rice. Las entradas de blog de Jake Archibald me ayudaron mucho a comprender las transmisiones. Algunas de las muestras de código se inspiran en las exploraciones del usuario de GitHub @bellbind, y partes del texto se basan en gran medida en los Documentos web de MDN sobre flujos. Los autores del estándar de flujos hicieron un gran trabajo al escribir esta especificación. Imagen hero de Ryan Lara en Unsplash.