Flux : guide définitif

Découvrez comment utiliser des flux lisibles et accessibles en écriture, et comment transformer des flux avec l'API Streams.

L'API Streams vous permet d'accéder de manière programmatique aux flux de données reçus sur le réseau ou créées par quelque moyen que ce soit, localement et et les traiter avec JavaScript. Le traitement par flux implique de décomposer une ressource que vous souhaitez recevoir, envoyer ou transformer puis les traiter petit à petit. Lorsque le streaming est quelque chose le navigateur reçoit de toute façon des éléments tels que du code HTML ou des vidéos à afficher sur les pages Web. n'était jamais disponible pour JavaScript avant l'introduction de fetch avec les flux en 2015.

Auparavant, si vous vouliez traiter une ressource quelconque (qu'il s'agisse d'une vidéo, d'un fichier texte, etc.), vous devez télécharger le fichier entier, attendre qu'il soit désérialisé dans un format approprié, puis les traiter. Les flux étant accessibles en JavaScript, tout change. Vous pouvez désormais traiter les données brutes avec JavaScript progressivement dès qu'il est disponible sur le client, sans avoir à générer de tampon, de chaîne ou de blob. Cela débloque un certain nombre de cas d'utilisation, dont certains sont listés ci-dessous:

  • Effets vidéo:canalisation d'un flux vidéo lisible par un flux de transformation qui applique des effets en temps réel.
  • (dé)compression de données:acheminement d'un flux de fichiers via un flux de transformation qui (dé)compresse le fichier.
  • Décodage d'image:canalisation d'un flux de réponse HTTP via un flux de transformation qui décode des octets en données bitmap, puis dans un autre flux de transformation qui convertit les bitmaps en PNG. Si installé dans le gestionnaire fetch d'un service worker, ce qui vous permet d'émuler un polyfill de manière transparente de nouveaux formats d'image comme AVIF.

Prise en charge des navigateurs

ReadableStream et WritableStream

Navigateurs pris en charge

  • Chrome: 43 <ph type="x-smartling-placeholder">
  • Edge: 14 <ph type="x-smartling-placeholder">
  • Firefox: 65 <ph type="x-smartling-placeholder">
  • Safari: 10.1. <ph type="x-smartling-placeholder">

Source

TransformStream

Navigateurs pris en charge

  • Chrome: 67 <ph type="x-smartling-placeholder">
  • Edge: 79 <ph type="x-smartling-placeholder">
  • Firefox: 102. <ph type="x-smartling-placeholder">
  • Safari: 14.1. <ph type="x-smartling-placeholder">

Source

Concepts fondamentaux

Avant d'entrer dans les détails des différents types de flux, voyons quelques concepts fondamentaux.

Fragments

Un fragment est une donnée unique écrite ou lue dans un flux. Il peut s'agir de n'importe quel Type les flux peuvent même contenir des blocs de différents types. La plupart du temps, un fragment ne sera pas le plus atomique pour un flux donné. Par exemple, un flux d'octets peut contenir des fragments constitués de 16 Unités Kio Uint8Array, au lieu d'octets uniques.

Flux lisibles

Un flux lisible représente une source de données à partir de laquelle vous pouvez lire des données. En d'autres termes, les données sont à partir d'un flux lisible. Concrètement, un flux lisible est une instance de ReadableStream. .

Flux accessibles en écriture

Un flux accessible en écriture représente une destination de données dans laquelle vous pouvez écrire. En d’autres termes, les données entre dans un flux accessible en écriture. Concrètement, un flux accessible en écriture est une instance WritableStream.

Transformer des flux

Un flux de transformation se compose d'une paire de flux: un flux accessible en écriture (appelé son côté accessible en écriture). et un flux lisible, appelé "côté lisible". Une métaphore du monde réel pour cela serait un interpréteur simultané qui traduit d'une langue à une autre à la volée. D'une manière spécifique au flux de transformation, écrire vers le côté accessible en écriture, les nouvelles données sont accessibles en lecture le plus lisible possible. Concrètement, tout objet avec une propriété writable et une propriété readable peut être diffusé en tant que flux de transformation. Cependant, la classe TransformStream standard facilite la création une telle paire qui est correctement emmêlée.

Chaînes de tuyaux

Les flux sont principalement utilisés par transmission entre eux. Un flux lisible peut être dirigé directement vers un flux accessible en écriture, à l'aide de la méthode pipeTo() du flux lisible, ou vers un flux accessible en écriture. ou plusieurs flux de transformation en premier, à l'aide de la méthode pipeThrough() du flux lisible. Un ensemble de embarqués de cette manière est appelé "chaîne pipe".

Contre-pression

Une fois qu'une chaîne de pipe est construite, elle propage des signaux concernant la vitesse à laquelle les fragments doivent circuler à travers ce réseau. Si une étape de la chaîne ne peut pas encore accepter les fragments, elle propage un signal en arrière. dans la chaîne du pipeline, jusqu'à ce que finalement la source d'origine cesse de produire des fragments rapidement. Ce processus de normalisation du flux est appelé "contre-pression".

Tee

Un flux lisible peut être nommé d'après la forme d'un "T" majuscule à l'aide de sa méthode tee(). Le flux est alors verrouillé, c'est-à-dire qu'il ne peut plus être utilisé directement. Cependant, cela créera deux nouvelles flux, appelés branches, qui peuvent être utilisés indépendamment. Le Teeing est également important car les flux ne peuvent pas être retournés ou redémarrés. Nous reviendrons sur ce point plus tard.

<ph type="x-smartling-placeholder">
</ph> Schéma d&#39;une chaîne de pipeline composée d&#39;un flux lisible provenant d&#39;un appel à l&#39;API de récupération, qui est ensuite acheminé via un flux de transformation dont la sortie est transmise, puis envoyée au navigateur pour le premier flux lisible obtenu et au cache du service worker pour le deuxième flux lisible obtenu.
Une chaîne de tuyau.

Fonctionnement d'un flux lisible

Un flux lisible est une source de données représentée en JavaScript par un Un objet ReadableStream qui provenant d'une source sous-jacente. La ReadableStream() crée et renvoie un objet de flux lisible à partir des gestionnaires donnés. Il y a deux types de sources sous-jacentes:

  • Les sources push vous transmettent constamment les données lorsque vous y accédez, et c'est à vous de démarrer, suspendre ou annuler l'accès au flux. Il peut s'agir de flux vidéo en direct, d'événements envoyés par le serveur, ou WebSockets.
  • Avec les sources pull, vous devez leur demander explicitement des données une fois la connexion établie. Exemples incluent des opérations HTTP via des appels fetch() ou XMLHttpRequest.

Les données de flux sont lues de manière séquentielle, sous la forme de petits morceaux appelés morceaux. Les fragments placés dans un flux sont considérés comme mis en file d'attente. Cela signifie qu'ils attendent dans une file d'attente prêts à être lus. Une file d'attente interne assure le suivi des fragments qui n'ont pas encore été lus.

Une stratégie de mise en file d'attente est un objet qui détermine la manière dont un flux doit signaler une contre-pression en fonction l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque fragment et compare les la taille totale de tous les fragments de la file d'attente jusqu'à un nombre spécifié, appelé marge haute.

Les fragments du flux sont lus par un lecteur. Ce lecteur récupère les données fragment par fragment ce qui vous permet d'effectuer le type d'opération que vous voulez faire. Le lecteur et l'autre qui accompagne ce code s'appelle un consommateur.

Dans ce contexte, la construction suivante est appelée contrôleur. Chaque flux lisible est associé à qui, comme son nom l'indique, vous permet de contrôler le flux.

Un seul lecteur peut lire un flux à la fois. Lorsqu'un lecteur est créé et commence à lire un flux (autrement dit, devient un lecteur actif), il est verrouillé sur celui-ci. Si vous voulez qu'un autre lecteur prenne le relais lecture de votre flux, vous devez généralement libérer le premier lecteur avant de faire quoi que ce soit d'autre (même si vous pouvez tirer des flux).

Créer un flux lisible

Pour créer un flux lisible, appelez son constructeur. ReadableStream() Le constructeur comporte un argument facultatif underlyingSource, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux construite.

underlyingSource

Vous pouvez utiliser les méthodes facultatives suivantes définies par le développeur:

  • start(controller): appelé immédiatement lors de la construction de l'objet. La peut accéder à la source du flux et faire toute autre action nécessaires pour configurer le fonctionnement du flux. Si ce processus doit être effectué de manière asynchrone, la méthode peut renvoyer une promesse pour signaler la réussite ou l'échec. Le paramètre controller transmis à cette méthode est un ReadableStreamDefaultController
  • pull(controller): permet de contrôler le flux lorsque davantage de fragments sont récupérés. Il est appelé à plusieurs reprises tant que la file d'attente interne de fragments du flux n'est pas pleine, jusqu'à ce que la file d'attente atteint son point culminant. Si le résultat de l'appel de pull() est une promesse, pull() ne sera pas rappelé tant que cette promesse n'est pas remplie. Si la promesse est refusée, le flux sera erroné.
  • cancel(reason): appelé lorsque le client du flux annule le flux.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController accepte les méthodes suivantes:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Le deuxième argument, également facultatif, du constructeur ReadableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, ce qui nécessite deux paramètres:

  • highWaterMark: nombre non négatif indiquant la marque des eaux haute du cours d'eau utilisant cette stratégie de mise en file d'attente.
  • size(chunk): fonction qui calcule et renvoie la taille non négative finie de la valeur de fragment donnée. Le résultat permet de déterminer la contre-pression, qui se manifeste via la propriété ReadableStreamDefaultController.desiredSize appropriée. Elle régit également l'appel de la méthode pull() de la source sous-jacente.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Les méthodes getReader() et read()

Pour lire à partir d'un flux lisible, vous avez besoin d'un lecteur, qui sera ReadableStreamDefaultReader La méthode getReader() de l'interface ReadableStream crée un lecteur et verrouille le flux Lorsque le flux est verrouillé, aucun autre lecteur ne peut être acquis tant que celui-ci n'est pas libéré.

read() de l'interface ReadableStreamDefaultReader renvoie une promesse fournissant l'accès au prochain dans la file d'attente interne du flux. Il traite ou rejette avec un résultat en fonction de l’état de le flux. Les différentes possibilités sont les suivantes:

  • Si un fragment est disponible, la promesse est tenue avec un objet au format
    . { value: chunk, done: false }
  • Si le flux est fermé, la promesse est tenue avec un objet au format
    . { value: undefined, done: true }
  • Si le flux comporte une erreur, la promesse est refusée avec l'erreur correspondante.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Propriété locked

Vous pouvez vérifier si un flux lisible est verrouillé en accédant à son ReadableStream.locked .

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Exemples de code de flux lisibles

L'exemple de code ci-dessous illustre toutes les étapes en action. Vous créez d'abord un ReadableStream qui, dans son L'argument underlyingSource (c'est-à-dire la classe TimestampSource) définit une méthode start(). Cette méthode indique au controller du flux de enqueue() est un code temporel toutes les secondes pendant 10 secondes. Enfin, il indique au contrôleur d'effectuer une opération close() sur le flux. Vous consommez ceci flux en créant un lecteur via la méthode getReader() et en appelant read() jusqu'à ce que le flux soit done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Itération asynchrone

Vérifier à chaque itération de la boucle read() si le flux est done n'est peut-être pas l'API la plus pratique. Heureusement, il y aura bientôt une meilleure façon de le faire: l'itération asynchrone.

for await (const chunk of stream) {
  console.log(chunk);
}

Une solution de contournement pour utiliser l'itération asynchrone aujourd'hui consiste à implémenter le comportement avec un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Intégrer un flux lisible

La méthode tee() du L'interface ReadableStream renvoie le flux lisible actuel et renvoie un tableau à deux éléments. contenant les deux branches obtenues en tant que nouvelles instances ReadableStream. Cela permet deux lecteurs de lire un flux simultanément. Vous pouvez effectuer cette opération, par exemple, dans un service worker si vous voulez récupérer une réponse du serveur et la diffuser dans le navigateur, mais aussi dans le dans le cache du nœud de calcul de service. Étant donné qu'un corps de réponse ne peut pas être consommé plusieurs fois, vous avez besoin de deux copies pour ce faire. Pour annuler le flux, vous devez ensuite annuler les deux branches générées. Lancer un flux le verrouillera généralement pendant cette durée, ce qui empêchera les autres lecteurs de le verrouiller.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Flux d'octets lisibles

Pour les flux représentant des octets, une version étendue du flux lisible est fournie pour gérer de manière efficace, en particulier en minimisant le nombre de copies. Les flux d'octets permettent l'utilisation de la mémoire tampon (BYOB) lecteurs à acquérir. L'implémentation par défaut peut générer différentes sorties, telles que en tant que chaînes ou tampons de tableau dans le cas de WebSockets, tandis que les flux d'octets garantissent une sortie en octets. De plus, les lecteurs BYOB offrent des avantages en termes de stabilité. C'est car si un tampon se détache, cela peut garantir qu'on n'écrira pas deux fois dans le même tampon, et d'éviter ainsi les conditions de concurrence. Les lecteurs BYOB permettent de réduire le nombre d'exécutions du navigateur la récupération de mémoire, car elle peut réutiliser les tampons.

Créer un flux d'octets lisible

Vous pouvez créer un flux d'octets lisible en transmettant un paramètre type supplémentaire à la ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

La source sous-jacente d'un flux d'octets lisible reçoit un ReadableByteStreamController pour manipuler. Sa méthode ReadableByteStreamController.enqueue() accepte un argument chunk dont la valeur est un ArrayBufferView. La propriété ReadableByteStreamController.byobRequest renvoie la valeur Requête d'extraction BYOB, ou valeur "null" en l'absence de requête Enfin, le ReadableByteStreamController.desiredSize renvoie la taille souhaitée pour remplir la file d'attente interne du flux contrôlé.

queuingStrategy

Le deuxième argument, également facultatif, du constructeur ReadableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, ce qui nécessite :

  • highWaterMark: nombre non négatif d'octets indiquant la marque des hautes eaux du flux utilisant cette stratégie de mise en file d'attente. Cela permet de déterminer la contre-pression, qui se manifeste via la propriété ReadableByteStreamController.desiredSize appropriée. Elle régit également l'appel de la méthode pull() de la source sous-jacente.

Les méthodes getReader() et read()

Vous pouvez ensuite accéder à un ReadableStreamBYOBReader en définissant le paramètre mode comme suit: ReadableStream.getReader({ mode: "byob" }) Cela permet un contrôle plus précis de la mémoire tampon afin d'éviter les copies. Pour lire à partir du flux d'octets, vous devez appeler ReadableStreamBYOBReader.read(view), où view est un ArrayBufferView

Exemple de code de flux d'octets lisible

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La fonction suivante renvoie des flux d'octets lisibles qui permettent de lire efficacement les données sans copie généré de manière aléatoire. Au lieu d'utiliser une taille de fragment prédéterminée de 1 024, il tente de remplir du tampon fourni par le développeur, ce qui permet un contrôle total.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Fonctionnement d'un flux accessible en écriture

Un flux accessible en écriture est une destination dans laquelle vous pouvez écrire des données. objet WritableStream. Ce sert d'abstraction au-dessus d'un récepteur sous-jacent, un récepteur d'E/S de niveau inférieur dans lequel les données brutes sont écrites.

Les données sont écrites dans le flux par le biais d'un rédacteur, fragment par morceau. Un fragment peut prendre sous une multitude de formes, tout comme les fragments d'un lecteur. Vous pouvez utiliser le code de votre choix les fragments prêts à être écrits ; l'auteur et le code associé sont appelés producteurs.

Lorsqu'un auteur est créé et commence à écrire dans un flux (un auteur actif), on dit qu'il est verrouillé dessus. Un seul rédacteur peut écrire dans un flux accessible en écriture à la fois. Si vous voulez une autre pour écrire dans votre flux, vous devez généralement le publier avant de joindre un autre rédacteur.

Une file d'attente interne assure le suivi des fragments qui ont été écrits dans le flux, mais pas encore. traités par le récepteur sous-jacent.

Une stratégie de mise en file d'attente est un objet qui détermine la manière dont un flux doit signaler une contre-pression en fonction l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque fragment et compare les la taille totale de tous les fragments de la file d'attente jusqu'à un nombre spécifié, appelé marge haute.

La construction finale est appelée contrôleur. Chaque flux accessible en écriture est associé à un contrôleur vous permet de contrôler le flux (par exemple, de l'annuler).

Créer un flux accessible en écriture

L'interface WritableStream de L'API Streams fournit une abstraction standard pour écrire des flux de données dans une destination, connue en tant que récepteur. Cet objet intègre une contre-pression et une mise en file d'attente. Pour créer un flux accessible en écriture appeler son constructeur. WritableStream() Il comporte un paramètre underlyingSink facultatif, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux construite.

underlyingSink

underlyingSink peut inclure les méthodes facultatives suivantes définies par le développeur. controller transmis à certaines méthodes est WritableStreamDefaultController

  • start(controller): cette méthode est appelée immédiatement lors de la construction de l'objet. La le contenu de cette méthode doit avoir pour objectif d'accéder au récepteur sous-jacent. Si ce processus doit être effectuée de manière asynchrone, elle peut renvoyer une promesse pour signaler la réussite ou l'échec.
  • write(chunk, controller): cette méthode est appelée lorsqu'un nouveau fragment de données (spécifié dans le chunk) est prêt à être écrit dans le récepteur sous-jacent. Elle peut renvoyer une promesse signalent la réussite ou l'échec de l'opération d'écriture. Cette méthode ne sera appelée qu'après la les écritures ont réussi, et jamais après la fermeture ou l'annulation du flux.
  • close(controller): cette méthode est appelée si l'application indique qu'elle a fini d'écrire des fragments au flux. Le contenu doit faire tout ce qui est nécessaire pour finaliser les écritures sur le récepteur sous-jacent et en libérant l'accès. Si ce processus est asynchrone, il peut renvoyer une de signaler la réussite ou l'échec. Cette méthode ne sera appelée qu'une fois toutes les écritures en file d'attente ont réussi.
  • abort(reason): cette méthode est appelée si l'application signale qu'elle souhaite se fermer brusquement. le flux et le mettre dans un état erroné. Il peut nettoyer toutes les ressources conservées, close(), mais abort() sera appelé même si les écritures sont en file d'attente. Ces morceaux seront générés à distance. Si ce processus est asynchrone, il peut renvoyer une promesse indiquant la réussite ou l'échec de l'opération. La Le paramètre reason contient un élément DOMString décrivant la raison pour laquelle le flux a été annulé.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

La WritableStreamDefaultController interface de l'API Streams représente un contrôleur permettant de contrôler l'état d'une WritableStream pendant la configuration, lorsque davantage de fragments sont envoyés pour écriture, ou à la fin de l'écriture. Lors de la construction un WritableStream, le récepteur sous-jacent reçoit un WritableStreamDefaultController correspondant ; une instance à manipuler. WritableStreamDefaultController n'a qu'une seule méthode: WritableStreamDefaultController.error(), ce qui entraîne une erreur pour toute interaction ultérieure avec le flux associé. WritableStreamDefaultController accepte également une propriété signal qui renvoie une instance de AbortSignal permettant d'arrêter une opération WritableStream si nécessaire.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Le deuxième argument, également facultatif, du constructeur WritableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, ce qui nécessite deux paramètres:

  • highWaterMark: nombre non négatif indiquant la marque des eaux haute du cours d'eau utilisant cette stratégie de mise en file d'attente.
  • size(chunk): fonction qui calcule et renvoie la taille non négative finie de la valeur de fragment donnée. Le résultat permet de déterminer la contre-pression, qui se manifeste via la propriété WritableStreamDefaultWriter.desiredSize appropriée.

Les méthodes getWriter() et write()

Pour écrire dans un flux accessible en écriture, vous avez besoin d'un rédacteur. WritableStreamDefaultWriter La méthode getWriter() de l'interface WritableStream renvoie une nouvelle instance de WritableStreamDefaultWriter et verrouille le flux sur cette instance. Alors que le flux est verrouillé, aucun autre scénariste ne peut être acquis tant que le flux actuel n'est pas libéré.

write() de la classe WritableStreamDefaultWriter écrit un fragment de données transmis dans un WritableStream et son récepteur sous-jacent, puis renvoie une promesse qui se résout pour indiquer la réussite ou l'échec de l'opération d'écriture. Notez que ce que "succès" dépend du récepteur sous-jacent ; cela peut indiquer que le bloc a été accepté, et pas nécessairement qu’elle est enregistrée en toute sécurité dans sa destination finale.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Propriété locked

Vous pouvez vérifier si un flux accessible en écriture est verrouillé en accédant à son WritableStream.locked .

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Exemple de code de flux accessible en écriture

L'exemple de code ci-dessous illustre toutes les étapes en action.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Rediriger un flux lisible vers un flux accessible en écriture

Un flux lisible peut être redirigé vers un flux accessible en écriture via la couche pipeTo(). ReadableStream.pipeTo() dirige le ReadableStream actuel vers un WritableStream donné et renvoie un promesse qui se termine lorsque le processus de piping se termine avec succès, ou qui est rejeté si des erreurs se sont produites rencontrés.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Créer un flux de transformation

L'interface TransformStream de l'API Streams représente un ensemble de données transformables. Toi créer un flux de transformation en appelant son constructeur TransformStream(), qui crée et renvoie un objet de flux de transformation à partir des gestionnaires donnés. Le constructeur TransformStream() accepte comme son premier argument est un objet JavaScript facultatif représentant la transformer. De tels objets peuvent contenant l'une des méthodes suivantes:

transformer

  • start(controller): cette méthode est appelée immédiatement lors de la construction de l'objet. Habituellement Elle permet de mettre en file d'attente des fragments de préfixes à l'aide de controller.enqueue(). Ces fragments seront lus du côté lisible, mais ne dépendent d'aucune écriture vers le côté accessible en écriture. Si cette initiale est asynchrone, par exemple parce qu'il faut un certain effort pour acquérir les fragments de préfixe, la fonction peut renvoyer une promesse pour signaler la réussite ou l'échec ; une promesse refusée génère une erreur flux. Toutes les exceptions générées sont renvoyées par le constructeur TransformStream().
  • transform(chunk, controller): cette méthode est appelée lorsqu'un nouveau fragment écrit initialement dans le accessible en écriture est prêt à être transformé. L'implémentation de flux garantit que cette fonction est appelé uniquement après la réussite des transformations précédentes et start() n'a jamais été terminée ou après l'appel de flush(). Cette fonction effectue la transformation du flux de transformation. Elle peut mettre les résultats en file d'attente à l'aide de controller.enqueue(). Ce permet à un seul bloc écrit sur le côté accessible en écriture d'obtenir zéro ou plusieurs fragments sur le du côté lisible, en fonction du nombre de fois où controller.enqueue() est appelé. Si le processus de est asynchrone, cette fonction peut renvoyer une promesse pour signaler la réussite ou l'échec la transformation. Une promesse refusée générera une erreur au niveau des côtés lisibles et en écriture de la flux de transformation. Si aucune méthode transform() n'est fournie, la transformation d'identité est utilisée, ce qui met en file d'attente les fragments inchangés du côté accessible en écriture vers le côté lisible.
  • flush(controller): cette méthode est appelée une fois que tous les fragments écrits dans le côté accessible en écriture ont été transformées en passant avec succès via transform(), et le côté accessible en écriture est sur le point d'être fermé. Généralement utilisé pour mettre en file d'attente des fragments de suffixe du côté lisible, avant cela devient fermée. Si le processus de vidage est asynchrone, la fonction peut renvoyer une promesse signaler la réussite ou l'échec ; le résultat est communiqué à l'appelant stream.writable.write() De plus, une promesse refusée générera une erreur au niveau accessibles en écriture du flux. Le traitement d'une exception est le même que pour le renvoi d'une exception prometteurs.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Stratégies de mise en file d'attente writableStrategy et readableStrategy

Les deuxième et troisième paramètres facultatifs du constructeur TransformStream() sont facultatifs Stratégies de mise en file d'attente writableStrategy et readableStrategy. Elles sont définies comme indiqué dans les accessible en lecture et le flux accessible en écriture respectivement.

Exemple de code de flux de transformation

L'exemple de code suivant montre un flux de transformation simple en action.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Transmettre un flux lisible via un flux de transformation

pipeThrough() de l'interface ReadableStream fournit un moyen de chaîner le flux actuel via un flux de transformation ou toute autre paire accessible en écriture/lisible. La reprise d'un flux permet de verrouiller pendant toute la durée du pipe, ce qui empêche les autres lecteurs de le verrouiller.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

L'exemple de code suivant (un peu artificiel) montre comment implémenter une requête version de fetch() qui met tout le texte en majuscules en consommant la promesse de réponse renvoyée sous forme de flux et en majuscules, fragment par fragment. L'avantage de cette approche est que vous n'avez pas besoin d'attendre l'ensemble du document à télécharger, ce qui peut faire une grande différence lors du traitement de fichiers volumineux.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Démo

La démonstration ci-dessous présente des flux lisibles et accessibles en écriture, ainsi que des flux de transformation en action. Il comprend également des exemples de chaînes de pipeline pipeThrough() et pipeTo(), et illustre également tee(). Vous pouvez aussi exécuter la démonstration dans sa propre fenêtre ou afficher code source.

Flux utiles disponibles dans le navigateur

Un certain nombre de flux utiles sont intégrés directement dans le navigateur. Vous pouvez facilement créer ReadableStream à partir d'un blob. Blob la méthode stream() de l'interface renvoie un ReadableStream qui, lors de la lecture, renvoie les données contenues dans l'objet blob. N'oubliez pas non plus qu'un L'objet File est un type spécifique Blob, et peut être utilisé dans n'importe quel contexte qu'un blob peut.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Les variantes de streaming de TextDecoder.decode() et TextEncoder.encode() sont appelées TextDecoderStream et TextEncoderStream, respectivement.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

La compression et la décompression d'un fichier sont faciles CompressionStream et Flux de transformation DecompressionStream respectivement. L'exemple de code ci-dessous montre comment télécharger la spécification Streams, la compresser (gzip) directement dans le navigateur et écrivez le fichier compressé directement sur le disque.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

L'API File System Access FileSystemWritableFileStream et les fetch() flux de requêtes expérimentaux exemples de flux accessibles en écriture dans la nature.

L'API Serial utilise de manière intensive les flux accessibles en lecture et en écriture.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Enfin, l'API WebSocketStream intègre les flux avec l'API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Ressources utiles

Remerciements

Cet article a été examiné par Jake Archibal, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley et Adam Rice. Les articles de blog de Jake Archibal m'ont beaucoup aidé à comprendre flux. Certains exemples de code sont inspirés par un utilisateur de GitHub les explorations de @bellbind et parties de la prose s'appuient fortement sur Documents Web MDN sur les flux. La Streams Standard auteurs ont accompli un travail remarquable d'écrire cette spécification. Image héros de Ryan Lara sur Unsplash.