数据流 - 权威指南

了解如何使用 Streams API 使用可读写的数据流和转换数据流。

借助 Streams API,您可以以编程方式访问通过网络接收或通过本地任何方式创建的数据流,并使用 JavaScript 对其进行处理。流式传输涉及将您要接收、发送或转换的资源拆分成小块,然后逐字节处理这些块。虽然流式传输是浏览器在接收要在网页上展示的素材资源(如 HTML 或视频)时所做的工作,但在 2015 年引入带有信息流的 fetch 之前,此功能从未适用于 JavaScript。

以前,如果您想处理某种资源(例如视频、文本文件等),则必须下载整个文件,等待其反序列化为适当的格式,然后再进行处理。由于 JavaScript 可以使用数据流,因此这一切都将发生变化。现在,只要原始数据在客户端上可用,您就可以使用 JavaScript 逐步处理这些数据,而无需生成缓冲区、字符串或 blob。这为许多用例打开了大门,下面列出了其中的一些用例:

  • 视频特效:将可读取的视频流通过转换流传输,以便实时应用特效。
  • 数据(解)压缩:将文件流通过转换流管道传输,以选择性地对其进行(解)压缩。
  • 图片解码:将 HTTP 响应流通过一个转换流传输,该转换流会将字节解码为位图数据,然后再通过另一个转换流传输,该转换流会将位图转换为 PNG。如果安装在 Service Worker 的 fetch 处理程序内,您就可以透明地对 AVIF 等新图片格式进行 polyfill。

浏览器支持

ReadableStream 和 WritableStream

浏览器支持

  • Chrome:43.
  • Edge:14.
  • Firefox:65。
  • Safari:10.1。

来源

TransformStream

浏览器支持

  • Chrome:67.
  • Edge:79。
  • Firefox:102.
  • Safari:14.1。

来源

核心概念

在深入了解各种类型的直播之前,我先介绍一些核心概念。

分块

数据块是写入数据流或从数据流中读取的一段数据。它可以是任何类型;数据流甚至可以包含不同类型的数据块。在大多数情况下,对于给定数据流,分块并不是最原子的数据单元。例如,字节流可能包含由 16 KiB Uint8Array 单元(而非单个字节)组成的分块。

可读数据流

可读取数据流表示您可以从中读取数据的数据源。换句话说,数据来自可读取的流。具体而言,可读流是 ReadableStream 类的一个实例。

可写流

可写流表示可写入数据的目的地。换句话说,数据进入可写入的流中。具体而言,可写入流是 WritableStream 类的实例。

转换数据流

转换流由一对流组成:一个可写流(称为其可写端),一个可读流(称为其可读端)。一个现实世界的比喻是同声传译员,他们可以动态地从一种语言翻译成另一种语言。以转换流专有的方式,写入可写入端会导致新数据可从可读取端读取。具体而言,具有 writable 属性和 readable 属性的任何对象都可以用作转换流。不过,标准 TransformStream 类可以更轻松地创建正确纠缠的此类对。

管道链

流主要通过管道传输来使用。可读取的流可以使用可读取流的 pipeTo() 方法直接管道到可写入的流,也可以先使用可读取流的 pipeThrough() 方法通过一个或多个转换流管道。以这种方式将一组流管道化在一起的操作称为管道链。

背压

构建管道链后,它会传播有关数据块应以何种速度流经它的信号。如果链中的任何步骤尚无法接受分块,则会通过管道链向后传播信号,直到最终告知原始来源停止如此快速地生成分块。这个流程标准化过程称为背压。

开球

可以使用其 tee() 方法对可读流(以大写“T”的形状命名)进行标记。这会锁定数据流,即使其无法再直接使用;不过,它会创建两个新数据流(称为分支),这些分支可以单独使用。T 恤也很重要,因为直播无法后退或重新开始,我们稍后会对此进行详细介绍。

管道链的示意图,其中包含来自对提取 API 的调用的可读取数据流,该数据流随后通过转换数据流进行管道传输,其输出被分流,然后第一个生成的可读取数据流被发送到浏览器,第二个生成的可读取数据流被发送到服务工件缓存。
竖线链。

可读数据流的机制

可读数据流是以 JavaScript 表示的数据源,由从底层来源流出的 ReadableStream 对象表示。ReadableStream() 构造函数会根据给定的处理脚本创建并返回一个可读取的数据流对象。基础来源有两种类型:

  • 推送来源会在您访问它们时不断向您推送数据,是否启动、暂停或取消对数据流的访问由您决定。例如实时视频流、服务器发送的事件或 WebSocket。
  • 拉取来源要求您在其连接后明确请求其数据。示例包括通过 fetch()XMLHttpRequest 调用的 HTTP 操作。

流式数据以小块(称为分块)的形式顺序读取。放入数据流中的分块被称为加入队列。这意味着它们正在队列中等待读取。内部队列会跟踪尚未读取的块。

队列策略是一个对象,用于确定流应如何根据其内部队列的状态发出背压信号。队列策略会为每个分块分配大小,并将队列中所有分块的总大小与一个指定的数字(称为“高水位”)进行比较。

流中的分块由读取器读取。此读取器会一次检索一个数据块,以便您对其执行所需的任何操作。该读取器以及其附带的其他处理代码称为“使用者”。

在本上下文中,下一个构造体称为“控制器”。每个可读流都有一个关联的控制器,正如其名称所示,您可以通过该控制器控制流。

一次只能有一个读取器读取一个数据流;当读取器被创建并开始读取数据流(即成为活跃读取器)时,它会锁定到该数据流。如果您希望其他读取器接管读取数据流,通常需要先释放第一个读取器,然后才能执行任何其他操作(不过,您可以分流数据流)。

创建可读取的流

您可以通过调用其构造函数 ReadableStream() 来创建可读流。该构造函数有一个可选参数 underlyingSource,它表示一个具有方法和属性的对象,这些方法和属性定义了所构建的流实例的行为方式。

underlyingSource

这可以使用开发者定义的以下可选方法:

  • start(controller):在构建对象时立即调用。该方法可以访问数据流来源,并执行设置数据流功能所需的任何其他操作。如果此过程以异步方式完成,该方法可以返回一个 promise 来指示成功或失败。传递给此方法的 controller 参数是一个 ReadableStreamDefaultController
  • pull(controller):可用于在提取更多分块时控制数据流。只要数据流的内部分块队列未满,系统就会重复调用该函数,直到队列达到其上限为止。如果调用 pull() 的结果是 promise,则在该 promise 执行完毕之前,系统不会再次调用 pull()。如果 promise 被拒绝,流将出错。
  • cancel(reason):在数据流使用方取消数据流时调用。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支持以下方法:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() 构造函数的第二个实参(同样是可选的)是 queuingStrategy。它是一个对象,可视需要定义数据流的排队策略,它接受以下两个参数:

  • highWaterMark:一个非负数,表示使用此队列策略的数据流的高水位。
  • size(chunk):用于计算并返回给定分块值的有限非负大小的函数。结果用于确定回压,并通过适当的 ReadableStreamDefaultController.desiredSize 属性进行显示。它还会控制何时调用底层源的 pull() 方法。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 方法

如需从可读流读取数据,您需要一个读取器,即 ReadableStreamDefaultReaderReadableStream 接口的 getReader() 方法会创建一个读取器并将流锁定到该读取器。在流处于锁定状态时,除非此读取器被释放,否则无法获取其他读取器。

ReadableStreamDefaultReader 接口的 read() 方法返回一个 promise,可让您访问流的内部队列中的下一个分块。它会根据数据流的状态执行或拒绝操作,并返回结果。各种可能性如下所示:

  • 如果有可用分块,系统会使用形式为
    { value: chunk, done: false } 的对象来执行该 promise。
  • 如果数据流关闭,系统将执行 promise,并返回格式为
    { value: undefined, done: true } 的对象。
  • 如果流出错,promise 将被拒绝并返回相关错误。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 属性

您可以通过访问可读取数据流的 ReadableStream.locked 属性来检查该数据流是否已锁定。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可读数据流代码示例

以下代码示例展示了所有步骤的操作过程。您首先创建一个 ReadableStream,并在其 underlyingSource 参数(即 TimestampSource 类)中定义一个 start() 方法。此方法会指示数据流的 controller 在 10 秒内每秒 enqueue() 一次时间戳。最后,它会指示控制器 close() 数据流。您可以通过 getReader() 方法创建读取器并调用 read() 直到流为 done 来使用此流。

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

异步迭代

在每次 read() 循环迭代时检查流是否为 done 可能不是最方便的 API。幸运的是,我们很快就会推出一种更好的方法来实现此目的:异步迭代。

for await (const chunk of stream) {
  console.log(chunk);
}

目前,使用异步迭代的一个解决方法是使用 polyfill 实现该行为。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

设置可读流

ReadableStream 接口的 tee() 方法会将当前可读流与当前可读流相关联,并返回包含两个结果分支的双元素数组作为新的 ReadableStream 实例。这样,两个读取器可以同时读取一个流。例如,如果您想从服务器提取响应并将其流式传输到浏览器,同时也将其流式传输到服务工件缓存,则可以在服务工件中执行此操作。由于响应正文不能多次使用,因此需要两个副本才能执行此操作。然后,如需取消该串流,您需要取消两个生成的分支。对数据流进行分流通常会在整个过程中锁定它,从而阻止其他读取器锁定它。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

可读字节流

对于表示字节的流,系统提供了可读流的扩展版本,以便高效地处理字节,尤其是通过尽量减少复制来实现。字节流支持获取自带缓冲区 (BYOB) 读取器。默认实现可以提供一系列不同的输出,例如 WebSocket 情况下的字符串或数组缓冲区,而字节流可保证字节输出。此外,BYOB 读取器还具有稳定性优势。这是因为,如果缓冲区分离,则可以保证不会向同一缓冲区写入两次,从而避免竞争条件。BYOB 读取器可以减少浏览器需要运行垃圾回收的次数,因为它可以重复使用缓冲区。

创建可读取的字节流

您可以通过将额外的 type 参数传递给 ReadableStream() 构造函数来创建可读取的字节流。

new ReadableStream({ type: 'bytes' });

underlyingSource

可读字节流的底层来源被赋予 ReadableByteStreamController 以便操纵。其 ReadableByteStreamController.enqueue() 方法采用一个值为 ArrayBufferViewchunk 参数。属性 ReadableByteStreamController.byobRequest 会返回当前的 BYOB 拉取请求,如果没有,则返回 null。最后,ReadableByteStreamController.desiredSize 属性会返回所需大小,以填充受控数据流的内部队列。

queuingStrategy

ReadableStream() 构造函数的第二个实参(同样是可选的)是 queuingStrategy。它是一个对象,可选择为数据流定义一个队列策略,该策略接受一个参数:

  • highWaterMark:非负数字节数,表示使用此队列策略的流的高水位标记。这用于确定回压,通过相应的 ReadableByteStreamController.desiredSize 属性体现出来。它还会控制何时调用底层源的 pull() 方法。

getReader()read() 方法

然后,您可以通过相应地设置 mode 参数来访问 ReadableStreamBYOBReaderReadableStream.getReader({ mode: "byob" })。这样可以更精确地控制缓冲区分配,以避免复制。如需从字节流中读取,您需要调用 ReadableStreamBYOBReader.read(view),其中 viewArrayBufferView

可读字节流代码示例

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

以下函数会返回可读取的字节流,以便高效地零拷贝读取随机生成的数组。它会尝试填充开发者提供的缓冲区,而不是使用预定的 1,024 块大小,从而实现完全控制。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可写数据流的机制

可写入流是您可以将数据写入的目标,在 JavaScript 中由 WritableStream 对象表示。这相当于在底层接收器(写入原始数据的较低级别 I/O 接收器)之上的抽象。

数据会通过写入器一次写入一个数据块到数据流中。分块可以采用多种形式,就像阅读器中的分块一样。您可以使用任何代码来生成准备写入的数据块;写入器及其关联代码称为生产方

创建写入器并开始向流写入数据(活跃写入器)时,系统会说该写入器锁定到该流。一次只能有一个写入器写入可写入流。如果您希望另一个写入器开始向流写入,通常需要先释放它,然后再将另一个写入器附加到它。

内部队列会跟踪已写入到数据流但尚未由底层接收器处理的分块。

队列策略是一种对象,用于确定串流应如何根据其内部队列的状态发出回压信号。队列策略会为每个分块分配大小,并将队列中所有分块的总大小与一个指定的数字(称为“高水位”)进行比较。

最终的结构称为控制器。每个可写入流都有一个关联的控制器,可让您控制流(例如,中止流)。

创建可写入的流

Streams API 的 WritableStream 接口提供了一种标准抽象,用于将流式传输数据写入目标(称为接收器)。该对象内置了背压和队列功能。您可以通过调用其构造函数 WritableStream() 来创建可写入流。它有一个可选的 underlyingSink 参数,该参数表示一个对象,其中包含用于定义构建的串流实例行为的方法和属性。

underlyingSink

underlyingSink 可以包含以下由开发者定义的可选方法。传递给某些方法的 controller 参数是 WritableStreamDefaultController

  • start(controller):在构建对象时,系统会立即调用此方法。此方法的内容应旨在获得对底层接收器的访问权限。如果此过程要异步完成,则可以返回一个 promise 来指示成功或失败。
  • write(chunk, controller):当有新的数据块(在 chunk 参数中指定)准备好写入底层接收器时,系统会调用此方法。它可以返回一个 promise 来指示写入操作是否成功。只有在之前的写入成功后,才会调用此方法,绝不会在流关闭或被中止后调用。
  • close(controller):如果应用发出信号表示已将分块写入到数据流,系统会调用此方法。这些内容应执行任何必要的操作,才能完成对底层接收器的写入,并释放对底层接收器的访问权限。如果此过程是异步的,则可以返回一个 promise 来指示成功或失败。只有在所有已加入队列的写入都成功后,才会调用此方法。
  • abort(reason):如果应用发出信号表示希望突然关闭数据流并将其置于错误状态,系统会调用此方法。它可以清理任何保留的资源(与 close() 类似),但即使写入操作排队,系统也会调用 abort()。这些分块将被舍弃。如果此过程是异步的,则可以返回一个 promise 来指示成功或失败。reason 参数包含一个 DOMString,用于说明流式传输被中止的原因。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API 的 WritableStreamDefaultController 接口代表一个控制器,用于在设置期间(随着更多分块提交写入或写入结束时)控制 WritableStream 的状态。构建 WritableStream 时,底层接收器会获得一个相应的 WritableStreamDefaultController 实例进行操作。WritableStreamDefaultController 只有一个方法:WritableStreamDefaultController.error(),这会导致日后与关联数据流的任何互动都出错。WritableStreamDefaultController 还支持 signal 属性,该属性会返回 AbortSignal 的实例,以便在需要时停止 WritableStream 操作。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() 构造函数的第二个参数(同样可选)是 queuingStrategy。它是一个对象,可选择为数据流定义一个队列策略,该策略接受两个参数:

  • highWaterMark:一个非负数,表示使用此队列策略的数据流的高水位。
  • size(chunk):用于计算并返回给定分块值的有限非负大小的函数。结果用于确定回压,并通过相应的 WritableStreamDefaultWriter.desiredSize 属性体现出来。

getWriter()write() 方法

如需向可写入的流写入数据,您需要使用写入器,即 WritableStreamDefaultWriterWritableStream 接口的 getWriter() 方法会返回一个新的 WritableStreamDefaultWriter 实例,并将流锁定到该实例。在流处于锁定状态时,除非当前写入器被释放,否则无法获取任何其他写入器。

WritableStreamDefaultWriter 接口的 write() 方法会将传递的部分数据写入 WritableStream 及其底层接收器,然后返回一个 promise,该 promise 的解析结果用于指示写入操作是成功还是失败。请注意,“成功”的含义取决于底层接收器;它可能表示分块已被接受,但不一定表示分块已安全保存到最终目的地。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 属性

您可以通过访问其 WritableStream.locked 属性来检查可写流是否处于锁定状态。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可写数据流代码示例

以下代码示例展示了所有步骤的操作过程。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

将可读流传输到可写流

可读数据流可以通过可读数据流的 pipeTo() 方法管道传输到可写数据流。ReadableStream.pipeTo() 会将当前 ReadableStream 管道传输到给定的 WritableStream,并返回一个 promise,该 promise 会在管道传输过程成功完成时执行,如果遇到任何错误,则会被拒绝。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

创建转换流

Streams API 的 TransformStream 接口表示一组可转换的数据。您可以通过调用其构造函数 TransformStream() 来创建转换流,该构造函数会根据给定处理程序创建并返回转换流对象。TransformStream() 构造函数接受一个表示 transformer 的可选 JavaScript 对象作为其第一个参数。此类对象可以包含以下任一方法:

transformer

  • start(controller):在构建对象时,系统会立即调用此方法。通常,此方法用于使用 controller.enqueue() 将前缀分块加入队列。这些区块将从可读端读取,但不依赖于对可写端执行任何写入操作。如果此初始过程是异步的(例如,由于需要花费一些精力才能获取前缀分块),则该函数可以返回一个 promise 来指示成功或失败;被拒绝的 promise 会导致数据流错误。TransformStream() 构造函数将重新抛出任何抛出的异常。
  • transform(chunk, controller):当最初写入可写入端的新分块准备好进行转换时,系统会调用此方法。流实现可保证仅在之前的转换成功后调用此函数,且绝不会在 start() 完成之前或调用 flush() 之后调用。此函数会执行转换流的实际转换工作。它可以使用 controller.enqueue() 将结果加入队列。这样一来,写入可写入端的单个分块可能会在可读取端产生零个或多个分块,具体取决于 controller.enqueue() 的调用次数。如果转换过程是异步的,此函数可以返回一个 promise,以指示转换是否成功。被拒绝的 promise 会导致转换流的可读和可写侧都出错。如果未提供任何 transform() 方法,则系统会使用身份转换,该转换会将写入端的块按原样加入可读端的队列。
  • flush(controller):在成功通过 transform() 转换写入到可写入端的所有分块后,系统会调用此方法,并且可写入端即将关闭。通常,这用于在可读取端关闭之前,将后缀分块加入队列。如果清空进程是异步的,函数可以返回一个 promise 来指示成功或失败;结果会传达给 stream.writable.write() 的调用方。此外,被拒绝的 promise 会导致流的可读和可写侧都出错。抛出异常与返回被拒绝的 promise 相同。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 队列策略

TransformStream() 构造函数的第二个和第三个可选参数是可选的 writableStrategyreadableStrategy 队列策略。它们的定义分别在可读可写数据流部分中加以说明。

转换流代码示例

以下代码示例展示了简单的转换流程。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

通过转换流管道传输可读数据流

ReadableStream 接口的 pipeThrough() 方法提供了一种可链式方式,通过转换流或任何其他可写/可读对来连接当前流。管道化流通常会在管道化期间锁定它,以防止其他读取器锁定它。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

以下代码示例(有点牵强附会)展示了如何实现 fetch() 的“大声说话”版本,该版本会通过作为流使用返回的响应 promise 并逐分转换为大写形式来将所有文本转换为大写形式。这种方法的优势在于,您无需等待下载整个文档,这在处理大型文件时会产生很大影响。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

演示

以下演示展示了可读、可写和转换流的实际运用。其中还包含 pipeThrough()pipeTo() 管道链的示例,以及 tee() 的演示。您可以选择在自己的窗口中运行演示,也可以查看源代码

浏览器中提供的实用数据流

浏览器中内置了许多实用的串流。您可以从 blob 轻松创建 ReadableStreamBlob 接口的 stream() 方法会返回一个 ReadableStream,该 ReadableStream 在读取时会返回 blob 中包含的数据。另请注意,File 对象是一种特定的 Blob,可在 Blob 可用的任何上下文中使用。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() 的流式变体分别称为 TextDecoderStreamTextEncoderStream

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

您可以分别使用 CompressionStreamDecompressionStream 转换流轻松压缩或解压缩文件。以下代码示例展示了如何下载 Streams 规范、直接在浏览器中对其进行压缩(gzip),以及将压缩文件直接写入磁盘。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream 和实验性 fetch() 请求流是现成的可写流示例。

Serial API 大量使用可读和可写流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最后,WebSocketStream API 将数据流与 WebSocket API 集成。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

实用资源

致谢

本文由 Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice 审核。 Jake Archibald 的博文对我了解数据流非常有帮助。部分代码示例的灵感来自 GitHub 用户 @bellbind 的探索,部分文字内容则大量借鉴了 MDN Web 文档中的“流”部分Streams Standard作者在编写此规范方面做了很大的贡献。主图片由 Ryan Lara 发布在 Unsplash 上。