数据流 - 权威指南

了解如何使用 Streams API 使用可读取、可写入和可转换的流。

借助 Streams API,您可以以编程方式访问通过网络接收或通过任何方式在本地创建的数据流,并使用 JavaScript 对其进行处理。流式传输涉及将您要接收、发送或转换的资源拆分成小块,然后逐字节处理这些块。虽然浏览器在接收要显示在网页上的 HTML 或视频等资源时会进行流式传输,但在 2015 年引入带有串流的 fetch 之前,JavaScript 从未具备此功能。

以前,如果您想处理某种资源(例如视频、文本文件等),则必须下载整个文件,等待其反序列化为适当的格式,然后再进行处理。由于 JavaScript 可以使用数据流,因此这一切都将发生变化。现在,您可以在客户端上有原始数据可用时立即使用 JavaScript 逐步处理这些数据,而无需生成缓冲区、字符串或 Blob。这为许多用例打开了大门,下面列出了其中的一些:

  • 视频特效:将可读取的视频流通过转换流传输,以便实时应用特效。
  • 数据 (解)压缩:将文件流通过转换流管道传输,以选择性地对其进行(解)压缩。
  • 图片解码:将 HTTP 响应流通过一个转换流传输,该转换流会将字节解码为位图数据,然后再通过另一个转换流传输,该转换流会将位图转换为 PNG。如果安装在 Service Worker 的 fetch 处理程序内,您就可以透明地对 AVIF 等新图片格式进行 polyfill。

浏览器支持

ReadableStream 和 WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

核心概念

在详细介绍各种类型的数据流之前,我先介绍一些核心概念。

数据块

数据块是写入数据流或从数据流中读取的一段数据。它可以是任何类型;数据流甚至可以包含不同类型的数据块。在大多数情况下,数据块不会是给定数据流的最原子数据单元。例如,字节流可能包含由 16 KiB Uint8Array 单元(而非单个字节)组成的分块。

可读取的数据流

可读取数据流表示您可以从中读取数据的数据源。换句话说,数据来自可读取的流。具体而言,可读取的流是 ReadableStream 类的实例。

可写数据流

可写入流代表可写入数据的目标位置。换句话说,数据会进入可写入的流。具体而言,可写入流是 WritableStream 类的实例。

转换数据流

转换流由一对流组成:一个可写流(称为其可写端)和一个可读流(称为其可读端)。一个现实世界的比喻是同声传译员,他们可以动态地从一种语言翻译成另一种语言。以转换流专有的方式,写入可写入端会导致新数据可从可读取端读取。具体而言,具有 writable 属性和 readable 属性的任何对象都可以用作转换流。不过,标准 TransformStream 类可以更轻松地创建这样一对正确纠缠的 qubit。

管链

数据流主要用于将它们管道化到彼此。可读流可以使用可读流的 pipeTo() 方法直接管道到可写流,也可以先使用可读流的 pipeThrough() 方法通过一个或多个转换流管道。以这种方式管道化在一起的一组数据流称为管道链。

背压

管道链构建完毕后,它会传播有关数据块应以何种速度流经它的信号。如果链中的任何步骤尚无法接受分块,则会通过管道链向后传播信号,直到最终告知原始来源停止如此快速地生成分块。这种规范化流量的过程称为回压。

开球

可使用可读取流的 tee() 方法对其进行分流(以大写字母“T”的形状命名)。这会锁定数据流,即使其无法再直接使用;不过,它会创建两个新数据流(称为分支),这些分支可以单独使用。创建分支也非常重要,因为流式传输无法回放或重启,我们稍后会详细介绍这一点。

管道链图,其中包含来自对提取 API 的调用的可读取流,该流会通过转换流管道传输,其输出会分流,然后第一个生成的可读取流会发送到浏览器,第二个生成的可读取流会发送到服务工件缓存。
管道链。

可读数据流的机制

可读取的数据流是指由从底层来源流出的 ReadableStream 对象在 JavaScript 中表示的数据源。ReadableStream() 构造函数会根据给定的处理脚本创建并返回一个可读取的数据流对象。底层来源有两种类型:

  • 推送来源会在您访问它们时不断向您推送数据,您可以自行开始、暂停或取消对数据流的访问。例如实时视频流、服务器发送的事件或 WebSocket。
  • 连接到拉取来源后,您需要明确请求从中获取数据。示例包括通过 fetch()XMLHttpRequest 调用的 HTTP 操作。

流式数据以称为数据块的小块顺序读取。放入数据流中的分块被称为加入队列。这意味着它们正在队列中等待读取。内部队列会跟踪尚未读取的块。

队列策略是一种对象,用于确定串流应如何根据其内部队列的状态发出回压信号。队列策略会为每个分块分配大小,并将队列中所有分块的总大小与一个指定的数字(称为“高水位”)进行比较。

读取器会读取数据流中的分块。此读取器会一次检索一个数据块,以便您对其执行任何类型的操作。读取器及其随附的其他处理代码称为使用方

在本上下文中,下一个构造体称为控制器。每个可读流都有一个关联的控制器,正如其名称所示,您可以通过该控制器控制流。

一次只能有一个读取器读取一个数据流;当读取器被创建并开始读取数据流(即成为活跃读取器)时,它会锁定到该数据流。如果您希望另一个读取器接管读取数据流,通常需要先释放第一个读取器,然后才能执行任何其他操作(不过,您可以分流数据流)。

创建可读取的流

您可以通过调用其构造函数 ReadableStream() 来创建可读取的流。该构造函数有一个可选参数 underlyingSource,它表示一个具有方法和属性的对象,这些方法和属性定义了构建的串流实例的行为方式。

underlyingSource

这可以使用以下可选的开发者定义的方法:

  • start(controller):在构建对象时立即调用。该方法可以访问数据流来源,并执行设置数据流功能所需的任何其他操作。如果此过程要异步完成,该方法可以返回一个 promise 来指示成功或失败。传递给此方法的 controller 参数是一个 ReadableStreamDefaultController
  • pull(controller):可用于在提取更多分块时控制数据流。只要数据流的内部分块队列未满,系统就会重复调用该函数,直到队列达到其上限为止。如果调用 pull() 的结果是 promise,则在该 promise 执行完毕之前,系统不会再次调用 pull()。如果 promise 被拒绝,流将出错。
  • cancel(reason):在串流使用方取消串流时调用。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支持以下方法:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() 构造函数的第二个实参(同样是可选的)是 queuingStrategy。它是一个对象,可选择为数据流定义一个队列策略,该策略接受两个参数:

  • highWaterMark:一个非负数,表示使用此队列策略的串流的高水位。
  • size(chunk):用于计算并返回给定分块值的有限非负大小的函数。结果用于确定回压,并通过适当的 ReadableStreamDefaultController.desiredSize 属性进行显示。它还会控制何时调用底层源的 pull() 方法。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 方法

如需从可读取的数据流读取数据,您需要一个读取器,该读取器将是 ReadableStreamDefaultReaderReadableStream 接口的 getReader() 方法会创建一个读取器并将流锁定到该读取器。在流处于锁定状态时,除非此读取器被释放,否则无法获取其他读取器。

ReadableStreamDefaultReader 接口的 read() 方法会返回一个 Promise,用于访问数据流内部队列中的下一个分块。它会根据数据流的状态执行或拒绝操作,并返回结果。不同的可能性如下:

  • 如果有可用分块,系统会使用形式为
    { value: chunk, done: false } 的对象来执行该 promise。
  • 如果流关闭,promise 将使用形式为
    { value: undefined, done: true } 的对象执行。
  • 如果流出错,promise 将被拒绝并返回相关错误。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 属性

您可以通过访问可读取数据流的 ReadableStream.locked 属性来检查该数据流是否已锁定。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可读取的流代码示例

以下代码示例展示了所有步骤的实际操作。您首先创建一个 ReadableStream,并在其 underlyingSource 参数(即 TimestampSource 类)中定义一个 start() 方法。此方法会指示数据流的 controller 在 10 秒内每秒 enqueue() 一次时间戳。最后,它会告知控制器 close() 数据流。您可以通过以下方式使用此流:通过 getReader() 方法创建一个读取器,然后调用 read(),直到流变为 done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

异步迭代

在每次 read() 循环迭代时检查数据流是否为 done 可能不是最方便的 API。幸运的是,我们很快就会有更好的方法来实现这一点:异步迭代。

for await (const chunk of stream) {
  console.log(chunk);
}

目前,使用异步迭代的一种解决方法是使用 polyfill 实现该行为。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

对可读数据流进行分流

ReadableStream 接口的 tee() 方法会将当前可读取的流分流,返回一个包含两个生成分支作为新 ReadableStream 实例的两个元素数组。这样,两个读取器就可以同时读取一个数据流。例如,如果您想从服务器提取响应并将其流式传输到浏览器,同时也将其流式传输到服务工件缓存,则可以在服务工件中执行此操作。由于响应正文不能重复使用,因此您需要两个副本才能执行此操作。然后,如需取消该串流,您需要取消两个生成的分支。对数据流进行 Tee 操作通常会在整个过程中锁定它,以防止其他读取器锁定它。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

可读取的字节流

对于表示字节的数据流,系统提供了可读取数据流的扩展版本,以便高效地处理字节,尤其是通过尽量减少复制来实现。字节流允许获取自带缓冲区 (BYOB) 读取器。默认实现可以提供一系列不同的输出,例如在 WebSocket 的情况下,可以提供字符串或数组缓冲区,而字节流可保证字节输出。此外,BYOB 读取器还具有稳定性优势。这是因为,如果缓冲区分离,则可以保证不会向同一缓冲区写入两次,从而避免竞争条件。BYOB 读取器可以减少浏览器需要运行垃圾回收的次数,因为它可以重复使用缓冲区。

创建可读取的字节流

您可以通过将额外的 type 参数传递给 ReadableStream() 构造函数来创建可读取的字节流。

new ReadableStream({ type: 'bytes' });

underlyingSource

可读字节流的底层源会被赋予一个 ReadableByteStreamController 以进行操作。其 ReadableByteStreamController.enqueue() 方法采用一个值为 ArrayBufferViewchunk 参数。属性 ReadableByteStreamController.byobRequest 会返回当前的 BYOB 拉取请求,如果没有,则返回 null。最后,ReadableByteStreamController.desiredSize 属性会返回所需大小,以填充受控流的内部队列。

queuingStrategy

ReadableStream() 构造函数的第二个参数(同样是可选参数)是 queuingStrategy。它是一个对象,可选择为数据流定义一个队列策略,该策略接受一个参数:

  • highWaterMark:一个非负字节数,表示使用此队列策略的串流的高水位。这用于确定回压,并通过相应的 ReadableByteStreamController.desiredSize 属性进行体现。它还会控制何时调用底层源的 pull() 方法。

getReader()read() 方法

然后,您可以通过相应地设置 mode 参数来访问 ReadableStreamBYOBReaderReadableStream.getReader({ mode: "byob" })。这样可以更精确地控制缓冲区分配,以避免复制。如需从字节流读取数据,您需要调用 ReadableStreamBYOBReader.read(view),其中 viewArrayBufferView

可读取的字节流代码示例

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

以下函数会返回可读取的字节流,以便高效地零拷贝读取随机生成的数组。它会尝试填充开发者提供的缓冲区,而不是使用预定的 1,024 分块大小,从而实现完全控制。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可写数据流的机制

可写入流是您可以将数据写入到的目的地,在 JavaScript 中由 WritableStream 对象表示。它是底层接收器(用于写入原始数据的较低级别 I/O 接收器)之上的抽象。

数据会通过写入器一次写入一个数据块到数据流中。分块可以采用多种形式,就像读取器中的分块一样。您可以使用任何代码生成准备写入的数据块;写入器及其关联的代码称为生产方

创建写入器并开始向流写入数据(活跃写入器)时,写入器会被视为锁定到该流。一次只能有一个写入器写入可写入流。如果您希望另一个写入器开始写入数据流,通常需要先释放它,然后再将另一个写入器附加到它。

内部队列会跟踪已写入到数据流但尚未被底层接收器处理的分块。

队列策略是一种对象,用于确定串流应如何根据其内部队列的状态发出回压信号。队列策略会为每个分块分配大小,并将队列中所有分块的总大小与一个指定的数字(称为“高水位”)进行比较。

最终的结构称为控制器。每个可写入流都有一个关联的控制器,可让您控制流(例如,中止流)。

创建可写入的流

Streams API 的 WritableStream 接口提供了一种标准抽象,用于将流式数据写入目的地(称为接收器)。此对象内置了回压和队列功能。您可以通过调用其构造函数 WritableStream() 来创建可写入的流。它有一个可选的 underlyingSink 参数,该参数表示一个具有方法和属性的对象,这些方法和属性定义了构建的串流实例的行为方式。

underlyingSink

underlyingSink 可以包含以下可选的开发者定义的方法。传递给某些方法的 controller 参数是 WritableStreamDefaultController

  • start(controller):在构建对象时,系统会立即调用此方法。此方法的内容应旨在获取对底层接收器的访问权限。如果此过程要异步完成,则可以返回一个 promise 来指示成功或失败。
  • write(chunk, controller):当有新的数据块(在 chunk 参数中指定)准备好写入底层接收器时,系统会调用此方法。它可以返回一个 promise,以指示写入操作是否成功。只有在之前的写入成功后,系统才会调用此方法,绝不会在流关闭或被中止后调用。
  • close(controller):如果应用发出信号表示已将分块写入到数据流,系统会调用此方法。内容应执行所有必要的操作,以最终完成对底层接收器的写入,并释放对它的访问权限。如果此过程是异步的,则可以返回一个 Promise 来指示成功或失败。只有在所有队列写入都成功后,才会调用此方法。
  • abort(reason):如果应用发出信号表示希望突然关闭数据流并将其置于错误状态,系统会调用此方法。它可以清理所有已占用的资源,与 close() 非常相似,但即使写入已加入队列,系统也会调用 abort()。这些分块将被舍弃。如果此过程是异步的,则可以返回一个 promise 来指示成功或失败。reason 参数包含一个 DOMString,用于说明流式传输被中止的原因。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API 的 WritableStreamDefaultController 接口表示一个控制器,可在设置期间、提交更多分块以进行写入时或写入结束时控制 WritableStream 的状态。构建 WritableStream 时,底层接收器会获得一个相应的 WritableStreamDefaultController 实例进行操作。WritableStreamDefaultController 只有一个方法:WritableStreamDefaultController.error(),这会导致日后与关联的串流的任何互动都出错。WritableStreamDefaultController 还支持 signal 属性,该属性会返回 AbortSignal 的实例,以便在需要时停止 WritableStream 操作。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() 构造函数的第二个参数(同样是可选参数)是 queuingStrategy。它是一个对象,可选择为数据流定义一个队列策略,该策略接受两个参数:

  • highWaterMark:一个非负数,表示使用此队列策略的串流的高水位。
  • size(chunk):用于计算并返回给定分块值的有限非负大小的函数。结果用于确定回压,并通过相应的 WritableStreamDefaultWriter.desiredSize 属性体现出来。

getWriter()write() 方法

如需写入可写入的流,您需要使用写入器,即 WritableStreamDefaultWriterWritableStream 接口的 getWriter() 方法会返回一个新的 WritableStreamDefaultWriter 实例,并将数据流锁定到该实例。在流处于锁定状态时,除非当前的写入器被释放,否则无法获取任何其他写入器。

WritableStreamDefaultWriter 接口的 write() 方法会将传递的数据块写入 WritableStream 及其底层接收器,然后返回一个 promise,该 promise 会解析为指示写入操作是成功还是失败。请注意,“成功”的含义取决于底层接收器;它可能表示分块已被接受,但不一定表示分块已安全保存到最终目的地。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 属性

您可以通过访问可写入流的 WritableStream.locked 属性来检查该流是否已锁定。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可写入流代码示例

以下代码示例展示了所有步骤的操作过程。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

将可读取数据流管道化到可写数据流

可读数据流可以通过可读数据流的 pipeTo() 方法管道传输到可写数据流。ReadableStream.pipeTo() 会将当前 ReadableStream 管道传输到给定的 WritableStream,并返回一个 promise,该 promise 会在管道传输过程成功完成时执行,如果遇到任何错误,则会被拒绝。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

创建转换流

Streams API 的 TransformStream 接口表示一组可转换的数据。您可以通过调用其构造函数 TransformStream() 来创建转换流,该函数会根据给定的处理程序创建并返回转换流对象。TransformStream() 构造函数接受一个可选的 JavaScript 对象作为第一个参数,该对象表示 transformer。此类对象可以包含以下任一方法:

transformer

  • start(controller):在构建对象时,系统会立即调用此方法。通常,此方法用于使用 controller.enqueue() 将前缀分块加入队列。这些分块将从可读取端读取,但不依赖于对可写入端的任何写入。如果此初始过程是异步的(例如,因为获取前缀分块需要一些努力),该函数可以返回一个 Promise 来表示成功或失败;被拒绝的 Promise 会导致流出错。TransformStream() 构造函数将重新抛出所有抛出的异常。
  • transform(chunk, controller):当最初写入可写入端的新分块准备好进行转换时,系统会调用此方法。流实现可保证仅在之前的转换成功后调用此函数,并且绝不会在 start() 完成之前或调用 flush() 之后调用此函数。此函数会执行转换流的实际转换工作。它可以使用 controller.enqueue() 将结果加入队列。这样一来,写入可写入端的单个分块可能会在可读取端产生零个或多个分块,具体取决于 controller.enqueue() 的调用次数。如果转换过程是异步的,此函数可以返回一个 promise,以指示转换是否成功。被拒绝的 promise 会导致转换流的可读和可写侧都出错。如果未提供 transform() 方法,则系统会使用身份转换,该转换会将写入端与可读端中不变的分块加入队列。
  • flush(controller):在成功通过 transform() 转换写入可写入端的所有分块后,系统会调用此方法,并且可写入端即将关闭。通常,此操作用于在可读取端关闭之前,将后缀分块加入队列。如果刷新过程是异步的,该函数可以返回一个 promise 来表示成功或失败;结果将传达给 stream.writable.write() 的调用方。此外,被拒绝的 promise 会导致流的可读和可写侧都出错。抛出异常与返回被拒绝的 Promise 的处理方式相同。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 队列策略

TransformStream() 构造函数的第二个和第三个可选参数是可选的 writableStrategyreadableStrategy 队列策略。它们的定义分别在可读可写流部分中加以说明。

转换流代码示例

以下代码示例展示了简单的转换流程。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

将可读取的流通过转换流管道传输

ReadableStream 接口的 pipeThrough() 方法提供了一种可链接的方式,可将当前数据流通过转换流或任何其他可写入/可读取对进行管道传输。对流进行管道传输通常会在管道传输期间锁定该流,以防止其他读取器锁定该流。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

以下代码示例(有点牵强附会)展示了如何实现 fetch() 的“大声说话”版本,该版本会通过作为流使用返回的响应 promise 并逐分转换为大写形式来将所有文本转换为大写形式。这种方法的优势在于,您无需等待下载整个文档,这在处理大型文件时会产生巨大影响。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

演示

以下演示展示了可读、可写和转换流的实际运作方式。其中还包含 pipeThrough()pipeTo() 管道链的示例,以及 tee() 的演示。您可以选择在自己的窗口中运行演示,也可以查看源代码

浏览器中提供的实用数据流

浏览器中内置了许多实用的数据流。您可以轻松地从 Blob 创建 ReadableStreamBlob 接口的 stream() 方法会返回一个 ReadableStream,该 ReadableStream 在读取时会返回 blob 中包含的数据。另请注意,File 对象是一种特定的 Blob,可在 Blob 可用的任何上下文中使用。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() 的流式变体分别称为 TextDecoderStreamTextEncoderStream

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

您可以分别使用 CompressionStreamDecompressionStream 转换流轻松压缩或解压缩文件。以下代码示例展示了如何下载 Streams 规范、直接在浏览器中对其进行压缩(gzip),以及将压缩文件直接写入磁盘。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream 和实验性 fetch() 请求流是实际可写流的示例。

Serial API 大量使用可读取和可写入的流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最后,WebSocketStream API 将数据流与 WebSocket API 集成。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

实用资源

致谢

本文由 Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice 审核。 Jake Archibald 的博文对我了解数据流非常有帮助。部分代码示例受到 GitHub 用户 @bellbind 探索的启发,部分文字内容则大量借鉴了 MDN Web 文档中的“流”部分数据流标准作者在编写此规范方面做了大量工作。主打图片来自 Unsplash 上的 Ryan Lara