数据流 - 权威指南

了解如何将可读、可写和转换流与 Streams API 配合使用。

Streams API 可让您以编程方式访问通过网络接收的数据流 或以任何方式在本地创建 并使用 JavaScript 进行处理流式传输涉及分解您希望接收、发送或转换的资源 分成几小块,然后逐块处理这些块。流式传输是一种 浏览器在收到要在网页上展示的 HTML 或视频等资源时,都会做这种处理, 在 2015 年推出带流的 fetch 之前,JavaScript 从未提供过此功能。

以前,如果您要处理某种类型的资源(无论是视频还是文本文件等), 则必须下载整个文件,等待将其反序列化为合适的格式, 然后进行处理这些视频流可供 所有功能都会更改。现在,您可以使用 JavaScript 逐步处理原始数据,如 无需生成缓冲区、字符串或 blob 即可。 这解锁了许多用例,下面列出了其中一些用例:

  • 视频效果:通过应用了效果的转换流来传输可读的视频流 实时更新
  • 数据(解压缩):通过选择性 (解压缩)。
  • 图像解码:通过可对字节进行解码的转换流来传输 HTTP 响应流 转换为位图数据,然后通过另一个将位图转换为 PNG 的转换流。如果 安装在 Service Worker 的 fetch 处理程序中,这让您可以透明地执行 polyfill 操作, 例如 AVIF 等新图片格式

浏览器支持

ReadableStream 和 WritableStream

浏览器支持

  • Chrome:43。 <ph type="x-smartling-placeholder">
  • Edge:14。 <ph type="x-smartling-placeholder">
  • Firefox:65。 <ph type="x-smartling-placeholder">
  • Safari:10.1. <ph type="x-smartling-placeholder">

来源

TransformStream

浏览器支持

  • Chrome:67。 <ph type="x-smartling-placeholder">
  • Edge:79。 <ph type="x-smartling-placeholder">
  • Firefox:102。 <ph type="x-smartling-placeholder">
  • Safari:14.1. <ph type="x-smartling-placeholder">

来源

核心概念

在深入了解各种类型的直播之前,我先介绍一些核心概念。

文本块

数据块是写入数据流或从数据流中读取的一段数据。它可以是任意一个 类型;数据流甚至可以包含不同类型的数据块。大多数情况下,数据块可能不是原子性最强的 数据单元。例如,字节流可能包含由 16 个 KiB Uint8Array 单位,而非单个字节。

可读数据流

可读流表示可供读取的数据源。换句话说,数据来自 输出。具体而言,可读流是 ReadableStream 的实例, 类。

可写流

可写流表示可写入数据的目的地。换言之,数据 进入可写流。具体而言,可写流是 WritableStream 类。

转换流

转换流由一对流组成:可写流(称为可写端)、 和可读流(称为可读端)。 一个真实的比喻是 同声传译 实时将一种语言翻译成另一种语言。 以特定于转换流的方式,将 写入可写端会导致新数据可供从 可读一面。具体而言,任何具有 writable 属性和 readable 属性的对象都可以 转换为转换流。不过,标准 TransformStream 类可让您更轻松地创建

管链

流主要通过通过管道互相传递来使用。可读流可以直接通过管道传输 发送到可写流,使用可读流的 pipeTo() 方法,也可以通过 先使用可读流的 pipeThrough() 方法转换流。一组 以这种方式连接在一起的视频流称为管道链。

背压

管道链一旦构建完毕,就会传播有关分块流动速度的信号 如果链中的任何步骤还不能接受分块,则会向后传播信号 直到原始来源被告知停止生成分块, 速度快。这种流程标准化过程称为背压。

T 恤

可以使用其 tee() 方法对可读数据流进行标记(以大写“T”的形状命名)。 此操作会锁定音频流,也就是说,使其不能再直接使用;但系统会创建两个新的 流(称为分支),可单独使用。 T 恤也很重要,因为直播无法后退或重新开始,我们稍后会对此进行详细介绍。

<ph type="x-smartling-placeholder">
</ph> 管道链的示意图,其中包含来自对提取 API 的调用的可读取流,随后,转换流通过转换流进行传输,该转换流的输出已进行连接,接着发送到浏览器获取第一个生成的可读流,并发送到 Service Worker 缓存获取第二个生成的可读流。
竖线链。

可读数据流的机制

可读流是以 JavaScript 表示的数据源,由 ReadableStream 对象, 传输流量。通过 ReadableStream() 构造函数从给定处理程序创建并返回可读的流对象。有两个 底层来源的类型:

  • 推送来源会在您访问它们时不断向您推送数据,一切由您决定 启动、暂停或取消访问数据流。示例包括直播视频流、服务器发送的事件、 或 WebSockets。
  • 拉取来源要求您在连接到这些来源后明确请求数据。示例 通过 fetch()XMLHttpRequest 调用添加 HTTP 操作。

系统会按顺序读取流式数据(称为“数据块”的小段)。 位于数据流中的区块称为“已加入队列”。这意味着它们正在队列中等待 可供读取内部队列会跟踪尚未读取的区块。

排队策略是一个对象,用于确定数据流应如何根据 其内部队列的状态。排队策略会为每个分块分配一个大小,并将 将队列中所有分块的总大小设置为指定数字,称为“高水位”

数据流中的区块由“读取器”读取。该读取器以每个分块为单位来检索数据, 这样你就可以对它执行自己想要的任何操作读者和其他 与之一起处理的代码称为“使用方”

此上下文中的下一个结构称为控制器。每个可读流都有一个关联的 顾名思义,该控制器可用于控制流。

一次只能有一个读者读取数据流;当读取器创建并开始读取数据流时触发 (即成为活跃读取器),则会被锁定到该读取器。如果您希望其他读者接管 读取您的信息流时,通常需要先释放第一个读取器,然后再执行任何其他操作 (不过,您可以开通视频流)。

创建可读数据流

您可以通过调用其构造函数来创建可读流 ReadableStream()。 构造函数有一个可选的参数 underlyingSource,它代表一个对象 其中包含定义所构建的流实例将如何行为的方法和属性。

underlyingSource

这可以使用开发者定义的以下可选方法:

  • start(controller):构建对象时立即调用。通过 方法可以访问音频流来源 设置流式传输功能所需的资源如果该过程要异步完成,该方法可以 返回一个表示成功或失败的 promise。传递给此方法的 controller 参数为 一 ReadableStreamDefaultController
  • pull(controller):可用于在提取更多数据块时控制音频流。它 会反复调用,直到流的内部区块队列还未满 达到最高水位。如果调用 pull() 的结果是一个 promise, 在上述 promise 执行之前,系统不会再次调用 pull()。 如果 promise 拒绝,则流将会出错。
  • cancel(reason):在数据流使用方取消数据流时调用。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支持以下方法:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() 构造函数的第二个参数(同样可选)是 queuingStrategy。 它是一个对象,可视需要定义数据流的排队策略, 参数:

  • highWaterMark:一个非负数,表示使用此队列策略的数据流的高水位。
  • size(chunk):一个函数,用于计算并返回给定分块值的有限非负大小。 结果用于确定背压,并通过相应的 ReadableStreamDefaultController.desiredSize 属性体现。 它还会控制何时调用底层来源的 pull() 方法。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 方法

如需从可读流读取数据,您需要一个读取器, ReadableStreamDefaultReaderReadableStream 接口的 getReader() 方法会创建一个读取器并将数据流锁定到 。当数据流被锁定时,在此数据流被释放之前,无法获取任何其他读取器。

read() ReadableStreamDefaultReader 接口的方法会返回一个 promise,并提供对下一个 流的内部队列中。它会执行或拒绝并返回结果 直播各种可能性如下所示:

  • 如果有可用的数据块,该 promise 会在执行时返回一个
    形式的对象 { value: chunk, done: false }
  • 如果数据流关闭,该 promise 将执行并返回
    形式的对象 { value: undefined, done: true }
  • 如果数据流出错,promise 将被拒绝并出现相关错误。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 属性

您可以通过访问其 ReadableStream.locked 属性。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可读数据流代码示例

以下代码示例展示了所有实际步骤。首先,您需要创建一个 ReadableStream,并将其 underlyingSource 参数(即 TimestampSource 类)定义了 start() 方法。 此方法会告知音频流的 controllerenqueue() 在十秒内每秒发送一个时间戳。 最后,它会告知控制器对流执行 close() 操作。你使用了此 通过 getReader() 方法创建读取器并调用 read(),直到数据流完成 done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

异步迭代

在每次 read() 循环迭代时检查流是否为 done 可能不是最方便的 API。 幸运的是,我们很快就会推出一种更好的方法来实现此目的:异步迭代。

for await (const chunk of stream) {
  console.log(chunk);
}

目前,使用异步迭代的一个解决方法是使用 polyfill 实现该行为。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

设置可读流

此 API 的 tee() 方法 ReadableStream 接口与当前可读流匹配,返回双元素数组 包含生成的两个分支作为新的 ReadableStream 实例。这样, 两个读取器同时读取一个流。例如,在以下情况下,您可以在 Service Worker 中执行此操作: 您希望从服务器提取响应并将其流式传输到浏览器,同时又将其流式传输到 Service Worker 缓存。由于响应正文不能多次使用,因此您需要两个副本 来实现这一点如需取消数据流,您需要取消两个生成的分支。玩转直播 通常情况下,系统会在该时间段内将其锁定,以防止其他读取器将其锁定。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

可读字节流

对于表示字节的流,提供了可读流的扩展版本来处理 特别是要尽量减少副本数量。字节流支持自带缓冲区 要获取的 (BYOB) 读取器。默认实现可以提供一系列不同的输出, 在 WebSocket 的情况下设置为字符串或数组缓冲区,而字节流可保证字节输出。 此外,BYOB 读取器具有稳定性方面的优势。这是 因为如果一个缓冲区分离,就可以保证一个缓冲区不会两次写入同一缓冲区, 从而避免竞态条件。BYOB 读取器可以减少浏览器需要运行的次数 因为它可以重复使用缓冲区

创建可读字节流

您可以通过将额外的 type 参数传递给 ReadableStream() 构造函数。

new ReadableStream({ type: 'bytes' });

underlyingSource

为可读字节流的底层源指定 ReadableByteStreamController, 操作。其 ReadableByteStreamController.enqueue() 方法接受 chunk 实参,其值 是 ArrayBufferViewReadableByteStreamController.byobRequest 属性会返回当前的 BYOB 拉取请求,如果没有,则返回 null。最后,ReadableByteStreamController.desiredSize 属性返回所需的大小,以填充受控流的内部队列。

queuingStrategy

ReadableStream() 构造函数的第二个参数(同样可选)是 queuingStrategy。 它是一个对象,可视需要定义数据流的排队策略, 参数:

  • highWaterMark:非负数字节数,表示使用此队列策略的流的高水位标记。 此属性用于确定背压,通过相应的 ReadableByteStreamController.desiredSize 属性加以体现。 它还会控制何时调用底层来源的 pull() 方法。

getReader()read() 方法

然后,您可以通过相应设置 mode 形参来获得对 ReadableStreamBYOBReader 的访问权限: ReadableStream.getReader({ mode: "byob" })。这样可以更精确地控制缓冲区 以避免复制。要从字节流读取数据,您需要调用 ReadableStreamBYOBReader.read(view),其中 viewArrayBufferView

可读字节流代码示例

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

以下函数返回可读字节流,这些字节流支持对 随机生成的数组。它并没有使用预定的分块大小 1,024,而是尝试填充 开发者提供的缓冲区,可实现完全控制。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可写数据流的机制

可写流是您可以向其中写入数据的目的地,在 JavaScript 中由 WritableStream 对象。这个 充当底层接收器(底层 I/O 接收器)之上的抽象概念, 原始数据。

数据通过写入程序写入流,一次一个数据块。一个块可以占用 就像读者中的部分一样。您可以使用自己喜欢的任何代码 可供写入的数据块写入者以及关联的代码称为“提供方”。

当写入者创建并开始写入数据流(活跃写入者)时,可以说是 锁定。一次只能有一个写入者向可写流写入数据。如果您想再 写入流,通常需要将其释放,然后才能附加 其他作者

内部队列跟踪已写入流但尚未写入的区块 由底层接收器处理的数据

排队策略是一个对象,用于确定数据流应如何根据 其内部队列的状态。排队策略会为每个分块分配一个大小,并将 将队列中所有分块的总大小设置为指定数字,称为“高水位”

最终构造称为控制器。每个可写流都有一个关联的控制器, 可让您控制流(例如,取消流)。

创建可写流

WritableStream 接口的 Streams API 提供了一种标准抽象,用于将流式数据写入目的地(称为 作为接收器。该对象内置了背压和队列功能。你可以通过以下方式创建可写流: 调用其构造函数 WritableStream()。 它有一个可选的 underlyingSink 参数,代表一个对象 其中包含定义所构建的流实例将如何行为的方法和属性。

underlyingSink

underlyingSink 可以包含以下由开发者定义的可选方法。controller 传递给一些方法的参数是一个 WritableStreamDefaultController

  • start(controller):构建对象时,系统会立即调用此方法。通过 此方法的内容应旨在获取对底层接收器的访问权限。如果该过程是 则可以返回一个 promise 来指示成功或失败。
  • write(chunk, controller):如果新数据块(在 chunk 参数)准备就绪,可以写入底层接收器。它可以向 指示写入操作成功或失败。只有在调用 写入成功,且绝不会在数据流关闭或中止后执行。
  • close(controller):如果应用指示它已完成写入,则将调用此方法 将数据块添加到音频流内容应执行任何必要操作,以便完成 以及释放对底层接收器的访问权限如果此过程是异步的,则会返回 来表明成功或失败。只有在所有已排入队列的写入操作后,系统才会调用此方法 成功了
  • abort(reason):如果应用发出要突然关闭的信号,系统会调用此方法 并将其置于错误状态。它可以清理任何占用的资源 close(),但即使写入排队等待,系统也会调用 abort()。系统会将这些区块 。如果该过程是异步的,它可以返回一个 promise 来指示成功或失败。通过 reason 参数包含描述数据流中止原因的 DOMString
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

通过 WritableStreamDefaultController Streams API 的接口代表了用于控制 WritableStream 状态的控制器 在设置期间(因为需要提交更多数据块以供写入)或写入结束时。构建 WritableStream,系统会为底层接收器提供相应的 WritableStreamDefaultController 实例。WritableStreamDefaultController 只有一种方法: WritableStreamDefaultController.error(), 这会导致今后与相关视频流的所有互动都出错。 WritableStreamDefaultController 还支持 signal 属性,该属性会返回 AbortSignal、 允许根据需要停止 WritableStream 操作。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() 构造函数的第二个参数(同样可选)是 queuingStrategy。 它是一个对象,可视需要定义数据流的排队策略, 参数:

  • highWaterMark:一个非负数,表示使用此队列策略的数据流的高水位。
  • size(chunk):一个函数,用于计算并返回给定分块值的有限非负大小。 结果用于确定背压,并通过相应的 WritableStreamDefaultWriter.desiredSize 属性体现。

getWriter()write() 方法

要向可写流写入数据,您需要一个写入者, WritableStreamDefaultWriterWritableStream 接口的 getWriter() 方法会返回一个 WritableStreamDefaultWriter 的新实例,并将流锁定到该实例。虽然 流已锁定,除非释放当前写入程序,否则无法获取其他写入者。

write() 方法的 WritableStreamDefaultWriter 接口将传递的数据块写入 WritableStream 及其底层接收器,然后返回 一个 promise,可解析以指示写入操作成功或失败。请注意 “成功”均由底层接收器决定则表示相应分块已被接受 不一定会安全地保存到最终目的地

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 属性

您可以访问可写流的 WritableStream.locked 属性。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可写数据流代码示例

以下代码示例展示了所有实际步骤。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

将可读流传输到可写流

通过可读流的 pipeTo() 方法。 ReadableStream.pipeTo() 会将当前的 ReadableStream 通过管道传送到给定的 WritableStream,并返回 在管道流程成功完成时执行,或者在出现任何错误时拒绝 错误。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

创建转换流

Streams API 的 TransformStream 接口表示一组可转换的数据。您 通过调用构造函数 TransformStream() 来创建转换流,该函数会创建并返回 转换流对象。TransformStream() 构造函数会接受 其第一个参数是表示 transformer 的可选 JavaScript 对象。此类对象 包含以下任意方法:

transformer

  • start(controller):构建对象时,系统会立即调用此方法。一般价格 这用于使用 controller.enqueue() 将前缀区块加入队列。这些分块将被读取 但不依赖于对可写端的任何写入。如果这个初始 是异步的,例如,因为获取前缀分块需要花费一些精力, 该函数可以返回一个 promise 来指示成功或失败;被拒绝的 promise 将出错, 。任何抛出的异常都将由 TransformStream() 构造函数重新抛出。
  • transform(chunk, controller):如果新数据块最初写入到 可写端准备好进行转换。流实现保证了此函数 将仅在之前的转换成功后调用,且在 start() 之前绝不会调用 或调用 flush() 之后。此函数将执行实际的 转换流的工作。它可以使用 controller.enqueue() 将结果加入队列。这个 允许在可写端写入单个区块,从而在 可读端,具体取决于调用 controller.enqueue() 的次数。如果 是异步的,此函数可以返回一个 promise 来指示 转换。被拒绝的 promise 将使 转换流。如果未提供 transform() 方法,则使用标识转换,该转换将 将未更改的区块从可写端加入到可读端。
  • flush(controller):在写入可写端的所有分块完成后调用此方法 成功通过 transform() 进行了转换,可写端即将变为 已关闭。通常,这用于将后缀区块加入可读端的队列,然后再进行排队。 。如果刷新过程是异步进行的,该函数可以向 信号成功或失败;系统会将结果传达给 stream.writable.write()。此外,被拒绝的 promise 会导致可读和 流的可写端。抛出异常与返回遭拒 promise。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 队列策略

TransformStream() 构造函数的第二个和第三个可选参数是可选的 writableStrategyreadableStrategy 队列策略。它们的定义如 readwritable 流 部分。

转换数据流代码示例

以下代码示例展示了一个简单的转换流的实际操作。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

通过转换流来管道读取流

pipeThrough() ReadableStream 接口的方法提供了一种可对当前流进行管道处理的方式 转换流或任何其他可写/可读对。直播通常会锁定 从而防止其他读取器将其锁定。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

下一个代码示例(有点人为)展示了如何实现“大声喊出”功能fetch() 的版本 通过使用返回的响应 promise 将所有文本大写 以信息流形式 以及逐个块的大写形式这种方法的优势在于,您无需等待 下载整个文档,这在处理大型文件时会产生很大的影响。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

演示

以下演示显示了可读、可写和转换流的实际应用。其中还包含一些示例 (包含 pipeThrough()pipeTo() 管道链),并演示了 tee()。您可以视需要运行 单独窗口中的演示或查看 源代码

浏览器中提供的实用信息流

浏览器中内置了许多有用的信息流。你可以轻松创建 ReadableStreamBlob 接口的 stream() 方法会返回 ReadableStream,读取时会返回 blob 中包含的数据。还记得 File 对象是一种特定类型的 Blob,并且可在 blob 可以用的任何上下文中使用。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() 的流式传输变体称为 TextDecoderStreamTextEncoderStream

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

使用 CompressionStreamDecompressionStream 转换流 。以下代码示例展示了如何下载视频流规范,并将其压缩 (gzip) 并将压缩文件直接写入磁盘。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream 和实验性 fetch() 请求流 公开环境中的可写流示例。

Serial API 大量使用可读和可写流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最后,WebSocketStream API 将流与 WebSocket API 集成在一起。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

实用资源

致谢

本文由以下人员审核: Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe Medley亚当·赖斯Jake Archibald 的博文帮助我了解了很多 。部分代码示例的灵感来自 GitHub 用户 @bellbind 的探索和 散文的某些部分很大程度上基于 关于 Streams 的 MDN 网页文档。通过 Streams Standard 作者在研究领域 编写本规范Ryan Lara 发布的主打图片 取消启动