Saiba como usar streams legíveis, graváveis e de transformação com a API Streams.
A API Streams permite acessar de maneira programática fluxos de dados recebidos pela rede
ou criados localmente e
processá-los com JavaScript. O streaming envolve dividir um recurso que você quer receber, enviar ou transformar
em pequenos pedaços e, em seguida, processar esses pedaços. Embora a transmissão em streaming seja algo
que os navegadores fazem de qualquer maneira ao receber recursos como HTML ou vídeos para serem mostrados em páginas da Web, esse
recurso nunca esteve disponível para JavaScript antes de fetch
com transmissões ser introduzido em 2015.
Antes, se você quisesse processar um recurso de algum tipo (seja um vídeo ou um arquivo de texto etc.), era necessário fazer o download do arquivo inteiro, esperar que ele fosse desserializado em um formato adequado e, em seguida, processá-lo. Com as transmissões disponíveis para JavaScript, tudo isso muda. Agora é possível processar dados brutos com JavaScript progressivamente, assim que estiverem disponíveis no cliente, sem a necessidade de gerar um buffer, uma string ou um blob. Isso libera vários casos de uso, como os listados abaixo:
- Efeitos de vídeo: transmissão de um stream de vídeo legível por um stream de transformação que aplica efeitos em tempo real.
- (des)compressão de dados: encadeamento de um stream de arquivos por meio de um stream de transformação que o(des)compacta seletivamente.
- Decodificação de imagem: encaminhar um fluxo de resposta HTTP por um fluxo de transformação que decodifica bytes
em dados de bitmap e, em seguida, por outro fluxo de transformação que converte bitmaps em PNGs. Se
instalado dentro do gerenciador
fetch
de um worker de serviço, isso permite que você faça um polifill transparente de novos formatos de imagem, como AVIF.
Suporte ao navegador
ReadableStream e WritableStream
TransformStream
Principais conceitos
Antes de entrar em detalhes sobre os vários tipos de transmissão, vamos apresentar alguns conceitos básicos.
Pedaços
Um bloco é um único dado gravado ou lido em um fluxo. Ele pode ser de qualquer tipo. Os streams podem até mesmo conter blocos de diferentes tipos. Na maioria das vezes, um bloco não é a unidade de dados
mais atômica de um determinado fluxo. Por exemplo, um stream de bytes pode conter blocos de 16
unidades Uint8Array
de KiB, em vez de bytes únicos.
Streams legíveis
Um fluxo legível representa uma fonte de dados que pode ser lida. Em outras palavras, os dados saem de um fluxo legível. Especificamente, um stream legível é uma instância da classe
ReadableStream
.
Streams graváveis
Um stream gravável representa um destino para dados em que você pode gravar. Em outras palavras, os dados
são inseridos em um stream gravável. Concretamente, um stream gravável é uma instância da
classe WritableStream
.
Transformar streams
Um stream de transformação consiste em um par de streams: um stream gravável, conhecido como lado gravável, e um stream legível, conhecido como lado legível.
Uma metáfora do mundo real para isso seria um
intérprete simultâneo
que traduz de um idioma para outro em tempo real.
De maneira específica para o fluxo de transformação, a gravação
no lado gravável resulta em novos dados disponibilizados para leitura no
lado legível. Especificamente, qualquer objeto com uma propriedade writable
e uma readable
pode servir
como um fluxo de transformação. No entanto, a classe TransformStream
padrão facilita a criação
de um par que esteja corretamente entrelaçado.
Correntes para tubos
As streams são usadas principalmente para criar um pipe entre elas. Um stream legível pode ser canalizado diretamente
para um stream gravável usando o método pipeTo()
do stream legível ou pode ser canalizado por um
ou mais streams de transformação primeiro usando o método pipeThrough()
do stream legível. Um conjunto de
fluxos agrupados dessa forma é chamado de cadeia de pipe.
Backpressure
Após a construção de uma cadeia de canos, ela propagará sinais sobre a rapidez com que os blocos devem fluir por ela. Se qualquer etapa da cadeia ainda não puder aceitar blocos, ela propagará um sinal para trás pela cadeia de pipe, até que a origem original seja instruída a parar de produzir blocos tão rápidos. Esse processo de normalização do fluxo é chamado de contrapressão.
Teeing
Um fluxo legível pode ser dividido (nomeado com a forma de uma letra maiúscula "T") usando o método tee()
.
Isso trava a transmissão, ou seja, ela não pode mais ser usada diretamente. No entanto, ela cria duas novas
transmissões, chamadas de ramificações, que podem ser consumidas de forma independente.
O Teeing também é importante porque os streams não podem ser retrocedidos ou reiniciados. Falaremos mais sobre isso depois.
A mecânica de um fluxo legível
Um stream legível é uma fonte de dados representada em JavaScript por um objeto
ReadableStream
que
flui de uma fonte subjacente. O construtor
ReadableStream()
cria e retorna um objeto de stream legível dos manipuladores fornecidos. Há dois
tipos de origem:
- As origens push enviam dados constantemente quando você os acessa, e cabe a você iniciar, pausar ou cancelar o acesso ao stream. Exemplos incluem transmissões de vídeo ao vivo, eventos enviados pelo servidor ou WebSockets.
- As origens pull exigem que você solicite dados explicitamente a elas depois de conectadas. Os exemplos
incluem operações HTTP via chamadas
fetch()
ouXMLHttpRequest
.
Os dados do stream são lidos sequencialmente em pequenos pedaços chamados de chunks. Os blocos colocados em um stream são enfileirados. Isso significa que elas estão aguardando em uma fila pronta para leitura. Uma fila interna monitora os blocos que ainda não foram lidos.
Uma estratégia de fila é um objeto que determina como um stream precisa sinalizar a backpressure com base no estado da fila interna. A estratégia de enfileiramento atribui um tamanho a cada bloco e compara o tamanho total de todos os blocos da fila com um número especificado, conhecido como marca alta.
Os blocos dentro do stream são lidos por um leitor. Esse leitor recupera os dados um bloco por vez, permitindo que você faça qualquer tipo de operação. O leitor e o outro código de processamento associado a ele são chamados de consumidor.
O próximo constructo neste contexto é chamado de controlador. Cada stream legível tem um controlador associado que, como o nome sugere, permite controlar o stream.
Apenas um leitor pode ler um stream por vez. Quando um leitor é criado e começa a ler um stream, ou seja, se torna um leitor ativo, ele é bloqueado. Se você quiser que outro leitor assuma a leitura da sua transmissão, normalmente é necessário liberar o primeiro leitor antes de fazer qualquer outra coisa (embora você possa fazer um Tee nas transmissões).
Como criar um stream legível
Para criar um fluxo legível, chame o construtor
ReadableStream()
.
O construtor tem um argumento opcional underlyingSource
, que representa um objeto
com métodos e propriedades que definem como a instância de stream construída vai se comportar.
O underlyingSource
Isso pode usar os seguintes métodos opcionais definidos pelo desenvolvedor:
start(controller)
: chamado imediatamente quando o objeto é construído. O método pode acessar a origem do fluxo e fazer tudo o que for necessário para configurar a funcionalidade do fluxo. Se esse processo for feito de forma assíncrona, o método poderá retornar uma promessa para sinalizar sucesso ou falha. O parâmetrocontroller
transmitido para esse método é umReadableStreamDefaultController
.pull(controller)
: pode ser usado para controlar o fluxo à medida que mais blocos são buscados. Ele é chamado repetidamente, desde que a fila interna de blocos do stream não esteja cheia, até que a fila alcance a marca d'água máxima. Se o resultado da chamadapull()
for uma promessa,pull()
não será chamada novamente até que a promessa seja cumprida. Se a promessa for rejeitada, o stream vai apresentar um erro.cancel(reason)
: chamado quando o consumidor de stream cancela o stream.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
O ReadableStreamDefaultController
oferece suporte aos seguintes métodos:
ReadableStreamDefaultController.close()
fecha o stream associado.ReadableStreamDefaultController.enqueue()
enfileira um determinado fragmento no stream associado.ReadableStreamDefaultController.error()
causa um erro em todas as interações futuras com o stream associado.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
O queuingStrategy
O segundo argumento, também opcional, do construtor ReadableStream()
é queuingStrategy
.
É um objeto que define, opcionalmente, uma estratégia de enfileiramento para o stream, que recebe dois
parâmetros:
highWaterMark
: um número não negativo que indica o nível máximo do stream que usa essa estratégia de fila.size(chunk)
: uma função que calcula e retorna o tamanho finito não negativo do valor do fragmento fornecido. O resultado é usado para determinar a contrapressão, que é manifestada pela propriedadeReadableStreamDefaultController.desiredSize
adequada. Ele também governa quando o métodopull()
da origem subjacente é chamado.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Os métodos getReader()
e read()
Para ler um stream legível, você precisa de um leitor, que será um
ReadableStreamDefaultReader
.
O método getReader()
da interface ReadableStream
cria um leitor e bloqueia o stream
nele. Enquanto o stream estiver bloqueado, nenhum outro leitor poderá ser adquirido até que este seja liberado.
O método read()
da interface ReadableStreamDefaultReader
retorna uma promessa que fornece acesso ao próximo
bloco na fila interna do stream. Ele atende ou rejeita com um resultado dependendo do estado do
fluxo. As diferentes possibilidades são as seguintes:
- Se um bloco estiver disponível, a promessa será cumprida com um objeto do formulário
{ value: chunk, done: false }
. - Se o fluxo for fechado, a promessa será cumprida com um objeto do formulário
{ value: undefined, done: true }
. - Se o stream apresentar um erro, a promessa será rejeitada com o erro relevante.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
A propriedade locked
É possível verificar se um stream legível está bloqueado acessando a
propriedade
ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemplos de código de fluxo legível
O exemplo de código abaixo mostra todas as etapas em ação. Primeiro, crie um ReadableStream
que, no
argumento underlyingSource
(ou seja, a classe TimestampSource
), defina um método start()
.
Esse método instrui o controller
do fluxo a
enqueue()
um carimbo de data/hora a cada segundo durante dez segundos.
Por fim, ele instrui o controlador a usar close()
no stream. Para consumir esse
stream, crie um leitor usando o método getReader()
e chame read()
até que o stream seja
done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iteração assíncrona
Verificar cada iteração de loop read()
se o stream é done
pode não ser a API mais conveniente.
Felizmente, em breve haverá uma maneira melhor de fazer isso: iteração assíncrona.
for await (const chunk of stream) {
console.log(chunk);
}
Uma solução alternativa para usar a iteração assíncrona hoje é implementar o comportamento com um polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Como estabelecer um stream legível
O método tee()
da
interface ReadableStream
conecta o fluxo legível atual, retornando uma matriz de dois elementos
que contém as duas ramificações resultantes como novas instâncias de ReadableStream
. Isso permite
que dois leitores leiam um stream simultaneamente. Você pode fazer isso, por exemplo, em um service worker se
quiser buscar uma resposta do servidor e transmiti-la para o navegador, mas também transmiti-la para o
cache do service worker. Como um corpo de resposta não pode ser consumido mais de uma vez, são necessárias duas cópias
para isso. Para cancelar o fluxo, é necessário cancelar as duas ramificações resultantes. A divisão de um fluxo
geralmente o bloqueia durante a duração, impedindo que outros leitores o bloqueiem.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Streams de bytes legíveis
Para streams que representam bytes, uma versão estendida do stream legível é fornecida para processar bytes de maneira eficiente, minimizando as cópias. Os streams de bytes permitem que leitores traga seu próprio buffer (BYOB, na sigla em inglês) sejam adquiridos. A implementação padrão pode fornecer uma variedade de saídas diferentes, como strings ou buffers de matriz no caso de WebSockets, enquanto os streams de bytes garantem a saída de bytes. Além disso, os leitores BYOB têm benefícios de estabilidade. Isso ocorre porque, se um buffer for desconectado, ele poderá garantir que um não seja gravado no mesmo buffer duas vezes, evitando condições de corrida. Os leitores de BYOB podem reduzir o número de vezes que o navegador precisa executar a coleta de lixo, porque ele pode reutilizar buffers.
Como criar um fluxo de bytes legível
É possível criar um fluxo de bytes legível transmitindo um parâmetro type
adicional para o
construtor ReadableStream()
.
new ReadableStream({ type: 'bytes' });
O underlyingSource
A origem de um fluxo de bytes legível recebe um ReadableByteStreamController
para
manipulação. O método ReadableByteStreamController.enqueue()
recebe um argumento chunk
cujo valor
é um ArrayBufferView
. A propriedade ReadableByteStreamController.byobRequest
retorna a solicitação de envio BYOB atual ou nula se não houver nenhuma. Por fim, a propriedade ReadableByteStreamController.desiredSize
retorna o tamanho desejado para preencher a fila interna do stream controlado.
O queuingStrategy
O segundo argumento, também opcional, do construtor ReadableStream()
é queuingStrategy
.
É um objeto que define opcionalmente uma estratégia de enfileiramento para o stream, que usa um
parâmetro:
highWaterMark
: um número não negativo de bytes que indica a marca d'água alta do fluxo usando essa estratégia de enfileiramento. Isso é usado para determinar a contrapressão, que é manifestada pela propriedadeReadableByteStreamController.desiredSize
adequada. Ele também controla quando o métodopull()
da fonte subjacente é chamado.
Os métodos getReader()
e read()
Em seguida, é possível acessar um ReadableStreamBYOBReader
definindo o parâmetro mode
:
ReadableStream.getReader({ mode: "byob" })
. Isso permite um controle mais preciso sobre a alocação de buffer
para evitar cópias. Para ler do stream de bytes, é necessário chamar
ReadableStreamBYOBReader.read(view)
, em que view
é um
ArrayBufferView
.
Exemplo de código de stream de bytes legível
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
A função a seguir retorna streams de bytes legíveis que permitem a leitura eficiente sem cópia de uma matriz gerada aleatoriamente. Em vez de usar um tamanho de bloco predeterminado de 1.024, ele tenta preencher o buffer fornecido pelo desenvolvedor, permitindo o controle total.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
A mecânica de um stream gravável
Um stream gravável é um destino em que você pode gravar dados, representados em JavaScript por um
objeto WritableStream
. Isso
serve como uma abstração sobre a parte superior de um sink subjacente, um sink de E/S de nível inferior em que
os dados brutos são gravados.
Os dados são gravados no fluxo por um gravador, um bloco por vez. Um bloco pode assumir várias formas, assim como os blocos em um leitor. Você pode usar qualquer código para produzir os blocos prontos para gravação. O escritor e o código associado são chamados de produtor.
Quando um escritor é criado e começa a gravar em um stream (um escritor ativo), ele é bloqueado. Apenas um escritor pode gravar em um fluxo gravável por vez. Se você quiser que outro escritor comece a gravar no stream, normalmente é necessário liberar o stream antes de anexar outro escritor a ele.
Uma fila interna rastreia os blocos que foram gravados no stream, mas ainda não foram processados pelo coletor subjacente.
Uma estratégia de fila é um objeto que determina como um stream precisa sinalizar a backpressure com base no estado da fila interna. A estratégia de fila atribui um tamanho a cada bloco e compara o tamanho total de todos os blocos na fila com um número especificado, conhecido como limite máximo.
O constructo final é chamado de controlador. Cada stream gravável tem um controlador associado que permite controlar o stream, por exemplo, para abortá-lo.
Como criar um stream gravável
A interface WritableStream
da
API Streams fornece uma abstração padrão para gravar dados de streaming em um destino, conhecido
como coletor. Esse objeto vem com pressão de retorno e enfileiramento integrados. Para criar um stream gravável,
chame o construtor
WritableStream()
.
Ele tem um parâmetro underlyingSink
opcional, que representa um objeto
com métodos e propriedades que definem como a instância de stream construída vai se comportar.
O underlyingSink
O underlyingSink
pode incluir os seguintes métodos opcionais definidos pelo desenvolvedor. O parâmetro controller
transmitido para alguns dos métodos é um
WritableStreamDefaultController
.
start(controller)
: este método é chamado imediatamente quando o objeto é construído. O conteúdo desse método precisa ter como objetivo acessar o sink subjacente. Se esse processo for feito de forma assíncrona, ele poderá retornar uma promessa para indicar sucesso ou falha.write(chunk, controller)
: esse método será chamado quando um novo bloco de dados (especificado no parâmetrochunk
) estiver pronto para ser gravado no sink subjacente. Ele pode retornar uma promessa para sinalizar o sucesso ou a falha da operação de gravação. Esse método só será chamado depois que as gravações anteriores tiverem sido concluídas e nunca depois que o stream for fechado ou abortado.close(controller)
: esse método será chamado se o app sinalizar que terminou de gravar blocos no stream. O conteúdo precisa fazer o que for necessário para finalizar as gravações no coletor e liberar o acesso a ele. Se esse processo for assíncrono, ele poderá retornar uma promessa para indicar sucesso ou falha. Esse método só será chamado depois que todas as gravações em fila tiverem sido concluídas.abort(reason)
: esse método será chamado se o app sinalizar que quer fechar abruptamente a transmissão e colocá-la em um estado de erro. Ele pode limpar todos os recursos retidos, comoclose()
, masabort()
será chamado mesmo que as gravações estejam na fila. Esses pedaços serão descartados. Se esse processo for assíncrono, ele poderá retornar uma promessa para indicar sucesso ou falha. O parâmetroreason
contém umaDOMString
que descreve por que o fluxo foi interrompido.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
A interface
WritableStreamDefaultController
da API Streams representa um controlador que permite o controle do estado de um WritableStream
durante a configuração, à medida que mais blocos são enviados para gravação ou no final da gravação. Ao criar
um WritableStream
, o sink subjacente recebe uma instância WritableStreamDefaultController
correspondente para manipular. O WritableStreamDefaultController
tem apenas um método:
WritableStreamDefaultController.error()
,
que faz com que todas as interações futuras com o fluxo associado gerem erros.
WritableStreamDefaultController
também oferece suporte a uma propriedade signal
, que retorna uma instância de
AbortSignal
,
permitindo que uma operação WritableStream
seja interrompida, se necessário.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
O queuingStrategy
O segundo argumento, também opcional, do construtor WritableStream()
é queuingStrategy
.
É um objeto que define, opcionalmente, uma estratégia de enfileiramento para o stream, que recebe dois
parâmetros:
highWaterMark
: um número não negativo que indica o nível máximo do stream que usa essa estratégia de fila.size(chunk)
: uma função que calcula e retorna o tamanho finito não negativo do valor do fragmento fornecido. O resultado é usado para determinar a contrapressão, que é manifestada pela propriedadeWritableStreamDefaultWriter.desiredSize
adequada.
Os métodos getWriter()
e write()
Para gravar em um fluxo gravável, você precisa de um escritor, que será um
WritableStreamDefaultWriter
. O método getWriter()
da interface WritableStream
retorna uma
nova instância de WritableStreamDefaultWriter
e bloqueia o stream para essa instância. Enquanto o
stream estiver bloqueado, nenhum outro escritor poderá ser adquirido até que o atual seja liberado.
O método write()
da interface
WritableStreamDefaultWriter
grava um bloco de dados transmitido para um WritableStream
e o sink subjacente, depois retorna
uma promessa que resolve para indicar o sucesso ou a falha da operação de gravação. O que
"sucesso" significa é até a sink subjacente. Ele pode indicar que o bloco foi aceito,
mas não necessariamente que ele foi salvo com segurança no destino final.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
A propriedade locked
É possível verificar se um stream gravável está bloqueado acessando a
propriedade
WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemplo de código de fluxo gravável
O exemplo de código abaixo mostra todas as etapas em ação.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Como encaminhar um fluxo legível para um fluxo gravável
Um fluxo legível pode ser canalizado para um fluxo gravável pelo método
pipeTo()
do fluxo legível.
O ReadableStream.pipeTo()
encaminha o ReadableStream
atual para uma determinada WritableStream
e retorna uma
promessa que será atendida quando o processo de piping for concluído com sucesso ou será rejeitada se houver algum
erro.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Como criar um stream de transformação
A interface TransformStream
da API Streams representa um conjunto de dados transformáveis. Para criar um fluxo de transformação, chame o construtor TransformStream()
, que cria e retorna um objeto de fluxo de transformação dos gerenciadores fornecidos. O construtor TransformStream()
aceita como
primeiro argumento um objeto JavaScript opcional que representa o transformer
. Esses objetos podem conter qualquer um dos seguintes métodos:
O transformer
start(controller)
: esse método é chamado imediatamente quando o objeto é criado. Normalmente, isso é usado para enfileirar blocos de prefixo usandocontroller.enqueue()
. Esses blocos serão lidos do lado legível, mas não dependem de gravações no lado gravável. Se esse processo inicial for assíncrono, por exemplo, porque exige algum esforço para adquirir os blocos de prefixo, a função poderá retornar uma promessa para sinalizar sucesso ou falha. Uma promessa recusada vai gerar um erro no stream. Todas as exceções geradas serão geradas novamente pelo construtorTransformStream()
.transform(chunk, controller)
: esse método é chamado quando um novo fragmento originalmente gravado no lado gravável está pronto para ser transformado. A implementação do stream garante que essa função seja chamada somente depois que as transformações anteriores tenham sido concluídas e nunca antes destart()
ser concluído ou depois queflush()
for chamado. Essa função executa o trabalho de transformação real do fluxo de transformação. Ele pode enfileirar os resultados usandocontroller.enqueue()
. Isso permite que um único bloco gravado no lado gravável resulte em zero ou vários blocos no lado legível, dependendo de quantas vezescontroller.enqueue()
é chamado. Se o processo de transformação for assíncrono, essa função poderá retornar uma promessa para indicar o sucesso ou a falha da transformação. Uma promessa rejeitada vai gerar erros nos lados legível e gravável do stream de transformação. Se nenhum métodotransform()
for fornecido, a transformação de identidade será usada, o que enfileira os blocos inalterados do lado gravável para o lado legível.flush(controller)
: esse método é chamado depois que todos os blocos gravados no lado gravável são transformados pela passagem com sucesso portransform()
e o lado gravável está prestes a ser fechado. Normalmente, isso é usado para enfileirar blocos de sufixo no lado legível, antes que ele também seja fechado. Se o processo de limpeza for assíncrono, a função poderá retornar uma promessa para indicar sucesso ou falha. O resultado será comunicado ao autor da chamada destream.writable.write()
. Além disso, uma promessa rejeitada vai gerar erros nos lados legível e gravável do stream. Gerar uma exceção é tratado da mesma forma que retornar uma promessa rejeitada.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
As estratégias de fila writableStrategy
e readableStrategy
O segundo e o terceiro parâmetros opcionais do construtor TransformStream()
são estratégias de enfileiramento opcionais
writableStrategy
e readableStrategy
. Elas são definidas conforme descrito nas seções
legíveis e graváveis
respectivamente.
Exemplo de código de transformação de fluxo
O exemplo de código a seguir mostra um fluxo de transformação simples em ação.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Como colocar um stream legível em um stream de transformação
O método pipeThrough()
da interface ReadableStream
oferece uma maneira encadeável de canalizar o fluxo atual
por um fluxo de transformação ou qualquer outro par gravável/legível. O encaminhamento de um stream geralmente o bloqueia
durante a duração do encaminhamento, impedindo que outros leitores o bloqueiem.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
O próximo exemplo de código (um pouco inventado) mostra como implementar uma versão "gritada" de fetch()
que coloca todo o texto em letra maiúscula consumindo a promessa de resposta retornada
como um stream
e colocando cada fragmento em letras maiúsculas. A vantagem dessa abordagem é que você não precisa esperar o download de todo o documento, o que pode fazer uma enorme diferença ao lidar com arquivos grandes.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demonstração
A demonstração abaixo mostra streams legíveis, graváveis e de transformação em ação. Ele também inclui exemplos
de cadeias de pipe pipeThrough()
e pipeTo()
e demonstra tee()
. Você pode executar a
demonstração na própria janela ou conferir o
código-fonte.
Streams úteis disponíveis no navegador
Há vários fluxos úteis integrados ao navegador. É possível criar facilmente um
ReadableStream
a partir de um blob. O método stream() da interface Blob
retorna um ReadableStream
que, após a leitura, retorna os dados contidos no blob. Lembre-se também de que um objeto File
é um tipo específico de Blob
e pode ser usado em qualquer contexto em que um blob pode.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
As variantes de streaming de TextDecoder.decode()
e TextEncoder.encode()
são chamadas de
TextDecoderStream
e
TextEncoderStream
, respectivamente.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
É fácil compactar ou descompactar um arquivo com os fluxos de transformação
CompressionStream
e
DecompressionStream
,
respectivamente. O exemplo de código abaixo mostra como fazer o download da especificação Streams, compactá-la (gzip)
diretamente no navegador e gravar o arquivo compactado diretamente no disco.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
A FileSystemWritableFileStream
da
API File System Access
e os fluxos de solicitação fetch()
experimentais são
exemplos de fluxos graváveis em uso.
A API Serial usa muito os streams legíveis e graváveis.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Por fim, a API WebSocketStream
integra streams à API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Recursos úteis
- Especificação de streams
- Demonstrações complementares
- Polyfill de streams
- 2016: o ano dos streams da Web
- Geradores e iteradores assíncronos
- Visualizador de stream
Agradecimentos
Este artigo foi revisado por Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley e Adam Rice. As postagens do blog do Jake Archibald me ajudaram muito a entender as transmissões. Alguns dos exemplos de código são inspirados nas análises do usuário do GitHub @bellbind e partes do texto são baseadas nos documentos da Web do MDN sobre streams. Os autores do Streams Standard fizeram um trabalho incrível ao escrever esta especificação. Imagem principal de Ryan Lara no Unsplash.