Stream: la guida definitiva

Scopri come utilizzare flussi leggibili, scrivibili e trasformare i flussi con l'API Streams.

L'API Streams ti consente di accedere in modo programmatico agli stream di dati ricevuti sulla rete o creato con qualsiasi mezzo, localmente e l'elaborazione con JavaScript. I flussi di dati prevedono l'analisi di una risorsa che vuoi ricevere, inviare o trasformare in piccoli blocchi, per poi elaborarli gradualmente. Mentre lo streaming è qualcosa i browser effettuano comunque quando ricevono asset come HTML o video da mostrare sulle pagine web, non era mai stata disponibile per JavaScript prima dell'introduzione di fetch con gli stream nel 2015.

In precedenza, se volevi elaborare una risorsa di qualche tipo (che si tratti di un video, di un file di testo e così via), occorre scaricare l'intero file, attendere che venga deserializzato in un formato adatto. per poi elaborarli. Dato che gli stream sono disponibili per JavaScript, tutto cambia. Ora puoi elaborare i dati non elaborati con JavaScript progressivamente non appena è disponibile sul client, senza dover generare un buffer, una stringa o un blob. Ciò sblocca una serie di casi d'uso, alcuni dei quali sono elencati di seguito:

  • Effetti video:trasmette uno stream video leggibile attraverso uno stream di trasformazione che applica effetti in tempo reale.
  • Decompressione dei dati:pipeline di un flusso di file attraverso un flusso di trasformazione che selettivamente (de)comprimela.
  • Decodifica dell'immagine: pipeline di un flusso di risposta HTTP attraverso un flusso di trasformazione che decodifica i byte in dati bitmap e quindi tramite un altro flusso di trasformazione che converte le bitmap in PNG. Se installati all'interno del gestore fetch di un service worker, questo consente di eseguire il polyfill in modo trasparente nuovi formati di immagine come AVIF.

Supporto browser

ReadableStream e WritableStream

Supporto dei browser

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Origine

TransformStream

Supporto dei browser

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Origine

Concetti principali

Prima di entrare nel dettaglio dei vari tipi di stream, vorrei introdurre alcuni concetti fondamentali.

Pezzi

Un blocco è un singolo dato che viene scritto o letto da un flusso. Può essere qualsiasi type; i flussi possono anche contenere blocchi di tipi diversi. La maggior parte delle volte, un blocco non sarà il più atomico unità di dati per un determinato flusso. Ad esempio, un flusso di byte potrebbe contenere blocchi composti da 16 Uint8Array unità KiB, anziché byte singoli.

Flussi leggibili

Un flusso leggibile rappresenta un'origine di dati da cui puoi leggere. In altre parole, i dati vengono forniti da uno stream leggibile. Concretamente, uno stream leggibile è un'istanza dell'elemento ReadableStream .

Flussi scrivibili

Un flusso scrivibile rappresenta una destinazione per i dati in cui è possibile scrivere. In altre parole, i dati entra in uno stream scrivibile. Concretamente, un flusso scrivibile è un'istanza del WritableStream corso.

Trasformare i flussi

Un flusso di trasformazione è costituito da una coppia di flussi: uno stream scrivibile, noto come lato scrivibile. e uno stream leggibile, noto come lato leggibile. Una metafora del mondo reale potrebbe essere interprete simultaneo che traduce da una lingua all'altra al volo. In una maniera specifica per il flusso di trasformazione, scrivendo accessibile in scrittura fa sì che i nuovi dati vengano resi disponibili per la lettura un lato leggibile. Concretamente, qualsiasi oggetto con una proprietà writable e una proprietà readable può essere pubblicato come flusso di trasformazione. Tuttavia, la classe TransformStream standard semplifica la creazione una coppia di questo tipo che è ben intrecciata.

Catene per tubi

Gli stream vengono utilizzati principalmente collegandoli tra loro. Uno stream leggibile può essere trasmesso direttamente a uno stream scrivibile utilizzando il metodo pipeTo() del flusso leggibile oppure può essere inviato tramite pipe o più trasformano i flussi, utilizzando il metodo pipeThrough() del flusso leggibile. Una serie di flussi collegati tra loro in questo modo sono dette "catene di tubi".

Contropressione

Una volta creata una catena di tubazioni, propaga i segnali relativi alla velocità di flusso dei blocchi e lo attraverso. Se un passaggio della catena non può ancora accettare chunk, propaga un segnale all'indietro attraverso la catena, fino a quando alla fonte originale non viene detto di smettere di produrre blocchi rapidamente. Questo processo di normalizzazione del flusso è chiamato contropressione.

Maglietta a pallone

È possibile eseguire il tethering di uno stream leggibile (il cui nome segue la forma della "T" maiuscola) utilizzando il metodo tee(). Questa operazione bloccherà lo stream, ovvero non lo renderà più utilizzabile direttamente. vengono create due nuove versioni detti rami, che possono essere consumati in modo indipendente. Anche il teeing è importante perché gli stream non possono essere riavvolti o riavviati; ulteriori informazioni su questo argomento sono in seguito.

Diagramma di una catena di pipeline composta da un flusso leggibile proveniente da una chiamata all'API di recupero che viene quindi inviato tramite pipe attraverso un flusso di trasformazione il cui output viene inviato in T e poi inviato al browser per il primo flusso leggibile risultante e alla cache del service worker per il secondo flusso leggibile risultante.
Una catena di tubi.

I meccanismi di uno stream leggibile

Uno stream leggibile è un'origine dati rappresentata in JavaScript da un Oggetto ReadableStream che da un'origine sottostante. La ReadableStream() crea e restituisce un oggetto flusso leggibile dai gestori specificati. Esistono due metodi tipi di origine sottostante:

  • Le origini push inviano costantemente i dati verso di te quando le accedi e sta a te avviare, mettere in pausa o annullare l'accesso allo stream. Alcuni esempi includono video stream in diretta, eventi inviati dal server, o WebSocket.
  • Le origini pull richiedono di richiedere esplicitamente i dati dopo la connessione. Esempi includi operazioni HTTP tramite chiamate fetch() o XMLHttpRequest.

I dati in streaming vengono letti in sequenza in piccole parti denominate blocchi. Si dice che i blocchi posizionati in uno stream siano in coda. Ciò significa che sono in attesa pronti per essere letti. Una coda interna tiene traccia dei blocchi che non sono stati ancora letti.

Una strategia di accodamento è un oggetto che determina come un flusso dovrebbe segnalare la contropressione in base a lo stato della sua coda interna. La strategia di accodamento assegna una dimensione a ciascun blocco e confronta dimensione totale di tutti i blocchi in coda fino a un numero specificato, noto come livello massimo.

I blocchi all'interno dello stream vengono letti da un lettore. Questo lettore recupera i dati un blocco di dati che ti consente di eseguire qualsiasi tipo di operazione. Il lettore più l'altro di elaborazione di questo codice è definito consumatore.

Il costrutto successivo in questo contesto è chiamato controller. A ogni stream leggibile è associato un che, come suggerisce il nome, ti consente di controllare lo stream.

Solo un lettore può leggere uno stream alla volta. Quando un lettore viene creato e inizia a leggere uno stream. (ovvero, diventa un lettore attivo), è bloccato. Se vuoi che un altro lettore assuma il controllo per leggere lo stream, in genere devi rilasciare il primo lettore prima di fare qualsiasi altra cosa (anche se puoi usare la funzionalità tee per gli stream).

Creazione di uno stream leggibile

Puoi creare uno stream leggibile chiamando il relativo costruttore ReadableStream() Il costruttore ha un argomento facoltativo underlyingSource, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza di flusso creata.

underlyingSource

In questo modo è possibile utilizzare i seguenti metodi facoltativi definiti dallo sviluppatore:

  • start(controller): viene richiamata immediatamente quando l'oggetto viene creato. La può accedere all'origine del flusso e svolgere qualsiasi altra operazione necessari per configurare la funzionalità di streaming. Se questo processo deve essere eseguito in modo asincrono, il metodo può restituiscono una promessa che indica l'esito positivo o negativo. Il parametro controller passato a questo metodo è un ReadableStreamDefaultController
  • pull(controller): può essere utilizzata per controllare il flusso quando vengono recuperati più blocchi. it viene richiamata ripetutamente purché la coda di chunk interna dello stream non sia piena, fino a quando raggiunge il suo picco. Se il risultato della chiamata a pull() è una promessa, pull() non verrà richiamato finché non verrà soddisfatta la promessa. Se la promessa viene rifiutata, lo streaming diventerà errato.
  • cancel(reason): richiamato quando il consumatore dello stream annulla lo stream.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController supporta i seguenti metodi:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream() è queuingStrategy. È un oggetto che definisce facoltativamente una strategia di accodamento per il flusso, che richiede due parametri:

  • highWaterMark: un numero non negativo che indica il livello massimo dello stream utilizzando questa strategia di accodamento.
  • size(chunk): una funzione che calcola e restituisce la dimensione non negativa finita del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà ReadableStreamDefaultController.desiredSize appropriata. Inoltre, stabilisce quando viene chiamato il metodo pull() dell'origine sottostante.
di Gemini Advanced.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Metodi getReader() e read()

Per leggere da uno stream leggibile, hai bisogno di un lettore, che sarà un ReadableStreamDefaultReader Il metodo getReader() dell'interfaccia ReadableStream crea un lettore e blocca lo stream su li annotino. Mentre lo stream è bloccato, nessun altro lettore può essere acquisito fino a quando questo non viene rilasciato.

La read() dell'interfaccia ReadableStreamDefaultReader restituisce una promessa che fornisce l'accesso al un blocco note nella coda interna del flusso. L'annuncio soddisfa o rifiuta con un risultato che dipende dallo stato di durante lo streaming. Le diverse possibilità sono le seguenti:

  • Se un blocco è disponibile, la promessa verrà soddisfatta con un oggetto del modulo
    { value: chunk, done: false }.
  • Se lo stream viene chiuso, la promessa verrà soddisfatta con un oggetto nel formato
    { value: undefined, done: true }.
  • In caso di errori relativi allo stream, la promessa viene rifiutata con l'errore pertinente.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

La proprietà locked

Puoi verificare se uno stream leggibile è bloccato accedendo al relativo ReadableStream.locked proprietà.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Esempi di codice di flusso leggibili

L'esempio di codice seguente mostra tutti i passaggi in azione. Innanzitutto, crei un ReadableStream che nei suoi L'argomento underlyingSource (ovvero la classe TimestampSource) definisce un metodo start(). Questo metodo indica all'controller dello stream di enqueue() un timestamp ogni secondo per dieci secondi. Infine, comunica al controller di close() lo stream. Usi questo lo stream creando un lettore con il metodo getReader() e chiamando read() finché lo stream non viene done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iterazione asincrona

Controllare a ogni iterazione del loop read() se lo stream è done potrebbe non essere l'API più comoda. Fortunatamente, presto ci sarà un modo migliore per farlo: l'iterazione asincrona.

for await (const chunk of stream) {
  console.log(chunk);
}

Una soluzione alternativa per utilizzare l'iterazione asincrona è implementare il comportamento con un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Riproduzione di uno stream leggibile

Il metodo tee() del metodo L'interfaccia ReadableStream avvia lo stream leggibile corrente, restituendo un array a due elementi contenente i due rami risultanti come nuove istanze ReadableStream. Ciò consente due lettori per leggere uno stream contemporaneamente. Potresti farlo, ad esempio, in un service worker se si desidera recuperare una risposta dal server e trasmetterla in modalità flusso al browser, ma anche e la cache del service worker. Poiché il corpo di una risposta non può essere utilizzato più di una volta, sono necessarie due copie per eseguire questa operazione. Per annullare lo stream, devi annullare entrambi i rami risultanti. Avviamento di un live streaming in genere lo bloccheranno per l'intera durata, impedendo ad altri lettori di bloccarlo.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Flussi di byte leggibili

Per gli stream che rappresentano i byte, viene fornita una versione estesa dello stream leggibile in modo efficiente, in particolare riducendo al minimo le copie. I flussi di byte consentono il buffer del modello lettori (BYOB) da acquisire. L'implementazione predefinita può fornire una gamma di output diversi come come stringhe o buffer di array nel caso di WebSocket, mentre i flussi di byte garantiscono l'output dei byte. Inoltre, i lettori BYOB offrono vantaggi in termini di stabilità. Questo è perché se un buffer si scollega, è possibile garantire che non scriva due volte nello stesso buffer, evitando così le condizioni di gara. I lettori BYOB possono ridurre il numero di volte in cui il browser deve essere eseguito garbage collection, perché può riutilizzare i buffer.

Creazione di un flusso di byte leggibile

Puoi creare uno stream di byte leggibile passando un parametro type aggiuntivo alla costruttore ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

All'origine sottostante di un flusso di byte leggibile viene assegnato un ReadableByteStreamController a manipolare. Il suo metodo ReadableByteStreamController.enqueue() prende un argomento chunk il cui valore è un ArrayBufferView. La proprietà ReadableByteStreamController.byobRequest restituisce lo stato attuale Richiesta di pull BYOB o nullo in caso contrario. Infine, ReadableByteStreamController.desiredSize restituisce la dimensione desiderata per riempire la coda interna dello stream controllato.

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream() è queuingStrategy. È un oggetto che definisce facoltativamente una strategia di accodamento per il flusso, che prende una :

  • highWaterMark: un numero non negativo di byte che indica il livello elevato dello stream utilizzando questa strategia di accodamento. Viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà ReadableByteStreamController.desiredSize appropriata. Inoltre, stabilisce quando viene chiamato il metodo pull() dell'origine sottostante.
di Gemini Advanced. di Gemini Advanced.

Metodi getReader() e read()

Puoi quindi ottenere l'accesso a un ReadableStreamBYOBReader impostando il parametro mode di conseguenza: ReadableStream.getReader({ mode: "byob" }). Ciò consente un controllo più preciso sul buffer allo scopo di evitare copie. Per leggere dal flusso di byte, devi chiamare ReadableStreamBYOBReader.read(view), dove view è un ArrayBufferView

Esempio di codice di flusso di byte leggibili

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La seguente funzione restituisce flussi di byte leggibili che consentono una lettura zero-copy efficiente di un generato in modo casuale. Anziché utilizzare una dimensione del chunk predeterminata di 1024, tenta di riempire fornito dallo sviluppatore, consentendo il controllo completo.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

I meccanismi di uno stream scrivibile

Un flusso in scrittura è una destinazione in cui è possibile scrivere dati, rappresentata in JavaScript da un WritableStream. Questo funge da astrazione sopra un sink sottostante, ovvero un sink di I/O di livello inferiore in cui vengono scritti dati non elaborati.

I dati vengono scritti nel flusso tramite un writer, un blocco alla volta. Un chunk può prendere un una moltitudine di forme, proprio come i blocchi in un lettore. Puoi utilizzare il codice che desideri generare i blocchi pronti per la scrittura; l'autore e il codice associato è chiamato producer.

Quando un autore viene creato e inizia a scrivere su uno stream (ossia un autore attivo), si parla di bloccato. Solo un autore può scrivere in uno stream scrivibile alla volta. Se vuoi un'altra autore per iniziare a scrivere sul tuo stream, in genere devi rilasciarlo e poi allegarlo a un altro autore.

Una coda interna tiene traccia dei blocchi scritti nel flusso, ma non ancora vengono elaborati dal sink sottostante.

Una strategia di accodamento è un oggetto che determina come un flusso dovrebbe segnalare la contropressione in base a lo stato della sua coda interna. La strategia di accodamento assegna una dimensione a ciascun blocco e confronta dimensione totale di tutti i blocchi in coda fino a un numero specificato, noto come livello massimo.

Il costrutto finale è chiamato controller. A ogni flusso scrivibile è associato un controller ti consente di controllare lo stream (ad esempio per interromperlo).

Creazione di un flusso scrivibile

L'interfaccia di WritableStream di l'API Streams fornisce un'astrazione standard per la scrittura di flussi di dati in una destinazione, nota come lavandino. Questo oggetto è dotato di contropressione e accodamento integrati. Puoi creare uno stream scrivibile chiama il suo costruttore WritableStream() Ha un parametro facoltativo underlyingSink, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza di flusso creata.

underlyingSink

underlyingSink può includere i seguenti metodi facoltativi definiti dallo sviluppatore. controller passato ad alcuni metodi è un parametro WritableStreamDefaultController

  • start(controller): questo metodo viene chiamato immediatamente quando l'oggetto viene creato. La i contenuti di questo metodo dovrebbero avere l'accesso al sink sottostante. Se questo processo deve essere se eseguita in modo asincrono, può restituire una promessa per segnalare l'esito positivo o negativo.
  • write(chunk, controller): questo metodo viene chiamato quando un nuovo blocco di dati (specificato nel chunk) è pronto per essere scritto nel sink sottostante. Può restituire una promessa è un segnale di successo o errore dell'operazione di scrittura. Questo metodo verrà chiamato solo dopo i precedenti scritture riuscite e mai dopo la chiusura o l'interruzione del flusso.
  • close(controller): questo metodo verrà chiamato se l'app segnala che ha terminato la scrittura di grandi dimensioni nel flusso. I contenuti devono fare tutto ciò che è necessario per finalizzare le scritture sink sottostante e rilasciarne l'accesso. Se questo processo è asincrono, può restituire un promettere di indicare successo o fallimento. Questo metodo viene chiamato solo dopo tutte le scritture in coda hanno avuto successo.
  • abort(reason): questo metodo verrà chiamato se l'app segnala che vuole chiudere bruscamente il flusso e lo metteranno in stato di errore. Può eseguire la pulizia di tutte le risorse conservate, come close(), ma abort() verrà chiamato anche se le scritture sono in coda. Questi blocchi verranno lanciati di distanza. Se questo processo è asincrono, può restituire una promessa per segnalare l'esito positivo o negativo. La Il parametro reason contiene un valore DOMString che descrive perché lo stream è stato interrotto.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

La WritableStreamDefaultController dell'API Streams rappresenta un controller che consente il controllo dello stato di WritableStream durante l'impostazione, man mano che vengono inviati più blocchi per la scrittura o alla fine della scrittura. Durante la creazione un WritableStream, al sink sottostante viene assegnato un WritableStreamDefaultController da manipolare. WritableStreamDefaultController ha un solo metodo: WritableStreamDefaultController.error(), causando un errore in tutte le interazioni future con lo stream associato. WritableStreamDefaultController supporta anche una proprietà signal che restituisce un'istanza di AbortSignal, consentendo l'arresto di un'operazione WritableStream se necessario.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore WritableStream() è queuingStrategy. È un oggetto che definisce facoltativamente una strategia di accodamento per il flusso, che richiede due parametri:

  • highWaterMark: un numero non negativo che indica il livello massimo dello stream utilizzando questa strategia di accodamento.
  • size(chunk): una funzione che calcola e restituisce la dimensione non negativa finita del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà WritableStreamDefaultWriter.desiredSize appropriata.
di Gemini Advanced.

Metodi getWriter() e write()

Per scrivere in uno stream scrivibile, hai bisogno di un writer, che sarà un WritableStreamDefaultWriter. Il metodo getWriter() dell'interfaccia WritableStream restituisce un nuova istanza di WritableStreamDefaultWriter e blocca il flusso a quell'istanza. Mentre lo stream viene bloccato, nessun altro autore può essere acquisito fino a quando non viene rilasciato quello corrente.

La write() del metodo WritableStreamDefaultWriter scrive un blocco di dati passato in WritableStream e nel sink sottostante, quindi restituisce una promessa che si risolve a indicare l'esito positivo o negativo dell'operazione di scrittura. Ricorda che "riuscito" fino al sink sottostante; potrebbe indicare che il blocco è stato accettato e non necessariamente che venga salvato in sicurezza alla sua destinazione definitiva.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

La proprietà locked

Puoi verificare se uno stream scrivibile è bloccato accedendo alla relativa WritableStream.locked proprietà.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Esempio di codice di flusso scrivibile

L'esempio di codice seguente mostra tutti i passaggi nella pratica.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Collegamento di uno stream leggibile a uno stream scrivibile

Uno stream leggibile può essere inviato a uno stream scrivibile attraverso pipeTo(). ReadableStream.pipeTo() indirizza l'elemento ReadableStream corrente a un determinato WritableStream e restituisce un una promessa che si soddisfa quando il processo di piping viene completato correttamente o rifiuta se eventuali errori sono stati riscontrati.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Creazione di un flusso di trasformazione

L'interfaccia TransformStream dell'API Streams rappresenta un set di dati trasformabili. Tu crea un flusso di trasformazione chiamando il suo costruttore TransformStream(), che crea e restituisce un oggetto di flusso di trasformazione dai gestori specificati. Il costruttore TransformStream() accetta al suo primo argomento un oggetto JavaScript facoltativo che rappresenta transformer. Questi oggetti possono che contengono uno dei seguenti metodi:

transformer

  • start(controller): questo metodo viene chiamato immediatamente quando l'oggetto viene creato. Tipicamente questo viene utilizzato per accodare blocchi di prefissi, utilizzando controller.enqueue(). Questi blocchi verranno letti dal lato leggibile ma non dipendono da scritture sul lato scrivibile. Se questo numero è asincrono, ad esempio perché l'acquisizione dei blocchi di prefisso richiede uno sforzo, la funzione può restituire una promessa di segnalare l'esito positivo o negativo. una promessa rifiutata genera un errore flusso di dati. Eventuali eccezioni generate verranno nuovamente generate dal costruttore TransformStream().
  • transform(chunk, controller): questo metodo viene chiamato quando un nuovo blocco originariamente scritto nel in scrittura è pronta per essere trasformata. L'implementazione del flusso garantisce che questa funzione verrà chiamato solo dopo che le trasformazioni precedenti sono andate a buon fine e mai prima che start() completata o dopo la chiamata di flush(). Questa funzione esegue l'effettiva trasformazione del flusso di trasformazione. Può accodare i risultati utilizzando controller.enqueue(). Questo consente a un singolo blocco scritto sul lato scrivibile di produrre zero o più blocchi sul un lato leggibile, a seconda di quante volte viene chiamato controller.enqueue(). Se il processo di di trasformazione è asincrona, questa funzione può restituire una promessa per segnalare l'esito positivo o negativo del la trasformazione. Una promessa rifiutata genera un errore sia sul lato leggibile sia su quello scrivibile della e trasformare un flusso di dati. Se non viene fornito alcun metodo transform(), viene utilizzata la trasformazione dell'identità, che accoda blocchi senza modifiche dal lato scrivibile al lato leggibile.
  • flush(controller): questo metodo viene chiamato dopo che tutti i blocchi scritti sul lato scrivibile sono stati trasformata correttamente passando per transform() e il lato scrivibile sta per essere chiuso. In genere viene utilizzato per accodare blocchi di suffissi sul lato leggibile, anche prima di questo viene chiusa. Se il processo di svuotamento è asincrono, la funzione può restituire una promessa a indicatori di esito positivo o negativo; il risultato verrà comunicato al chiamante di stream.writable.write(). Inoltre, una promessa rifiutata genera un errore sia nella risposta leggibile lati in scrittura dello stream. La generazione di un'eccezione viene considerata come la restituzione di un'eccezione la promessa.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategie di accodamento writableStrategy e readableStrategy

Il secondo e il terzo parametro facoltativi del costruttore TransformStream() sono facoltativi Strategie di accodamento writableStrategy e readableStrategy. Sono definiti come descritto leggibile e in scrittura rispettivamente.

Esempio di codice di flusso per la trasformazione

Il seguente esempio di codice mostra un semplice flusso di trasformazione in azione.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Inserimento di un flusso leggibile attraverso un flusso di trasformazione

La pipeThrough() dell'interfaccia ReadableStream fornisce un modo concatenabile per collegare il flusso di corrente attraverso un flusso di trasformazione o qualsiasi altra coppia scrivibile/leggibile. In genere, la segnalazione di uno stream blocca per tutta la durata della pipeline, impedendo ad altri lettori di bloccarla.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Il prossimo esempio di codice (un po' inventato) mostra come implementare un "shouting" versione di fetch() che maiuscole tutto il testo utilizzando la promessa di risposta restituita come stream e l'utilizzo delle maiuscole in blocchi per blocco. Il vantaggio di questo approccio è che non occorre attendere l'intero documento da scaricare, il che può fare un'enorme differenza quando si gestiscono file di grandi dimensioni.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

La demo seguente mostra i flussi leggibili, scrivibili e trasformabili in azione. Include anche esempi di pipeThrough() e pipeTo() catene di tubi e dimostra anche tee(). Se vuoi, puoi eseguire alla demo nella sua finestra oppure consulta la codice sorgente.

Stream utili disponibili nel browser

Esistono diversi stream utili integrati direttamente nel browser. Puoi creare facilmente un ReadableStream da un blob. La Blob il metodo stream() dell'interfaccia restituisce un ReadableStream che, alla lettura, restituisce i dati contenuti nel blob. Ricorda inoltre che L'oggetto File è un tipo specifico di Blob e può essere utilizzato in qualsiasi contesto utilizzabile da un blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Le varianti di streaming di TextDecoder.decode() e TextEncoder.encode() vengono chiamate TextDecoderStream e TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

È facile comprimere o decomprimere un file CompressionStream e DecompressionStream trasforma i flussi rispettivamente. L'esempio di codice seguente mostra come scaricare la specifica Streams, comprimerla (gzip) nel browser e scrivi il file compresso direttamente sul disco.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

L'API File System Access dell'API FileSystemWritableFileStream: mentre gli stream di richiesta fetch() sperimentali sono esempi di flussi scrivibili in circolazione.

L'API Serial fa un uso intensivo degli stream leggibili e scrivibili.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Infine, l'API WebSocketStream integra gli stream con l'API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Risorse utili

Ringraziamenti

Questo articolo è stato esaminato da Jake Archibald François Beaufort Mario Rossi Mattias Buelens, Surma, Joe Medley e Andrea Rossi. I post del blog di Jake Archibald mi hanno aiutato molto a capire i flussi di dati. Alcuni esempi di codice sono ispirati dall'utente GitHub Le esplorazioni e le funzionalità di @bellbind parti della prosa si basano fortemente sul Documenti web MDN sugli stream. La Streams standard autori hanno fatto un enorme lavoro scrivere questa specifica. Immagine hero di Ryan Lara su Annulla schermata.