了解如何将可读、可写和转换流与 Streams API 配合使用。
Streams API 可让您以编程方式访问通过网络接收的数据流
或以任何方式在本地创建
并使用 JavaScript 进行处理流式传输涉及分解您希望接收、发送或转换的资源
分成几小块,然后逐块处理这些块。流式传输是一种
浏览器在收到要在网页上展示的 HTML 或视频等资源时,都会做这种处理,
在 2015 年推出带流的 fetch
之前,JavaScript 从未提供过此功能。
以前,如果您要处理某种类型的资源(无论是视频还是文本文件等), 则必须下载整个文件,等待将其反序列化为合适的格式, 然后进行处理这些视频流可供 所有功能都会更改。现在,您可以使用 JavaScript 逐步处理原始数据,如 无需生成缓冲区、字符串或 blob 即可。 这解锁了许多用例,下面列出了其中一些用例:
- 视频效果:通过应用了效果的转换流来传输可读的视频流 实时更新
- 数据(解压缩):通过选择性 (解压缩)。
- 图像解码:通过可对字节进行解码的转换流来传输 HTTP 响应流
转换为位图数据,然后通过另一个将位图转换为 PNG 的转换流。如果
安装在 Service Worker 的
fetch
处理程序中,这让您可以透明地执行 polyfill 操作, 例如 AVIF 等新图片格式
浏览器支持
ReadableStream 和 WritableStream
浏览器支持
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
TransformStream
浏览器支持
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
核心概念
在深入了解各种类型的直播之前,我先介绍一些核心概念。
文本块
数据块是写入数据流或从数据流中读取的一段数据。它可以是任意一个
类型;数据流甚至可以包含不同类型的数据块。大多数情况下,数据块可能不是原子性最强的
数据单元。例如,字节流可能包含由 16 个
KiB Uint8Array
单位,而非单个字节。
可读数据流
可读流表示可供读取的数据源。换句话说,数据来自
输出。具体而言,可读流是 ReadableStream
的实例,
类。
可写流
可写流表示可写入数据的目的地。换言之,数据
进入可写流。具体而言,可写流是
WritableStream
类。
转换流
转换流由一对流组成:可写流(称为可写端)、
和可读流(称为可读端)。
一个真实的比喻是
同声传译
实时将一种语言翻译成另一种语言。
以特定于转换流的方式,将
写入可写端会导致新数据可供从
可读一面。具体而言,任何具有 writable
属性和 readable
属性的对象都可以
转换为转换流。不过,标准 TransformStream
类可让您更轻松地创建
管链
流主要通过通过管道互相传递来使用。可读流可以直接通过管道传输
发送到可写流,使用可读流的 pipeTo()
方法,也可以通过
先使用可读流的 pipeThrough()
方法转换流。一组
以这种方式连接在一起的视频流称为管道链。
背压
管道链一旦构建完毕,就会传播有关分块流动速度的信号 如果链中的任何步骤还不能接受分块,则会向后传播信号 直到原始来源被告知停止生成分块, 速度快。这种流程标准化过程称为背压。
T 恤
可以使用其 tee()
方法对可读数据流进行标记(以大写“T”的形状命名)。
此操作会锁定音频流,也就是说,使其不能再直接使用;但系统会创建两个新的
流(称为分支),可单独使用。
T 恤也很重要,因为直播无法后退或重新开始,我们稍后会对此进行详细介绍。
可读数据流的机制
可读流是以 JavaScript 表示的数据源,由
ReadableStream
对象,
传输流量。通过
ReadableStream()
构造函数从给定处理程序创建并返回可读的流对象。有两个
底层来源的类型:
- 推送来源会在您访问它们时不断向您推送数据,一切由您决定 启动、暂停或取消访问数据流。示例包括直播视频流、服务器发送的事件、 或 WebSockets。
- 拉取来源要求您在连接到这些来源后明确请求数据。示例
通过
fetch()
或XMLHttpRequest
调用添加 HTTP 操作。
系统会按顺序读取流式数据(称为“数据块”的小段)。 位于数据流中的区块称为“已加入队列”。这意味着它们正在队列中等待 可供读取内部队列会跟踪尚未读取的区块。
排队策略是一个对象,用于确定数据流应如何根据 其内部队列的状态。排队策略会为每个分块分配一个大小,并将 将队列中所有分块的总大小设置为指定数字,称为“高水位”。
数据流中的区块由“读取器”读取。该读取器以每个分块为单位来检索数据, 这样你就可以对它执行自己想要的任何操作读者和其他 与之一起处理的代码称为“使用方”。
此上下文中的下一个结构称为控制器。每个可读流都有一个关联的 顾名思义,该控制器可用于控制流。
一次只能有一个读者读取数据流;当读取器创建并开始读取数据流时触发 (即成为活跃读取器),则会被锁定到该读取器。如果您希望其他读者接管 读取您的信息流时,通常需要先释放第一个读取器,然后再执行任何其他操作 (不过,您可以开通视频流)。
创建可读数据流
您可以通过调用其构造函数来创建可读流
ReadableStream()
。
构造函数有一个可选的参数 underlyingSource
,它代表一个对象
其中包含定义所构建的流实例将如何行为的方法和属性。
underlyingSource
这可以使用开发者定义的以下可选方法:
start(controller)
:构建对象时立即调用。通过 方法可以访问音频流来源 设置流式传输功能所需的资源如果该过程要异步完成,该方法可以 返回一个表示成功或失败的 promise。传递给此方法的controller
参数为 一ReadableStreamDefaultController
。pull(controller)
:可用于在提取更多数据块时控制音频流。它 会反复调用,直到流的内部区块队列还未满 达到最高水位。如果调用pull()
的结果是一个 promise, 在上述 promise 执行之前,系统不会再次调用pull()
。 如果 promise 拒绝,则流将会出错。cancel(reason)
:在数据流使用方取消数据流时调用。
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
支持以下方法:
ReadableStreamDefaultController.close()
用于关闭关联的信息流。ReadableStreamDefaultController.enqueue()
将关联的数据流中的给定区块加入队列。ReadableStreamDefaultController.error()
会导致今后与相关视频流的所有互动都出错。
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
ReadableStream()
构造函数的第二个参数(同样可选)是 queuingStrategy
。
它是一个对象,可视需要定义数据流的排队策略,
参数:
highWaterMark
:一个非负数,表示使用此队列策略的数据流的高水位。size(chunk)
:一个函数,用于计算并返回给定分块值的有限非负大小。 结果用于确定背压,并通过相应的ReadableStreamDefaultController.desiredSize
属性体现。 它还会控制何时调用底层来源的pull()
方法。
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
getReader()
和 read()
方法
如需从可读流读取数据,您需要一个读取器,
ReadableStreamDefaultReader
。
ReadableStream
接口的 getReader()
方法会创建一个读取器并将数据流锁定到
。当数据流被锁定时,在此数据流被释放之前,无法获取任何其他读取器。
read()
ReadableStreamDefaultReader
接口的方法会返回一个 promise,并提供对下一个
流的内部队列中。它会执行或拒绝并返回结果
直播各种可能性如下所示:
- 如果有可用的数据块,该 promise 会在执行时返回一个
形式的对象{ value: chunk, done: false }
。 - 如果数据流关闭,该 promise 将执行并返回
形式的对象{ value: undefined, done: true }
。 - 如果数据流出错,promise 将被拒绝并出现相关错误。
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
locked
属性
您可以通过访问其
ReadableStream.locked
属性。
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
可读数据流代码示例
以下代码示例展示了所有实际步骤。首先,您需要创建一个 ReadableStream
,并将其
underlyingSource
参数(即 TimestampSource
类)定义了 start()
方法。
此方法会告知音频流的 controller
:
enqueue()
在十秒内每秒发送一个时间戳。
最后,它会告知控制器对流执行 close()
操作。你使用了此
通过 getReader()
方法创建读取器并调用 read()
,直到数据流完成
done
。
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
异步迭代
在每次 read()
循环迭代时检查流是否为 done
可能不是最方便的 API。
幸运的是,我们很快就会推出一种更好的方法来实现此目的:异步迭代。
for await (const chunk of stream) {
console.log(chunk);
}
目前,使用异步迭代的一个解决方法是使用 polyfill 实现该行为。
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
设置可读流
此 API 的 tee()
方法
ReadableStream
接口与当前可读流匹配,返回双元素数组
包含生成的两个分支作为新的 ReadableStream
实例。这样,
两个读取器同时读取一个流。例如,在以下情况下,您可以在 Service Worker 中执行此操作:
您希望从服务器提取响应并将其流式传输到浏览器,同时又将其流式传输到
Service Worker 缓存。由于响应正文不能多次使用,因此您需要两个副本
来实现这一点如需取消数据流,您需要取消两个生成的分支。玩转直播
通常情况下,系统会在该时间段内将其锁定,以防止其他读取器将其锁定。
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
可读字节流
对于表示字节的流,提供了可读流的扩展版本来处理 特别是要尽量减少副本数量。字节流支持自带缓冲区 要获取的 (BYOB) 读取器。默认实现可以提供一系列不同的输出, 在 WebSocket 的情况下设置为字符串或数组缓冲区,而字节流可保证字节输出。 此外,BYOB 读取器具有稳定性方面的优势。这是 因为如果一个缓冲区分离,就可以保证一个缓冲区不会两次写入同一缓冲区, 从而避免竞态条件。BYOB 读取器可以减少浏览器需要运行的次数 因为它可以重复使用缓冲区
创建可读字节流
您可以通过将额外的 type
参数传递给
ReadableStream()
构造函数。
new ReadableStream({ type: 'bytes' });
underlyingSource
为可读字节流的底层源指定 ReadableByteStreamController
,
操作。其 ReadableByteStreamController.enqueue()
方法接受 chunk
实参,其值
是 ArrayBufferView
。ReadableByteStreamController.byobRequest
属性会返回当前的
BYOB 拉取请求,如果没有,则返回 null。最后,ReadableByteStreamController.desiredSize
属性返回所需的大小,以填充受控流的内部队列。
queuingStrategy
ReadableStream()
构造函数的第二个参数(同样可选)是 queuingStrategy
。
它是一个对象,可视需要定义数据流的排队策略,
参数:
highWaterMark
:非负数字节数,表示使用此队列策略的流的高水位标记。 此属性用于确定背压,通过相应的ReadableByteStreamController.desiredSize
属性加以体现。 它还会控制何时调用底层来源的pull()
方法。
getReader()
和 read()
方法
然后,您可以通过相应设置 mode
形参来获得对 ReadableStreamBYOBReader
的访问权限:
ReadableStream.getReader({ mode: "byob" })
。这样可以更精确地控制缓冲区
以避免复制。要从字节流读取数据,您需要调用
ReadableStreamBYOBReader.read(view)
,其中 view
是
ArrayBufferView
。
可读字节流代码示例
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
以下函数返回可读字节流,这些字节流支持对 随机生成的数组。它并没有使用预定的分块大小 1,024,而是尝试填充 开发者提供的缓冲区,可实现完全控制。
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
可写数据流的机制
可写流是您可以向其中写入数据的目的地,在 JavaScript 中由
WritableStream
对象。这个
充当底层接收器(底层 I/O 接收器)之上的抽象概念,
原始数据。
数据通过写入程序写入流,一次一个数据块。一个块可以占用 就像读者中的部分一样。您可以使用自己喜欢的任何代码 可供写入的数据块写入者以及关联的代码称为“提供方”。
当写入者创建并开始写入数据流(活跃写入者)时,可以说是 锁定。一次只能有一个写入者向可写流写入数据。如果您想再 写入流,通常需要将其释放,然后才能附加 其他作者
内部队列跟踪已写入流但尚未写入的区块 由底层接收器处理的数据
排队策略是一个对象,用于确定数据流应如何根据 其内部队列的状态。排队策略会为每个分块分配一个大小,并将 将队列中所有分块的总大小设置为指定数字,称为“高水位”。
最终构造称为控制器。每个可写流都有一个关联的控制器, 可让您控制流(例如,取消流)。
创建可写流
WritableStream
接口的
Streams API 提供了一种标准抽象,用于将流式数据写入目的地(称为
作为接收器。该对象内置了背压和队列功能。你可以通过以下方式创建可写流:
调用其构造函数
WritableStream()
。
它有一个可选的 underlyingSink
参数,代表一个对象
其中包含定义所构建的流实例将如何行为的方法和属性。
underlyingSink
underlyingSink
可以包含以下由开发者定义的可选方法。controller
传递给一些方法的参数是一个
WritableStreamDefaultController
。
start(controller)
:构建对象时,系统会立即调用此方法。通过 此方法的内容应旨在获取对底层接收器的访问权限。如果该过程是 则可以返回一个 promise 来指示成功或失败。write(chunk, controller)
:如果新数据块(在chunk
参数)准备就绪,可以写入底层接收器。它可以向 指示写入操作成功或失败。只有在调用 写入成功,且绝不会在数据流关闭或中止后执行。close(controller)
:如果应用指示它已完成写入,则将调用此方法 将数据块添加到音频流内容应执行任何必要操作,以便完成 以及释放对底层接收器的访问权限如果此过程是异步的,则会返回 来表明成功或失败。只有在所有已排入队列的写入操作后,系统才会调用此方法 成功了abort(reason)
:如果应用发出要突然关闭的信号,系统会调用此方法 并将其置于错误状态。它可以清理任何占用的资源close()
,但即使写入排队等待,系统也会调用abort()
。系统会将这些区块 。如果该过程是异步的,它可以返回一个 promise 来指示成功或失败。通过reason
参数包含描述数据流中止原因的DOMString
。
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
通过
WritableStreamDefaultController
Streams API 的接口代表了用于控制 WritableStream
状态的控制器
在设置期间(因为需要提交更多数据块以供写入)或写入结束时。构建
WritableStream
,系统会为底层接收器提供相应的 WritableStreamDefaultController
实例。WritableStreamDefaultController
只有一种方法:
WritableStreamDefaultController.error()
,
这会导致今后与相关视频流的所有互动都出错。
WritableStreamDefaultController
还支持 signal
属性,该属性会返回
AbortSignal
、
允许根据需要停止 WritableStream
操作。
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
WritableStream()
构造函数的第二个参数(同样可选)是 queuingStrategy
。
它是一个对象,可视需要定义数据流的排队策略,
参数:
highWaterMark
:一个非负数,表示使用此队列策略的数据流的高水位。size(chunk)
:一个函数,用于计算并返回给定分块值的有限非负大小。 结果用于确定背压,并通过相应的WritableStreamDefaultWriter.desiredSize
属性体现。
getWriter()
和 write()
方法
要向可写流写入数据,您需要一个写入者,
WritableStreamDefaultWriter
。WritableStream
接口的 getWriter()
方法会返回一个
WritableStreamDefaultWriter
的新实例,并将流锁定到该实例。虽然
流已锁定,除非释放当前写入程序,否则无法获取其他写入者。
write()
方法的
WritableStreamDefaultWriter
接口将传递的数据块写入 WritableStream
及其底层接收器,然后返回
一个 promise,可解析以指示写入操作成功或失败。请注意
“成功”均由底层接收器决定则表示相应分块已被接受
不一定会安全地保存到最终目的地
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
locked
属性
您可以访问可写流的
WritableStream.locked
属性。
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
可写数据流代码示例
以下代码示例展示了所有实际步骤。
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
将可读流传输到可写流
通过可读流的
pipeTo()
方法。
ReadableStream.pipeTo()
会将当前的 ReadableStream
通过管道传送到给定的 WritableStream
,并返回
在管道流程成功完成时执行,或者在出现任何错误时拒绝
错误。
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
创建转换流
Streams API 的 TransformStream
接口表示一组可转换的数据。您
通过调用构造函数 TransformStream()
来创建转换流,该函数会创建并返回
转换流对象。TransformStream()
构造函数会接受
其第一个参数是表示 transformer
的可选 JavaScript 对象。此类对象
包含以下任意方法:
transformer
start(controller)
:构建对象时,系统会立即调用此方法。一般价格 这用于使用controller.enqueue()
将前缀区块加入队列。这些分块将被读取 但不依赖于对可写端的任何写入。如果这个初始 是异步的,例如,因为获取前缀分块需要花费一些精力, 该函数可以返回一个 promise 来指示成功或失败;被拒绝的 promise 将出错, 。任何抛出的异常都将由TransformStream()
构造函数重新抛出。transform(chunk, controller)
:如果新数据块最初写入到 可写端准备好进行转换。流实现保证了此函数 将仅在之前的转换成功后调用,且在start()
之前绝不会调用 或调用flush()
之后。此函数将执行实际的 转换流的工作。它可以使用controller.enqueue()
将结果加入队列。这个 允许在可写端写入单个区块,从而在 可读端,具体取决于调用controller.enqueue()
的次数。如果 是异步的,此函数可以返回一个 promise 来指示 转换。被拒绝的 promise 将使 转换流。如果未提供transform()
方法,则使用标识转换,该转换将 将未更改的区块从可写端加入到可读端。flush(controller)
:在写入可写端的所有分块完成后调用此方法 成功通过transform()
进行了转换,可写端即将变为 已关闭。通常,这用于将后缀区块加入可读端的队列,然后再进行排队。 。如果刷新过程是异步进行的,该函数可以向 信号成功或失败;系统会将结果传达给stream.writable.write()
。此外,被拒绝的 promise 会导致可读和 流的可写端。抛出异常与返回遭拒 promise。
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
writableStrategy
和 readableStrategy
队列策略
TransformStream()
构造函数的第二个和第三个可选参数是可选的
writableStrategy
和 readableStrategy
队列策略。它们的定义如
read 和 writable 流
部分。
转换数据流代码示例
以下代码示例展示了一个简单的转换流的实际操作。
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
通过转换流来管道读取流
pipeThrough()
ReadableStream
接口的方法提供了一种可对当前流进行管道处理的方式
转换流或任何其他可写/可读对。直播通常会锁定
从而防止其他读取器将其锁定。
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
下一个代码示例(有点人为)展示了如何实现“大声喊出”功能fetch()
的版本
通过使用返回的响应 promise 将所有文本大写
以信息流形式
以及逐个块的大写形式这种方法的优势在于,您无需等待
下载整个文档,这在处理大型文件时会产生很大的影响。
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
演示
以下演示显示了可读、可写和转换流的实际应用。其中还包含一些示例
(包含 pipeThrough()
和 pipeTo()
管道链),并演示了 tee()
。您可以视需要运行
单独窗口中的演示或查看
源代码。
浏览器中提供的实用信息流
浏览器中内置了许多有用的信息流。你可以轻松创建
ReadableStream
。Blob
接口的 stream() 方法会返回
ReadableStream
,读取时会返回 blob 中包含的数据。还记得
File
对象是一种特定类型的
Blob
,并且可在 blob 可以用的任何上下文中使用。
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
TextDecoder.decode()
和 TextEncoder.encode()
的流式传输变体称为
TextDecoderStream
和
TextEncoderStream
。
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
使用
CompressionStream
和
DecompressionStream
转换流
。以下代码示例展示了如何下载视频流规范,并将其压缩 (gzip)
并将压缩文件直接写入磁盘。
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
File System Access API 的
FileSystemWritableFileStream
和实验性 fetch()
请求流
公开环境中的可写流示例。
Serial API 大量使用可读和可写流。
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
最后,WebSocketStream
API 将流与 WebSocket API 集成在一起。
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
实用资源
致谢
本文由以下人员审核: Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley 和 亚当·赖斯。 Jake Archibald 的博文帮助我了解了很多 。部分代码示例的灵感来自 GitHub 用户 @bellbind 的探索和 散文的某些部分很大程度上基于 关于 Streams 的 MDN 网页文档。通过 Streams Standard 作者在研究领域 编写本规范Ryan Lara 发布的主打图片 取消启动。