訊息串:這份指南

瞭解如何透過 Streams API 使用可讀取、可寫入及轉換的串流。

Streams API 可讓您透過程式存取透過網路接收的資料串流 或是運用於本機連線 以 JavaScript 處理這些資料串流是指細分您要接收、傳送或轉換的資源 分批處理,然後逐一處理這些區塊直播是很重要的 瀏覽器在收到 HTML 或影片等素材資源,才會顯示在網頁時 功能,在 2015 年推出串流的 fetch 之前,就從未向 JavaScript 提供。

過去,如果您想處理某種資源 (例如影片或文字檔等) 您必須下載整個檔案,等待檔案反序列化為合適的格式 然後處理支援串流服務的裝置 這就是所有的變更您現在可以使用 JavaScript 逐步處理原始資料,例如: 一旦該資料可供用戶端使用,不需要產生緩衝區、字串或 blob。 此舉可支援多種用途,部分範例如下:

  • 影片效果:透過套用效果的轉換串流填入可讀取的影片串流 即時監控業務成效
  • 資料 (解壓縮) 壓縮:透過可選擇性地選取的轉換串流來插入檔案串流 (de) 會將其壓縮。
  • 圖片解碼:透過可解碼位元組的轉換串流,插入 HTTP 回應串流 轉換為點陣圖資料,然後經由另一個轉換串流來將點陣圖轉換為 PNG。如果 安裝在 Service Worker 的 fetch 處理常式中,這可讓您以透明化的方式 polyfill 像是 AVIF 等新的圖片格式

瀏覽器支援

ReadableStream 和 WritableStream

瀏覽器支援

  • Chrome:43.
  • Edge:14。
  • Firefox:65。
  • Safari:10.1.

資料來源

TransformStream

瀏覽器支援

  • Chrome:67.
  • Edge:79,
  • Firefox:102。
  • Safari:14.1。

資料來源

核心概念

在詳細說明各種直播類型前,我先來介紹一些核心概念。

塊狀

區塊是寫入串流或從串流讀取的單一資料。可以是任何 類型;甚至可以包含不同類型的區塊大多數時候,分段並非最完整 特定串流的資料單位。例如,位元組串流可能包含由 16 個 KiB Uint8Array 單位,而非單一位元組。

可讀取串流

可讀取的串流代表您可以讀取的資料來源。換句話說,資料 請參閱可讀取的串流。確切來說,可讀取的串流是 ReadableStream 的執行個體 類別

可寫入串流

可寫入的串流代表您可以寫入資料的目的地。也就是「資料」 會進入可寫入的串流。具體來說,可寫入的串流是 WritableStream 類別。

轉換串流

轉換串流由「兩組串流」組成,也就是可寫入的串流 (又稱為可寫入的端)。 以及可讀取的串流資料 (也就是可讀取的端)。 現實世界的隱喻 同步翻譯 即時將某種語言翻譯成另一種語言 以轉換串流的特定方式編寫 則會產生新資料,並從 確切來說,任何具有 writable 屬性和 readable 屬性的物件都能提供 做為轉換串流不過,標準的 TransformStream 類別更容易建立 緊密貼合

管線鏈

串流主要用於「連接」串流。可直接連接可讀取的串流 使用可讀取的串流的 pipeTo() 方法,或透過單一管道將資料傳入可寫入的串流 或更多轉換串流 (使用可讀取串流的 pipeThrough() 方法)。一組 以這種方式連接的串流稱為管道鏈。

背壓

管道鏈建構完成後,就會傳播有關區塊應流速的信號 移除。如果鏈結中的任一步驟目前無法接受區塊,就會向後傳播信號 ,直到系統指示原始來源停止產生區塊時,才會停止產生區塊, 快速這個正規化流程的程序稱為背壓。

開球

您可以使用其 tee() 方法,為可讀取的串流命名 (以大寫「T」形狀命名)。 這樣做會「鎖定」串流,也就是說,無法再直接使用串流。但會產生兩個新的 串流,稱為分支版本,可獨立使用。 此外,串流內容也很重要,因為串流無法重複或重新啟動,稍後會有更多說明。

管道鏈的圖表,由對擷取 API 的呼叫所構成的可讀串流所組成,並透過轉換串流管道管道,該串流的輸出內容經過加密,然後將輸出傳送至瀏覽器,用於第一個產生的可讀取串流,以及為第二個產生的可讀取串流傳送至 Service Worker 的快取。
管道鏈。

可讀取串流的機制

可讀取的串流是由 JavaScript 形式呈現的資料來源, ReadableStream 物件,該物件會 流量 ReadableStream()敬上 建構函式會從指定的處理常式建立並傳回可讀取的串流物件。這裡共有兩個 基礎來源的類型:

  • 推送來源:在您存取這些資料時不斷推送資料,由您全權決定 開始、暫停或取消存取直播。例如即時影片串流、伺服器傳送的事件、 或 WebSockets
  • 提取來源會要求您在連線後明確要求資料。範例 透過 fetch()XMLHttpRequest 呼叫,包括 HTTP 作業。

串流資料會依循序讀取,且稱為區塊。 系統會將串流中的區塊視為排入佇列。這表示等候佇列中 隨時都能閱讀內部佇列會追蹤尚未讀取的區塊。

排入佇列策略是一種物件,用於決定串流應如何根據 那就是它內部佇列的狀態排入佇列策略會為每個區塊指派大小,然後比較 佇列中所有區塊的總大小至指定數字,稱為「高浮水印」

串流中的區塊會由「讀取器」讀取。此讀取器會以 讓您隨心所欲執行各種作業「讀者」加上 隨附的處理程式碼稱為「用戶」

此情境中的下一個結構稱為「控制器」。每個可讀取的串流都有相關聯的 顧名思義,這個控制器能讓您控制串流。

一次只能有一位讀取者閱讀串流內容;讀取器建立並開始讀取串流時 (也就是說,已成為「活躍讀者」),已「鎖定」。如果希望其他讀者 讀取串流時,您通常需要先釋出第一位讀者,再執行其他操作 (但你可以建立串流)。

建立可讀取的串流

呼叫其建構函式即可建立可讀取的串流 ReadableStream()。 建構函式包含代表物件的選用引數 underlyingSource 透過方法和屬性定義建構的串流執行個體行為。

underlyingSource

這種方法可以使用以下由開發人員定義的選用方法:

  • start(controller):在建構物件時立即呼叫。 方法可存取串流來源,並執行其他操作 才能設定串流功能。如果這個程序是非同步執行,則此方法 傳回承諾,表示成功或失敗。傳遞至這個方法的 controller 參數是 換 ReadableStreamDefaultController
  • pull(controller):可用於控制串流,因為擷取更多區塊。這項服務 系統會重複呼叫,前提是串流的內部佇列未滿格,直到佇列為止 達到最高水分如果呼叫 pull() 的結果是承諾, 除非承諾履行,否則系統不會再次呼叫 pull()。 如果承諾拒絕,串流就會發生錯誤。
  • cancel(reason):在串流消費者取消串流時呼叫。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支援下列方法:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

其次,ReadableStream() 建構函式的引數 (選用) 是 queuingStrategy。 這個物件可選擇性定義串流的排入佇列策略,且可採用 參數:

  • highWaterMark:非負數的數字,表示使用這個排入佇列的串流高度標示高度。
  • size(chunk):這個函式會計算並傳回指定區塊值的有限非負數大小。 結果將用於判斷背壓,並透過適當的 ReadableStreamDefaultController.desiredSize 屬性顯示。 也會管理呼叫基礎來源的 pull() 方法的時間。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);
敬上

getReader()read() 方法

需要讀取器 (也就是 ReadableStreamDefaultReaderReadableStream 介面的 getReader() 方法會建立讀取器,並將串流鎖定到 基礎架構串流鎖定後,必須等到這個串流發布後,才能收購其他讀者。

read() ReadableStreamDefaultReader 介面的方法會傳回承諾,提供下一個存取 重複執行這個區塊系統在執行或拒絕時,會依據其狀態 再透過訊息串可能的比率如下:

  • 如果有區塊,則會使用形式的物件來完成承諾。
    { value: chunk, done: false }
  • 串流關閉後,系統會使用以下形式的物件完成承諾。
    { value: undefined, done: true }
  • 如果串流發生錯誤,保證會遭到拒絕,並顯示相關錯誤。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 屬性

如要存取可讀取的直播影片,您可以查看該串流是否遭到鎖定。 ReadableStream.locked敬上 資源。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可讀取的串流程式碼範例

以下程式碼範例顯示了所有操作步驟。您先在其資源中建立 ReadableStream underlyingSource 引數 (也就是 TimestampSource 類別) 定義 start() 方法。 這個方法會將串流的 controller 告知 在十秒中每秒enqueue()輸入時間戳記。 最後,這個元件會指示控制器 close() 串流。在這個過程中 透過 getReader() 方法建立讀取器,並呼叫 read() 直到串流結束,以建立串流 done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

非同步疊代

如果串流不是 done 最方便的 API,則檢查每個 read() 迴圈疊代。 幸好在不久的將是更好的做法:非同步疊代。

for await (const chunk of stream) {
  console.log(chunk);
}
敬上

使用非同步疊代的解決方法,是先使用 polyfill 實作行為。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

為可讀取的串流內容開球

通知的 tee() 方法 ReadableStream 介面會擷取目前的可讀取串流,並傳回兩個元素陣列 包含兩個產生的分支版本,做為新的 ReadableStream 執行個體。這樣一來, 就是可以同時閱讀串流內容舉例來說,您可以在 Service Worker 中執行 您要從伺服器擷取回應並串流到瀏覽器 Service Worker 快取。由於回應主體不能重複使用,因此您需要兩個副本 執行這個步驟如要取消串流,請取消兩個產生的分支版本。建立直播視窗 通常會鎖定 Wi-Fi,防止其他讀者鎖定。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

可讀取的位元組串流

針對表示位元組的串流,系統會提供可讀取串流的擴充版本來處理 尤其是將副本數減至最少位元組串流支援自行緩衝區緩衝區 要收購的 (BYOB) 讀者。預設的實作方式可以提供一系列不同的輸出內容,例如 視為字串或陣列緩衝區 (如果是 WebSocket),而位元組串流則保證位元組輸出。 此外,BYOB 讀取器享有穩定性優勢。這是 因為如果緩衝區卸離,就能保證緩衝區不會寫入同一緩衝區兩次 以免發生競爭狀況BYOB 讀取器可減少瀏覽器需要執行的次數 垃圾收集,因為它可重複使用緩衝區。

建立可讀取的位元組資料流

只要將額外的 type 參數傳遞至 ReadableStream() 建構函式。

new ReadableStream({ type: 'bytes' });

underlyingSource

可讀取位元組資料流的基礎來源會給予 ReadableByteStreamController 到 容易操縱。其 ReadableByteStreamController.enqueue() 方法會使用 chunk 引數,其中的值為 為 ArrayBufferViewReadableByteStreamController.byobRequest 屬性會傳回目前 BYOB 提取要求,如果沒有,則傳回空值。最後,ReadableByteStreamController.desiredSize 屬性會傳回所需大小,以填滿受控制串流的內部佇列。

queuingStrategy

其次,ReadableStream() 建構函式的引數 (選用) 是 queuingStrategy。 物件是一種可選擇性定義串流的排入佇列策略的物件, 參數:

  • highWaterMark:非負數的位元組數量,表示使用這個排入佇列策略的串流高浮水印。 可用來判斷背壓,並透過適當的 ReadableByteStreamController.desiredSize 屬性顯示。 也會管理呼叫基礎來源的 pull() 方法的時間。
,瞭解如何調查及移除這項存取權。 ,瞭解如何調查及移除這項存取權。

getReader()read() 方法

然後,您可以據此設定 mode 參數,以便存取 ReadableStreamBYOBReaderReadableStream.getReader({ mode: "byob" })。這樣就能更精確地控制緩衝區 才能避免複製如要讀取位元組資料流,您必須呼叫 ReadableStreamBYOBReader.read(view),其中 viewArrayBufferView

可讀取位元組串流程式碼範例

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

下列函式會傳回可讀取的位元組串流,以便有效率地零複製 隨機產生的陣列與其使用預先定義的 1,024 區塊大小,而是試著填入 開發人員提供的緩衝區,以便完全控制。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可寫入資料流的機制

可寫入的串流是讓您寫入資料的目的地,以 JavaScript 表示 WritableStream 物件。這個 可做為基礎接收器頂端的抽象層,也就是具備較低層級的 I/O 接收器。 會寫入原始資料

資料會透過「寫入者」寫入串流,一次一個區塊。區塊可以 多種形式,就像閱讀器中的區塊一樣您可以使用任何要生成的程式碼 可編寫的區塊寫入者加上相關程式碼,稱為「生產端」

寫手建立並開始撰寫串流內容 (屬於活躍作曲者) 時,就算是 已鎖定。一次只能將一位寫入者寫入可寫入的串流。如果想 開始撰寫直播內容之前,你通常需要先釋出直播,然後再附加 再加入一位寫入者

內部佇列會追蹤寫入串流但尚未寫入的區塊 由基礎接收器處理

排入佇列策略是一種物件,用於決定串流應如何根據 那就是它內部佇列的狀態排入佇列策略會為每個區塊指派大小,然後比較 佇列中所有區塊的總大小至指定數字,稱為「高浮水印」

最終的建構稱為「控制器」。每個可寫入串流都有相關聯的控制器 控制串流 (例如取消串流)。

建立可寫入的串流

WritableStream 介面 Streams API 提供將串流資料寫入目的地的標準抽象化機制 做為接收器此物件內建背壓和排入佇列。建立可寫入的訊息串時 呼叫其建構函式 WritableStream()。 其中包含代表物件的選用 underlyingSink 參數 透過方法和屬性定義建構的串流執行個體行為。

underlyingSink

underlyingSink 可能包含下列開發人員定義的選用方法。controller 傳送給某些方法的 WritableStreamDefaultController

  • start(controller):在建構物件時會立即呼叫此方法。 就應使用此方法的內容來存取基礎接收器。如果這項程序 而可能會傳回承諾,表示成功或失敗。
  • write(chunk, controller):系統會在新的資料區塊 ( chunk 參數),現在已準備好寫入基礎接收器。能傳回承諾 表示寫入作業成功或失敗系統只會在前一個步驟後呼叫這個方法 寫入成功,在串流關閉或取消後也絕對不會。
  • close(controller):如果應用程式表示其已完成寫入,系統就會呼叫此方法 多個片段內容應該會進行任何必要的操作,以完成寫入 並釋出該接收器的存取權如果這個程序是非同步的,則可以傳回 表示失敗或失敗只有在所有已排入佇列的寫入作業完成後,才會呼叫這個方法 更新成功。
  • abort(reason):如果應用程式表示希望突然關閉,系統會呼叫這個方法 然後將其設為錯誤狀態可用來清除已留存的資源 close(),但即使寫入已排入佇列,系統仍會呼叫 abort()。系統會擲回這些區塊 因為如果是非同步的程序,可能會傳回承諾,表示成功或失敗。 reason 參數包含 DOMString,用於說明串流取消原因。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

WritableStreamDefaultController敬上 Streams API 的介面代表一個控制器,可控制 WritableStream 的狀態 傳送更多區塊,進行編寫或寫入完成。建構期間 WritableStream,基礎接收器會獲得對應的 WritableStreamDefaultController 要操控的執行個體WritableStreamDefaultController 只有一個方法: WritableStreamDefaultController.error(), 這樣日後與相關聯串流之間的互動就會發生錯誤。 WritableStreamDefaultController 也支援 signal 屬性,該屬性會傳回 AbortSignal、 允許視需要停止 WritableStream 作業。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

其次,WritableStream() 建構函式的引數 (選用) 是 queuingStrategy。 這個物件可選擇性定義串流的排入佇列策略,且可採用 參數:

  • highWaterMark:非負數的數字,表示使用這個排入佇列的串流高度標示高度。
  • size(chunk):這個函式會計算並傳回指定區塊值的有限非負數大小。 結果將用於判斷背壓,並透過適當的 WritableStreamDefaultWriter.desiredSize 屬性顯示。
,瞭解如何調查及移除這項存取權。

getWriter()write() 方法

如要寫入可寫入的串流,需要寫入者,也就是 WritableStreamDefaultWriterWritableStream 介面的 getWriter() 方法會傳回 WritableStreamDefaultWriter 的新執行個體,並將串流鎖定至該執行個體。雖然 直播影片已鎖定。在目前的播放之前,您無法取得其他作家。

write() 方法 WritableStreamDefaultWriter。 介面會將傳遞的資料區塊寫入 WritableStream 及其基礎接收器,然後傳回 並承諾表示寫入作業成功或失敗。請注意 「成功」代表取決於基礎接收器可能代表該區塊已接受 但不一定能安全儲存至終極目的地。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 屬性

您可以存取可寫入的串流,查看該串流是否遭到鎖定 WritableStream.locked敬上 資源。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可寫入串流程式碼範例

以下程式碼範例顯示了所有操作步驟。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

將可讀取的串流傳送至可寫入的串流

可讀取的串流可以透過可讀取的串流,傳遞至可寫入的串流 pipeTo() 方法,增加圍繞地圖邊緣的邊框間距。 ReadableStream.pipeTo() 會將目前的 ReadableStream 分隔成指定的 WritableStream 並傳回 保證在管道成功完成時能完成,或者如果發生任何錯誤,就會拒絕 。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

建立轉換串流

Streams API 的 TransformStream 介面代表一組可轉換的資料。個人中心 呼叫其建構函式 TransformStream() 來建立轉換串流,此函式會建立並傳回 來自特定處理常式的轉換串流物件。TransformStream() 建構函式接受為 其第一個引數是代表 transformer 的選用 JavaScript 物件。這類物件 包含下列方法:

transformer

  • start(controller):在建構物件時會立即呼叫此方法。一般價格 用途是透過 controller.enqueue() 將前置字串區塊排入佇列。系統會讀取這些區塊 而非寫入可寫入端的任何寫入。如果這個初始值 由於需要花些時間才能取得前置字串區塊 函式可傳回承諾,表示成功或失敗;遭拒的承諾會導致系統發生錯誤 串流。任何擲回的例外狀況都將由 TransformStream() 建構函式重新擲回。
  • transform(chunk, controller):當一開始將新區塊寫入 可寫入的一端已準備好進行轉換串流實作可保證這個函式 只有在先前的轉換成功之後,才會呼叫,而在 start() 之前 已完成或呼叫 flush()。這個函式會執行實際的轉換 轉換串流的結果它可以使用 controller.enqueue() 將結果排入佇列。這個 允許寫入可寫入端的單一區塊,產生零或多個區塊 易讀性質,取決於呼叫 controller.enqueue() 的次數。如果程序 非同步模式;這個函式可能會傳回承諾,以表示 工作。遭拒的承諾會導致廣告活動可讀取及寫入的那一端發生錯誤 轉換串流。如未提供 transform() 方法,則會使用 Identity 轉換, 將變更從可寫入端變更至可讀取端的區塊排入佇列。
  • flush(controller):將所有寫入可寫入端的區塊寫入完後,就會呼叫這個方法 只要成功傳遞 transform(),即可進行轉換,而可寫入端正好是 已打烊。通常用於在可讀取的端將後置字串區塊排入佇列中。 已關閉。如果清除程序為非同步性質,函式可能會向 訊號是否良好或失敗;我們會將結果傳達給 stream.writable.write()。此外,遭拒的承諾會導致可讀取且 就能在串流中寫入資料擲回例外狀況等同於傳回遭拒 承諾。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 佇列策略

TransformStream() 建構函式的第二個和第三個選用參數為選用參數 writableStrategyreadableStrategy 佇列策略。這些 API 的定義如下: 可讀取可寫入的串流 區段

轉換串流程式碼範例

下列程式碼範例顯示了簡易轉換串流的實際運作情形。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

透過轉換串流對可讀取的串流進行連線偵測 (ping)

pipeThrough() ReadableStream 介面的方法提供了可鏈結目前的串流管道 讀取及寫入資料。盜轉串流通常會鎖定 避免其他讀者鎖定

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

下一個程式碼範例 (稍做補充) 顯示如何實作「公開」「fetch()」版本 運用傳回的回應承諾,將所有文字大寫 以串流方式 還是要分段進行大寫這個做法的優點是您不必等待 與下載整份文件相比,這在處理大型檔案時有很大的差異。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

示範

以下示範說明可讀取、可寫入和轉換串流的應用實例。此版本也包含範例 的 pipeThrough()pipeTo() 管道鏈,並示範 tee()。您可以選擇在 示範本身的視窗或頁面 原始碼

瀏覽器提供的實用串流內容

瀏覽器內建許多實用的串流資料,您可以輕鬆建立 來自 blob 的 ReadableStreamBlob 介面的 stream() 方法會傳回 ReadableStream 會在讀取時傳回包含 blob 中的資料。另請注意, File 物件是特定種類的 Blob,而且可用於 blob 適用的任何結構定義。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

系統會呼叫 TextDecoder.decode()TextEncoder.encode() 的串流變化版本 TextDecoderStreamTextEncoderStream

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

使用 CompressionStreamDecompressionStream 轉換串流 。下列程式碼範例顯示如何下載串流規格,並將其壓縮 (gzip) ,然後將壓縮檔直接寫入磁碟。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API FileSystemWritableFileStream敬上 實驗性的 fetch() 要求串流 可寫入的串流範例

Serial API 會大量使用可讀取和可寫入的串流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最後,WebSocketStream API 會將串流與 WebSocket API 整合。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

實用資源

特別銘謝

本文評論者: Jake Archibald François BeaufortSam Dutton Mattias BuelensSurmaJoe MedleyAdam RiceJake Archibald 的網誌文章讓我獲益良多 串流。部分程式碼範例來自 GitHub 使用者 @bellbind 的探索和 大量的義詞 串流的 MDN 網頁版文件串流標準 作者 進行了非常多職 以便撰寫這個規格主頁橫幅由 Ryan Lara 提供 停用