Flux : guide définitif

Découvrez comment utiliser des flux lisibles, accessibles en écriture et de transformation avec l'API Streams.

L'API Streams vous permet d'accéder de manière programmatique aux flux de données reçus sur le réseau ou créés par n'importe quel moyen localement, puis de les traiter avec JavaScript. Le traitement par flux consiste à décomposer une ressource que vous souhaitez recevoir, envoyer ou transformer en petits fragments, puis de traiter ces fragments petit à petit. Bien que le streaming soit le comportement des navigateurs lorsqu'ils reçoivent des éléments tels que du code HTML ou des vidéos à afficher sur des pages Web, cette fonctionnalité n'a jamais été disponible pour JavaScript avant le lancement de fetch avec flux en 2015.

Auparavant, si vous vouliez traiter une ressource d'une sorte (une vidéo ou un fichier texte, etc.), vous deviez télécharger l'intégralité du fichier, attendre qu'il soit désérialisé dans un format approprié, puis le traiter. Comme les flux sont disponibles pour JavaScript, tout cela change. Vous pouvez désormais traiter les données brutes avec JavaScript de manière progressive, dès qu'elles sont disponibles sur le client, sans avoir à générer de tampon, de chaîne ni d'objet blob. Cela permet de débloquer un certain nombre de cas d'utilisation, dont certains sont listés ci-dessous:

  • Effets vidéo:redirection d'un flux vidéo lisible via un flux de transformation qui applique des effets en temps réel.
  • (dé)compression de données:acheminement d'un flux de fichiers vers un flux de transformation qui le compresse de manière sélective.
  • Décodage d'image:redirection d'un flux de réponse HTTP via un flux de transformation qui décode des octets en données bitmap, puis via un autre flux de transformation qui convertit les bitmaps en fichiers PNG. Si elle est installée dans le gestionnaire fetch d'un service worker, elle vous permet d'émuler de manière transparente de nouveaux formats d'image, tels qu'AVIF.

Prise en charge des navigateurs

ReadableStream et WritableStream

Navigateurs pris en charge

  • 43
  • 14
  • 65
  • 10.1

Source

TransformStream

Navigateurs pris en charge

  • 67
  • 79
  • 102
  • 14.1

Source

Concepts fondamentaux

Avant d'entrer dans les détails sur les différents types de flux, je vais vous présenter quelques concepts fondamentaux.

Friandises

Un fragment est un élément de données unique qui est écrit ou lu dans un flux. Il peut s'agir de n'importe quel type. Les flux peuvent même contenir des fragments de différents types. La plupart du temps, un fragment n'est pas l'unité de données la plus atomique pour un flux donné. Par exemple, un flux d'octets peut contenir des fragments composés de 16 Kio Uint8Array, au lieu d'octets uniques.

Flux lisibles

Un flux lisible représente une source de données à partir de laquelle vous pouvez lire. En d'autres termes, les données sortent d'un flux lisible. Concrètement, un flux lisible est une instance de la classe ReadableStream.

Flux accessibles en écriture

Un flux accessible en écriture représente la destination des données dans lesquelles vous pouvez écrire. En d'autres termes, les données entrent dans un flux accessible en écriture. Concrètement, un flux accessible en écriture est une instance de la classe WritableStream.

Transformer des flux

Un flux de transformation se compose d'une paire de flux: un flux accessible en écriture, appelé côté accessible en écriture, et un flux lisible, appelé "côté lisible". Pour cela, une métaphore utilisée dans le monde réel serait un interprète simultané qui traduit d'une langue à une autre à la volée. D'une manière spécifique au flux de transformation, l'écriture sur le côté accessible en écriture met de nouvelles données à disposition pour la lecture depuis le côté lisible. Concrètement, tout objet avec une propriété writable et une propriété readable peut servir de flux de transformation. Cependant, la classe TransformStream standard facilite la création d'une paire correctement enchevêtrée.

Chaînes de tuyaux

Les flux sont principalement utilisés par redirigement les uns vers les autres. Un flux lisible peut être redirigé directement vers un flux accessible en écriture à l'aide de la méthode pipeTo() du flux lisible. Il peut également être redirigé vers un ou plusieurs flux de transformation en premier, à l'aide de la méthode pipeThrough() du flux lisible. Un ensemble de flux reliés entre eux est appelé "chaîne de tuyaux".

Contre-pression

Une fois qu'une chaîne de pipes est construite, elle propage des signaux concernant la vitesse de circulation des fragments. Si une étape de la chaîne ne peut pas encore accepter les fragments, elle propage un signal vers l'arrière via le pipeline, jusqu'à ce que la source d'origine soit invitée à arrêter de produire des fragments si rapidement. Ce processus de normalisation du flux est appelé contre-pression.

Teeing

Un flux lisible peut être nommé d'après la forme d'un "T" majuscule) à l'aide de la méthode tee(). Le flux est ainsi verrouillé, c'est-à-dire qu'il ne peut plus être utilisé directement. Toutefois, cela crée deux nouveaux flux, appelés "branches", qui peuvent être utilisés indépendamment. Le teeing est également important, car les flux ne peuvent pas être renvoyés en arrière ni redémarrés. Nous reviendrons sur ce point plus tard.

Schéma d'une chaîne de pipelines composé d'un flux lisible provenant d'un appel à l'API fetch, puis acheminé via un flux de transformation dont la sortie est liée, puis envoyée au navigateur pour le premier flux lisible résultant et au cache du service worker pour le deuxième flux lisible résultant.
Chaîne de pipelines.

Mécanisme d'un flux lisible

Un flux lisible est une source de données représentée en JavaScript par un objet ReadableStream qui circule à partir d'une source sous-jacente. Le constructeur ReadableStream() crée et renvoie un objet de flux lisible à partir des gestionnaires donnés. Il existe deux types de sources sous-jacentes:

  • Les sources push envoient en permanence les données vers vous lorsque vous y avez accédé. C'est à vous de démarrer, de suspendre ou d'annuler l'accès au flux. Exemples : flux vidéo en direct, événements envoyés par le serveur ou WebSockets.
  • Les sources d'extraction nécessitent que vous leur demandiez explicitement des données une fois la connexion établie. Les opérations HTTP via des appels fetch() ou XMLHttpRequest en sont des exemples.

Les données de flux sont lues de manière séquentielle, en petits morceaux appelés segments. Les fragments placés dans un flux sont dits en file d'attente. Cela signifie qu'ils attendent dans une file d'attente prêt à être lu. Une file d'attente interne assure le suivi des fragments qui n'ont pas encore été lus.

Une stratégie de mise en file d'attente est un objet qui détermine la manière dont un flux doit signaler une contre-pression en fonction de l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque fragment et compare la taille totale de tous les fragments de la file d'attente à un nombre spécifié, appelé point d'eau élevé.

Les fragments à l'intérieur du flux sont lus par un lecteur. Ce lecteur récupère les données un fragment à la fois, ce qui vous permet d'effectuer le type d'opération de votre choix. Le lecteur et l'autre code de traitement qui l'accompagne sont appelés consommateurs.

Dans ce contexte, la construction suivante est appelée contrôleur. Chaque flux lisible est associé à un contrôleur qui, comme son nom l'indique, vous permet de contrôler le flux.

Un seul lecteur peut lire un flux à la fois. Lorsqu'un lecteur est créé et commence à lire un flux (c'est-à-dire qu'il devient un lecteur actif), il est verrouillé. Si vous souhaitez qu'un autre lecteur prenne en charge la lecture de votre flux, vous devez généralement libérer le premier lecteur avant de faire autre chose (vous pouvez toutefois accéder au départ des diffusions).

Créer un flux lisible

Vous créez un flux lisible en appelant son constructeur ReadableStream(). Le constructeur comporte un argument facultatif underlyingSource, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux construite.

underlyingSource

Pour ce faire, vous pouvez utiliser les méthodes facultatives suivantes définies par le développeur:

  • start(controller): appelé immédiatement lors de la construction de l'objet. Cette méthode peut accéder à la source du flux et effectuer toute autre opération requise pour configurer la fonctionnalité de flux. Si ce processus doit être effectué de manière asynchrone, la méthode peut renvoyer une promesse pour signaler la réussite ou l'échec. Le paramètre controller transmis à cette méthode est un ReadableStreamDefaultController.
  • pull(controller): permet de contrôler le flux à mesure que d'autres fragments sont récupérés. Il est appelé à plusieurs reprises tant que la file d'attente interne de fragments du flux n'est pas pleine, jusqu'à ce que la file d'attente atteigne son point d'eau élevé. Si le résultat de l'appel de pull() est une promesse, pull() ne sera rappelé qu'une fois cette promesse remplie. Si la promesse est rejetée, le flux comporte une erreur.
  • cancel(reason): appelé lorsque l'utilisateur du flux annule le flux.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController est compatible avec les méthodes suivantes:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Le deuxième argument, également facultatif, du constructeur ReadableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, en utilisant deux paramètres:

  • highWaterMark: nombre non négatif indiquant la ligne la plus haute du flux avec cette stratégie de mise en file d'attente.
  • size(chunk): fonction qui calcule et renvoie la taille non négative finie de la valeur de segment donnée. Le résultat est utilisé pour déterminer la contre-pression, qui se manifeste via la propriété ReadableStreamDefaultController.desiredSize appropriée. Il régit également le moment où la méthode pull() de la source sous-jacente est appelée.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Méthodes getReader() et read()

Pour lire un flux lisible, vous avez besoin d'un lecteur, qui sera un ReadableStreamDefaultReader. La méthode getReader() de l'interface ReadableStream crée un lecteur et verrouille le flux sur celui-ci. Lorsque le flux est verrouillé, aucun autre lecteur ne peut être acquis tant que celui-ci n'est pas libéré.

La méthode read() de l'interface ReadableStreamDefaultReader renvoie une promesse donnant accès au fragment suivant de la file d'attente interne du flux. Elle traite ou rejette, avec un résultat qui dépend de l'état du flux. Voici les différentes possibilités:

  • Si un fragment est disponible, la promesse sera remplie avec un objet au format
    { value: chunk, done: false }.
  • Si le flux est fermé, la promesse sera remplie avec un objet au format
    { value: undefined, done: true }.
  • Si le flux comporte une erreur, la promesse est rejetée avec l'erreur appropriée.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Propriété locked

Pour vérifier si un flux lisible est verrouillé, accédez à sa propriété ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Exemples de code de flux lisibles

L'exemple de code ci-dessous montre toutes les étapes concrètes. Vous devez d'abord créer un objet ReadableStream qui définit une méthode start() dans son argument underlyingSource (c'est-à-dire la classe TimestampSource). Cette méthode indique au controller du flux de enqueue() un horodatage toutes les secondes pendant 10 secondes. Enfin, il indique au contrôleur de close() le flux. Pour utiliser ce flux, créez un lecteur via la méthode getReader() et appelez read() jusqu'à ce que le flux soit done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Itération asynchrone

Vérifier à chaque itération de boucle read() si le flux est done n'est peut-être pas l'API la plus pratique. Heureusement, il y aura bientôt un meilleur moyen d'y parvenir: l'itération asynchrone.

for await (const chunk of stream) {
  console.log(chunk);
}

Une solution de contournement pour utiliser l'itération asynchrone consiste aujourd'hui à implémenter le comportement avec un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Teeing d'un flux lisible

La méthode tee() de l'interface ReadableStream crée le flux lisible actuel et renvoie un tableau à deux éléments contenant les deux branches résultantes en tant que nouvelles instances ReadableStream. Cela permet à deux lecteurs de lire un flux simultanément. Cela peut se produire, par exemple, dans un service worker si vous souhaitez récupérer une réponse du serveur et la diffuser en streaming vers le navigateur, mais également vers le cache du service worker. Étant donné qu'un corps de réponse ne peut pas être consommé plus d'une fois, vous avez besoin de deux copies pour effectuer cette opération. Pour annuler le flux, vous devez ensuite annuler les deux branches obtenues. Si vous activez le partage de file d'attente, le flux est généralement verrouillé pendant toute la durée de l'opération, ce qui empêche les autres lecteurs de le verrouiller.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Flux d'octets lisibles

Pour les flux représentant des octets, une version étendue du flux lisible est fournie afin de gérer les octets efficacement, en particulier en minimisant le nombre de copies. Les flux d'octets permettent d'acquérir des lecteurs BYOB (Bring Your OwnBuffer). L'implémentation par défaut peut fournir une plage de sorties différentes, telles que des chaînes ou des tampons de tableau pour WebSockets, tandis que les flux d'octets garantissent la sortie d'octets. De plus, les lecteurs BYOB présentent des avantages en termes de stabilité. En effet, si un tampon se dissocie, cela peut garantir que vous n'écrivez pas deux fois dans le même tampon, ce qui évite les conditions de concurrence. Les lecteurs BYOB peuvent réduire le nombre d'exécutions de récupération de mémoire par le navigateur, car ils peuvent réutiliser les tampons.

Créer un flux d'octets lisibles

Vous pouvez créer un flux d'octets lisible en transmettant un paramètre type supplémentaire au constructeur ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Un ReadableByteStreamController à manipuler est attribué à la source sous-jacente d'un flux d'octets lisible. Sa méthode ReadableByteStreamController.enqueue() accepte un argument chunk dont la valeur est ArrayBufferView. La propriété ReadableByteStreamController.byobRequest renvoie la demande d'extraction BYOB actuelle, ou la valeur "null" s'il n'y en a pas. Enfin, la propriété ReadableByteStreamController.desiredSize renvoie la taille souhaitée pour remplir la file d'attente interne du flux contrôlé.

queuingStrategy

Le deuxième argument, également facultatif, du constructeur ReadableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, et utilise un seul paramètre:

  • highWaterMark: nombre d'octets non négatif indiquant le point d'eau le plus élevé du flux utilisant cette stratégie de mise en file d'attente. Cela permet de déterminer la contre-pression, qui se manifeste via la propriété ReadableByteStreamController.desiredSize appropriée. Il régit également le moment où la méthode pull() de la source sous-jacente est appelée.

Méthodes getReader() et read()

Vous pouvez ensuite accéder à un ReadableStreamBYOBReader en définissant le paramètre mode en conséquence : ReadableStream.getReader({ mode: "byob" }). Cela permet un contrôle plus précis sur l'allocation de la mémoire tampon afin d'éviter les copies. Pour lire le flux d'octets, vous devez appeler ReadableStreamBYOBReader.read(view), où view est un ArrayBufferView.

Exemple de code de flux d'octets lisibles

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La fonction suivante renvoie des flux d'octets lisibles qui permettent une lecture efficace d'un tableau généré de manière aléatoire, sans copie. Au lieu d'utiliser une taille de fragment prédéterminée de 1 024, il tente de remplir le tampon fourni par le développeur, ce qui permet un contrôle total.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mécanisme d'un flux accessible en écriture

Un flux accessible en écriture est une destination dans laquelle vous pouvez écrire des données. Elle est représentée en JavaScript par un objet WritableStream. Il s'agit d'une abstraction qui s'ajoute à un récepteur sous-jacent (récepteur d'E/S de niveau inférieur dans lequel les données brutes sont écrites).

Les données sont écrites dans le flux via un writer, un fragment à la fois. Un bloc peut prendre de nombreuses formes, tout comme les fragments d'un lecteur. Vous pouvez utiliser le code de votre choix pour produire des fragments prêts à l'écriture. Le rédacteur et le code associé sont appelés producteurs.

Lorsqu'un rédacteur est créé et commence à écrire dans un flux (rédacteur actif), il est verrouillé. Un seul rédacteur peut écrire à la fois dans un flux accessible en écriture. Si vous souhaitez qu'un autre rédacteur commence à écrire dans votre flux, vous devez généralement le libérer avant d'y associer un autre rédacteur.

Une file d'attente interne assure le suivi des fragments qui ont été écrits dans le flux, mais qui n'ont pas encore été traités par le récepteur sous-jacent.

Une stratégie de mise en file d'attente est un objet qui détermine la manière dont un flux doit signaler une contre-pression en fonction de l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque fragment et compare la taille totale de tous les fragments de la file d'attente à un nombre spécifié, appelé point d'eau élevé.

La construction finale est appelée contrôleur. Chaque flux accessible en écriture est associé à un contrôleur qui vous permet de contrôler le flux (par exemple, pour l'annuler).

Créer un flux accessible en écriture

L'interface WritableStream de l'API Streams fournit une abstraction standard pour écrire des flux de données dans une destination, appelée récepteur. Cet objet intègre des fonctionnalités de contre-pression et de mise en file d'attente. Vous créez un flux accessible en écriture en appelant son constructeur WritableStream(). Elle comporte un paramètre underlyingSink facultatif, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux construite.

underlyingSink

underlyingSink peut inclure les méthodes facultatives suivantes définies par le développeur. Le paramètre controller transmis à certaines méthodes est un WritableStreamDefaultController.

  • start(controller): cette méthode est appelée immédiatement lors de la construction de l'objet. Le contenu de cette méthode doit viser à accéder au récepteur sous-jacent. Si ce processus doit être effectué de manière asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec.
  • write(chunk, controller): cette méthode est appelée lorsqu'un nouveau fragment de données (spécifié dans le paramètre chunk) est prêt à être écrit dans le récepteur sous-jacent. Il peut renvoyer une promesse pour signaler le succès ou l'échec de l'opération d'écriture. Cette méthode n'est appelée qu'une fois les écritures précédentes effectuées, et jamais après la fermeture ou l'annulation du flux.
  • close(controller): cette méthode est appelée si l'application indique qu'elle a fini d'écrire des fragments dans le flux. Le contenu doit faire tout ce qui est nécessaire pour finaliser les écritures sur le récepteur sous-jacent et libérer l'accès à celui-ci. Si ce processus est asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec. Cette méthode ne sera appelée qu'une fois que toutes les écritures en file d'attente ont abouti.
  • abort(reason): cette méthode est appelée si l'application indique qu'elle souhaite fermer brusquement le flux et le placer dans un état erroné. Il peut nettoyer toutes les ressources conservées, comme close(), mais abort() est appelé même si les écritures sont mises en file d'attente. Ces fragments sont éliminés. Si ce processus est asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec. Le paramètre reason contient un élément DOMString décrivant la raison pour laquelle le flux a été annulé.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

L'interface WritableStreamDefaultController de l'API Streams représente un contrôleur permettant de contrôler l'état d'un WritableStream pendant la configuration, à mesure que d'autres fragments sont envoyés en écriture ou à la fin de l'écriture. Lors de la construction d'un WritableStream, le récepteur sous-jacent reçoit une instance WritableStreamDefaultController correspondante à manipuler. WritableStreamDefaultController ne comporte qu'une seule méthode : WritableStreamDefaultController.error(), qui entraîne une erreur de toutes les futures interactions avec le flux associé. WritableStreamDefaultController accepte également une propriété signal qui renvoie une instance de AbortSignal, ce qui permet d'arrêter une opération WritableStream si nécessaire.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Le deuxième argument, également facultatif, du constructeur WritableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, en utilisant deux paramètres:

  • highWaterMark: nombre non négatif indiquant la ligne la plus haute du flux avec cette stratégie de mise en file d'attente.
  • size(chunk): fonction qui calcule et renvoie la taille non négative finie de la valeur de segment donnée. Le résultat est utilisé pour déterminer la contre-pression, qui se manifeste via la propriété WritableStreamDefaultWriter.desiredSize appropriée.

Méthodes getWriter() et write()

Pour écrire dans un flux accessible en écriture, vous avez besoin d'un rédacteur, qui sera un WritableStreamDefaultWriter. La méthode getWriter() de l'interface WritableStream renvoie une nouvelle instance de WritableStreamDefaultWriter et verrouille le flux sur cette instance. Lorsque le flux est verrouillé, aucun autre auteur ne peut être acquis tant que le flux en cours n'est pas disponible.

La méthode write() de l'interface WritableStreamDefaultWriter écrit un fragment de données transmis dans un élément WritableStream et son récepteur sous-jacent, puis renvoie une promesse qui se résout pour indiquer le succès ou l'échec de l'opération d'écriture. Notez que la réussite dépend du récepteur sous-jacent ; cela peut indiquer que le fragment a été accepté, et pas nécessairement qu'il est enregistré en toute sécurité dans sa destination finale.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Propriété locked

Pour vérifier si un flux accessible en écriture est verrouillé, accédez à sa propriété WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Exemple de code de flux accessible en écriture

L'exemple de code ci-dessous montre toutes les étapes en action.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Déplacer un flux lisible vers un flux accessible en écriture

Un flux lisible peut être redirigé vers un flux accessible en écriture via la méthode pipeTo() du flux lisible. ReadableStream.pipeTo() dirige le ReadableStream actuel vers un WritableStream donné et renvoie une promesse qui se termine lorsque le processus de piping aboutit ou rejette si des erreurs se sont produites.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Créer un flux de transformation

L'interface TransformStream de l'API Streams représente un ensemble de données transformables. Vous créez un flux de transformation en appelant son constructeur TransformStream(), qui crée et renvoie un objet de flux de transformation à partir des gestionnaires donnés. Le constructeur TransformStream() accepte comme premier argument un objet JavaScript facultatif représentant transformer. Ces objets peuvent contenir l'une des méthodes suivantes:

transformer

  • start(controller): cette méthode est appelée immédiatement lors de la construction de l'objet. Elle est généralement utilisée pour mettre des fragments de préfixe en file d'attente, à l'aide de controller.enqueue(). Ces fragments seront lus du côté lisible, mais ne dépendent d'aucune écriture du côté accessible en écriture. Si ce processus initial est asynchrone, par exemple parce qu'il faut des efforts pour acquérir les fragments de préfixe, la fonction peut renvoyer une promesse pour signaler le succès ou l'échec. Une promesse refusée entraînera une erreur dans le flux. Toutes les exceptions générées sont à nouveau générées par le constructeur TransformStream().
  • transform(chunk, controller): cette méthode est appelée lorsqu'un nouveau fragment écrit à l'origine sur le côté accessible en écriture est prêt à être transformé. L'implémentation du flux garantit que cette fonction ne sera appelée qu'une fois les transformations précédentes effectuées, et jamais avant la fin de start() ni après l'appel de flush(). Cette fonction effectue le travail de transformation réel du flux de transformation. Il peut mettre les résultats en file d'attente à l'aide de controller.enqueue(). Ainsi, un seul fragment écrit sur le côté accessible en écriture peut générer zéro ou plusieurs fragments du côté lisible, en fonction du nombre de fois où controller.enqueue() est appelé. Si le processus de transformation est asynchrone, cette fonction peut renvoyer une promesse pour signaler le succès ou l'échec de la transformation. Une promesse refusée entraînera une erreur des côtés lisible et accessible en écriture du flux de transformation. Si aucune méthode transform() n'est fournie, la transformation d'identité est utilisée, ce qui met en file d'attente les fragments inchangés du côté accessible en écriture vers le côté lisible.
  • flush(controller): cette méthode est appelée une fois que tous les fragments écrits sur le côté accessible en écriture ont été transformés en passant par transform() et que le côté accessible en écriture est sur le point d'être fermé. En règle générale, cette méthode permet de mettre en file d'attente les fragments de suffixe vers le côté lisible, avant qu'ils ne soient trop fermés. Si le processus de vidage est asynchrone, la fonction peut renvoyer une promesse pour signaler la réussite ou l'échec. Le résultat sera communiqué à l'appelant de stream.writable.write(). De plus, une promesse refusée entraînera une erreur des côtés lisible et accessible en écriture du flux. Lancer une exception est traité de la même manière que renvoyer une promesse refusée.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Stratégies de mise en file d'attente writableStrategy et readableStrategy

Le deuxième et le troisième paramètres facultatifs du constructeur TransformStream() sont les stratégies de mise en file d'attente writableStrategy et readableStrategy facultatives. Ils sont définis comme indiqué dans les sections lisible et en écriture, respectivement.

Exemple de code de flux de transformation

L'exemple de code suivant montre un flux de transformation simple en action.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Diriger un flux lisible vers un flux de transformation

La méthode pipeThrough() de l'interface ReadableStream fournit un moyen enchaîné de rediriger le flux actuel via un flux de transformation ou toute autre paire accessible en écriture/lisible. La redirection d'un flux le verrouille généralement pendant toute la durée du pipeline, empêchant ainsi les autres lecteurs de le verrouiller.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

L'exemple de code suivant (un peu artificiel) montre comment vous pouvez implémenter une version "criant" de fetch() qui met tout le texte en majuscules en consommant la promesse de réponse renvoyée sous forme de flux et en majuscules bloc par bloc. L'avantage de cette approche est que vous n'avez pas besoin d'attendre que l'ensemble du document soit téléchargé, ce qui peut faire une énorme différence en cas de fichiers volumineux.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Démonstration

La démonstration ci-dessous présente des flux lisibles, accessibles en écriture et de transformation. Il comprend également des exemples de chaînes de pipes pipeThrough() et pipeTo(), et illustre également tee(). Vous pouvez éventuellement exécuter la démonstration dans sa propre fenêtre ou afficher le code source.

Flux utiles disponibles dans le navigateur

Un certain nombre de flux utiles sont intégrés directement dans le navigateur. Vous pouvez facilement créer un ReadableStream à partir d'un blob. La méthode stream() de l'interface Blob renvoie un ReadableStream qui, lors de la lecture, renvoie les données contenues dans l'objet blob. Rappelez-vous également qu'un objet File est un type spécifique de Blob et peut être utilisé dans tous les contextes possibles d'un blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Les variantes de streaming de TextDecoder.decode() et TextEncoder.encode() sont respectivement appelées TextDecoderStream et TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Il est facile de compresser ou de décompresser un fichier avec les flux de transformation CompressionStream et DecompressionStream, respectivement. L'exemple de code ci-dessous montre comment télécharger la spécification Streams, la compresser (gzip) directement dans le navigateur et écrire le fichier compressé directement sur le disque.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Les FileSystemWritableFileStream de l'API File System Access et les flux de requêtes fetch() expérimentaux sont des exemples de flux accessibles en écriture dans l'état.

L'API Serial fait un usage intensif des flux en lecture et en écriture.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Enfin, l'API WebSocketStream intègre les flux à l'API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Ressources utiles

Remerciements

Cet article a été lu par Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley et Adam Rice. Les articles de blog de Jake Archibal m'ont beaucoup aidé à comprendre les flux. Certains exemples de code s'inspirent des explorations effectuées par l'utilisateur GitHub @bellbind et certaines parties de la prose s'appuient en grande partie sur les documents Web Docs sur les flux. Les auteurs de Streams Standard ont fait un travail remarquable sur la rédaction de cette spécification. Image héros de Ryan Lara sur Unsplash.