Pelajari cara menggunakan streaming yang dapat dibaca, ditulis, dan ditransformasi dengan Streams API.
Streams API memungkinkan Anda mengakses aliran data yang diterima melalui jaringan secara terprogram atau dibuat dengan cara apa pun secara lokal dan memprosesnya dengan JavaScript. Streaming melibatkan pemecahan resource yang ingin Anda terima, kirim, atau ubah
menjadi potongan kecil, lalu memproses potongan ini sedikit demi sedikit. Meskipun streaming adalah hal yang
dilakukan browser saat menerima aset seperti HTML atau video untuk ditampilkan di halaman web, kemampuan
ini belum pernah tersedia untuk JavaScript sebelum fetch
dengan streaming diperkenalkan pada tahun 2015.
Sebelumnya, jika Anda ingin memproses jenis resource tertentu (baik video, file teks, dll.), Anda harus mendownload seluruh file, menunggu hingga dilakukan deserialisasi ke dalam format yang sesuai, lalu memprosesnya. Dengan tersedianya streaming untuk JavaScript, semua ini akan berubah. Anda kini dapat memproses data mentah dengan JavaScript secara bertahap begitu data tersedia di klien, tanpa perlu membuat buffering, string, atau blob. Hal ini membuka sejumlah kasus penggunaan, beberapa di antaranya saya cantumkan di bawah:
- Efek video: menyisipkan streaming video yang dapat dibaca melalui streaming transformasi yang menerapkan efek secara real time.
- (De)kompresi data: menyalurkan aliran file melalui aliran transformasi yang secara selektif (mendekompresi) data.
- Dekode gambar: menyisipkan aliran respons HTTP melalui aliran transformasi yang mendekode byte
menjadi data bitmap, lalu melalui aliran transformasi lain yang menerjemahkan bitmap menjadi PNG. Jika
diinstal di dalam pengendali
fetch
pekerja layanan, hal ini memungkinkan Anda melakukan polyfill format gambar baru seperti AVIF secara transparan.
Dukungan browser
ReadableStream dan WritableStream
TransformStream
Konsep inti
Sebelum membahas berbagai jenis streaming secara mendetail, izinkan saya memperkenalkan beberapa konsep inti.
Potongan
Bagian adalah satu bagian data yang ditulis ke atau dibaca dari aliran data. Jenisnya dapat berupa
apa saja; streaming bahkan dapat berisi potongan dari berbagai jenis. Sering kali, bagian tidak akan menjadi unit data
yang paling atomik untuk aliran tertentu. Misalnya, aliran byte mungkin berisi potongan yang terdiri dari 16 unit Uint8Array
kibibyte, bukan byte tunggal.
Aliran data yang dapat dibaca
Aliran data yang dapat dibaca mewakili sumber data yang dapat Anda baca. Dengan kata lain, data keluar
dari aliran data yang dapat dibaca. Secara konkret, aliran yang dapat dibaca adalah instance class
ReadableStream
.
Aliran data yang dapat ditulis
Aliran data yang dapat ditulis mewakili tujuan untuk data yang dapat Anda tulis. Dengan kata lain, data
masuk ke aliran yang dapat ditulis. Secara konkret, aliran yang dapat ditulis adalah instance dari
class WritableStream
.
Mentransformasi aliran data
Streaming transformasi terdiri dari sepasang streaming: streaming yang dapat ditulis, dikenal sebagai sisi yang dapat ditulis, dan streaming yang dapat dibaca, yang dikenal sebagai sisi yang dapat dibaca.
Metafora dunia nyata untuk hal ini adalah
penerjemah simultan
yang menerjemahkan dari satu bahasa ke bahasa lain dengan cepat.
Dengan cara khusus untuk aliran transformasi, penulisan
ke sisi yang dapat ditulis akan menghasilkan data baru yang tersedia untuk dibaca dari
sisi yang dapat dibaca. Secara konkret, setiap objek dengan properti writable
dan properti readable
dapat berfungsi
sebagai aliran transformasi. Namun, class TransformStream
standar memudahkan pembuatan
pasangan yang dijejali dengan benar.
Rantai pipa
Streaming terutama digunakan dengan menggabungkan satu sama lain. Streaming yang dapat dibaca dapat disalurkan langsung
ke streaming yang dapat ditulis, menggunakan metode pipeTo()
streaming yang dapat dibaca, atau dapat disalurkan melalui satu
atau beberapa streaming transformasi terlebih dahulu, menggunakan metode pipeThrough()
streaming yang dapat dibaca. Serangkaian
aliran yang di-pipe bersama dengan cara ini disebut sebagai rantai pipa.
Tekanan balik
Setelah dibuat, rantai pipa akan menyebarkan sinyal terkait seberapa cepat potongan harus mengalir melaluinya. Jika ada langkah dalam rantai yang belum dapat menerima potongan, langkah tersebut akan menyebarkan sinyal ke belakang melalui rantai pipa, hingga akhirnya sumber asli diberi tahu untuk berhenti menghasilkan potongan begitu cepat. Proses menormalisasi alur ini disebut backpressure.
Teeing
Aliran data yang dapat dibaca dapat diberi judul (dinamai berdasarkan bentuk huruf besar 'T') menggunakan metode tee()
.
Tindakan ini akan mengunci streaming, yaitu membuatnya tidak dapat digunakan secara langsung; namun, tindakan ini akan membuat dua streaming
baru, yang disebut cabang, yang dapat digunakan secara independen.
Memulai streaming juga penting karena streaming tidak dapat diputar ulang atau dimulai ulang. Nanti kita akan membahasnya lebih lanjut.
Mekanisme streaming yang dapat dibaca
Aliran data yang dapat dibaca adalah sumber data yang direpresentasikan dalam JavaScript oleh
objek ReadableStream
yang
mengalir dari sumber pokok. Konstruktor
ReadableStream()
membuat dan menampilkan objek aliran yang dapat dibaca dari pengendali tertentu. Ada dua
jenis sumber pokok:
- Sumber push terus mendorong data kepada Anda saat Anda mengaksesnya, dan Anda dapat memulai, menjeda, atau membatalkan akses ke aliran data. Contohnya mencakup streaming video live, peristiwa yang dikirim server, atau WebSocket.
- Sumber pull mengharuskan Anda meminta data secara eksplisit dari sumber tersebut setelah terhubung. Contohnya
meliputi operasi HTTP melalui panggilan
fetch()
atauXMLHttpRequest
.
Data streaming dibaca secara berurutan dalam potongan kecil yang disebut chunk. Potongan yang ditempatkan dalam streaming disebut diantrekan. Artinya, data tersebut menunggu dalam antrean yang siap dibaca. Antrean internal melacak bagian yang belum dibaca.
Strategi antrean adalah objek yang menentukan cara streaming menandakan backpressure berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran untuk setiap bagian, dan membandingkan jumlah total semua bagian dalam antrean dengan angka yang ditentukan, yang dikenal sebagai high water mark.
Potongan di dalam aliran dibaca oleh pembaca. Pembaca ini mengambil data satu bagian sekaligus, sehingga Anda dapat melakukan jenis operasi apa pun yang ingin Anda lakukan. Pembaca beserta kode pemrosesan lainnya yang menyertainya disebut konsumen.
Konstruksi berikutnya dalam konteks ini disebut pengontrol. Setiap streaming yang dapat dibaca memiliki pengontrol terkait yang, seperti namanya, memungkinkan Anda mengontrol streaming.
Hanya satu pembaca yang dapat membaca streaming dalam satu waktu; saat pembaca dibuat dan mulai membaca streaming (yaitu, menjadi pembaca aktif), pembaca akan dikunci ke streaming tersebut. Jika ingin pembaca lain mengambil alih pembacaan streaming Anda, biasanya Anda perlu merilis pembaca pertama sebelum melakukan hal lain (meskipun Anda dapat melakukan streaming).
Membuat streaming yang dapat dibaca
Anda membuat streaming yang dapat dibaca dengan memanggil konstruktornya
ReadableStream()
.
Konstruktor memiliki argumen opsional underlyingSource
, yang mewakili objek
dengan metode dan properti yang menentukan perilaku instance streaming yang dibuat.
underlyingSource
Hal ini dapat menggunakan metode opsional yang ditentukan developer berikut:
start(controller)
: Segera dipanggil saat objek dibuat. Metode ini dapat mengakses sumber aliran data, dan melakukan hal lain yang diperlukan untuk menyiapkan fungsi aliran data. Jika proses ini dilakukan secara asinkron, metode ini dapat menampilkan janji untuk menandakan keberhasilan atau kegagalan. Parametercontroller
yang diteruskan ke metode ini adalahReadableStreamDefaultController
.pull(controller)
: Dapat digunakan untuk mengontrol streaming saat lebih banyak bagian diambil. Callback ini dipanggil berulang kali selama antrean internal potongan streaming tidak penuh, hingga antrean mencapai batas air yang tinggi. Jika hasil pemanggilanpull()
adalah promise,pull()
tidak akan dipanggil lagi hingga promise tersebut terpenuhi. Jika promise ditolak, streaming akan mengalami error.cancel(reason)
: Dipanggil saat konsumen streaming membatalkan streaming.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
mendukung metode berikut:
ReadableStreamDefaultController.close()
menutup aliran data terkait.ReadableStreamDefaultController.enqueue()
mengantrekan potongan tertentu dalam aliran terkait.ReadableStreamDefaultController.error()
menyebabkan interaksi mendatang dengan aliran data terkait mengalami error.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Argumen kedua, yang juga bersifat opsional, dari konstruktor ReadableStream()
adalah queuingStrategy
.
Ini adalah objek yang secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua
parameter:
highWaterMark
: Angka non-negatif yang menunjukkan nilai maksimum aliran data menggunakan strategi antrean ini.size(chunk)
: Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai bagian yang diberikan. Hasilnya digunakan untuk menentukan backpressure, yang ditampilkan melalui propertiReadableStreamDefaultController.desiredSize
yang sesuai. Hal ini juga mengatur kapan metodepull()
sumber yang mendasarinya dipanggil.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Metode getReader()
dan read()
Untuk membaca dari aliran yang dapat dibaca, Anda memerlukan pembaca, yang akan berupa
ReadableStreamDefaultReader
.
Metode getReader()
antarmuka ReadableStream
membuat pembaca dan mengunci streaming ke
antarmuka tersebut. Saat streaming terkunci, tidak ada pembaca lain yang dapat diperoleh hingga pembaca ini dirilis.
Metode read()
antarmuka ReadableStreamDefaultReader
menampilkan promise yang memberikan akses ke bagian
berikutnya dalam antrean internal streaming. Fungsi ini memenuhi atau menolak dengan hasil yang bergantung pada status
streaming. Kemungkinan yang berbeda adalah sebagai berikut:
- Jika bagian tersedia, promise akan terpenuhi dengan objek dalam bentuk
{ value: chunk, done: false }
. - Jika streaming ditutup, promise akan terpenuhi dengan objek dalam bentuk
{ value: undefined, done: true }
. - Jika streaming mengalami error, promise akan ditolak dengan error yang relevan.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Properti locked
Anda dapat memeriksa apakah aliran data yang dapat dibaca terkunci dengan mengakses properti
ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Contoh kode streaming yang dapat dibaca
Contoh kode di bawah menunjukkan penerapan semua langkah. Pertama-tama, Anda membuat ReadableStream
yang dalam
argumen underlyingSource
-nya (yaitu class TimestampSource
) menentukan metode start()
.
Metode ini memberi tahu controller
streaming ke
enqueue()
stempel waktu setiap detik selama sepuluh detik.
Terakhir, ini memberi tahu pengontrol untuk melakukan close()
aliran. Anda menggunakan streaming
ini dengan membuat pembaca melalui metode getReader()
dan memanggil read()
hingga streaming
done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iterasi asinkron
Memeriksa setiap iterasi loop read()
jika streamingnya adalah done
mungkin bukan API yang paling praktis.
Untungnya, akan segera ada cara yang lebih baik untuk melakukannya: iterasi asinkron.
for await (const chunk of stream) {
console.log(chunk);
}
Solusi untuk menggunakan iterasi asinkron saat ini adalah dengan menerapkan perilaku dengan polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Membuat streaming yang dapat dibaca
Metode tee()
dari
antarmuka ReadableStream
menghubungkan aliran yang dapat dibaca saat ini, yang menampilkan array dua elemen
yang berisi dua cabang yang dihasilkan sebagai instance ReadableStream
baru. Hal ini memungkinkan
dua pembaca membaca streaming secara bersamaan. Anda dapat melakukannya, misalnya, di pekerja layanan jika ingin mengambil respons dari server dan men-streamingnya ke browser, sekaligus melakukan streaming ke cache pekerja layanan. Karena isi respons tidak dapat digunakan lebih dari sekali, Anda memerlukan dua salinan
untuk melakukannya. Untuk membatalkan streaming, Anda harus membatalkan kedua cabang yang dihasilkan. Memulai streaming
umumnya akan menguncinya selama durasi, sehingga mencegah pembaca lain menguncinya.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Aliran byte yang dapat dibaca
Untuk streaming yang mewakili byte, versi tambahan dari streaming yang dapat dibaca akan disediakan untuk menangani byte secara efisien, khususnya dengan meminimalkan salinan. Byte stream memungkinkan pembaca buffer bawa sendiri (BYOB) diperoleh. Implementasi default dapat memberikan berbagai output seperti string atau buffer array dalam kasus WebSocket, sedangkan byte stream menjamin output byte. Selain itu, pembaca BYOB memiliki manfaat stabilitas. Hal ini karena jika buffer dilepas, buffer tersebut dapat menjamin bahwa buffer tidak ditulis dua kali, sehingga menghindari kondisi race. Pembaca BYOB dapat mengurangi frekuensi browser perlu menjalankan pembersihan sampah memori, karena dapat menggunakan kembali buffering.
Membuat aliran byte yang dapat dibaca
Anda dapat membuat aliran byte yang dapat dibaca dengan meneruskan parameter type
tambahan ke
konstruktor ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
Sumber pokok aliran byte yang dapat dibaca diberi ReadableByteStreamController
untuk
dimanipulasi. Metode ReadableByteStreamController.enqueue()
-nya menggunakan argumen chunk
yang nilainya
merupakan ArrayBufferView
. Properti ReadableByteStreamController.byobRequest
menampilkan permintaan pull BYOB
saat ini, atau null jika tidak ada. Terakhir, properti ReadableByteStreamController.desiredSize
menampilkan ukuran yang diinginkan untuk mengisi antrean internal streaming yang dikontrol.
queuingStrategy
Argumen kedua, yang juga opsional, dari konstruktor ReadableStream()
adalah queuingStrategy
.
Ini adalah objek yang secara opsional menentukan strategi antrean untuk streaming, yang menggunakan satu
parameter:
highWaterMark
: Jumlah byte non-negatif yang menunjukkan tanda air yang tinggi pada streaming menggunakan strategi antrean ini. Ini digunakan untuk menentukan backpressure, yang dimanifes melalui propertiReadableByteStreamController.desiredSize
yang sesuai. Ini juga mengatur kapan metodepull()
sumber yang mendasarinya dipanggil.
Metode getReader()
dan read()
Anda kemudian bisa mendapatkan akses ke ReadableStreamBYOBReader
dengan menetapkan parameter mode
yang sesuai:
ReadableStream.getReader({ mode: "byob" })
. Hal ini memungkinkan kontrol yang lebih akurat atas alokasi buffer
untuk menghindari salinan. Untuk membaca dari aliran byte, Anda perlu memanggil
ReadableStreamBYOBReader.read(view)
, dengan view
adalah
ArrayBufferView
.
Contoh kode byte stream yang dapat dibaca
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Fungsi berikut menampilkan aliran byte yang dapat dibaca, yang memungkinkan pembacaan nol salinan yang efisien dari array yang dihasilkan secara acak. Alih-alih menggunakan ukuran bagian yang telah ditentukan sebelumnya sebesar 1.024, buffer ini mencoba mengisi buffer yang disediakan developer, sehingga memungkinkan kontrol penuh.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Mekanisme streaming yang dapat ditulis
Aliran data yang dapat ditulis adalah tujuan tempat Anda dapat menulis data, yang direpresentasikan dalam JavaScript oleh objek
WritableStream
. Hal ini
berfungsi sebagai abstraksi di atas sink yang mendasari—sink I/O tingkat rendah tempat
data mentah ditulis.
Data ditulis ke aliran melalui penulis, satu bagian dalam satu waktu. Sebuah potongan dapat memiliki banyak bentuk, seperti halnya potongan pada pembaca. Anda dapat menggunakan kode apa pun yang Anda sukai untuk menghasilkan bagian yang siap ditulis; penulis ditambah kode terkait disebut produser.
Saat penulis dibuat dan mulai menulis ke streaming (penulis aktif), penulis tersebut dianggap terkunci ke streaming. Hanya satu penulis yang dapat menulis ke stream yang dapat ditulis dalam satu waktu. Jika ingin penulis lain mulai menulis ke streaming, Anda biasanya harus merilisnya, sebelum melampirkan penulis lain ke streaming.
Antrean internal melacak bagian yang telah ditulis ke aliran data, tetapi belum diproses oleh sink yang mendasarinya.
Strategi antrean adalah objek yang menentukan cara streaming harus memberikan sinyal backpressure berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran untuk setiap potongan, dan membandingkan total ukuran semua potongan dalam antrean dengan angka yang ditentukan, yang dikenal sebagai tanda air tinggi.
Konstruksi akhir disebut pengontrol. Setiap aliran data yang dapat ditulis memiliki pengontrol terkait yang memungkinkan Anda mengontrol aliran data (misalnya, untuk membatalkannya).
Membuat streaming yang dapat ditulis
Antarmuka WritableStream
Streams API menyediakan abstraksi standar untuk menulis data streaming ke tujuan, yang dikenal
sebagai sink. Objek ini dilengkapi dengan backpressure dan antrean bawaan. Anda membuat aliran yang dapat ditulis dengan
memanggil konstruktornya
WritableStream()
.
Class ini memiliki parameter underlyingSink
opsional, yang mewakili sebuah objek
dengan metode dan properti yang menentukan perilaku dari instance aliran data yang telah dibuat.
underlyingSink
underlyingSink
dapat menyertakan metode opsional yang ditentukan developer berikut. Parameter controller
yang diteruskan ke beberapa metode adalah
WritableStreamDefaultController
.
start(controller)
: Metode ini segera dipanggil saat objek dibuat. Konten metode ini harus bertujuan untuk mendapatkan akses ke sink yang mendasarinya. Jika dilakukan secara asinkron, proses ini dapat menampilkan janji untuk menandakan keberhasilan atau kegagalan.write(chunk, controller)
: Metode ini akan dipanggil jika potongan data baru (ditentukan dalam parameterchunk
) siap ditulis ke sink yang mendasarinya. Fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan operasi tulis. Metode ini hanya akan dipanggil setelah penulisan sebelumnya berhasil, dan tidak pernah setelah streaming ditutup atau dibatalkan.close(controller)
: Metode ini akan dipanggil jika aplikasi memberi sinyal bahwa aplikasi telah selesai menulis bagian ke aliran data. Konten harus melakukan apa pun yang diperlukan untuk menyelesaikan penulisan ke sink yang mendasarinya, dan melepaskan akses ke sink tersebut. Jika proses ini asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Metode ini hanya akan dipanggil setelah semua operasi tulis dalam antrean berhasil.abort(reason)
: Metode ini akan dipanggil jika aplikasi memberikan sinyal bahwa aplikasi ingin menutup aliran data secara tiba-tiba dan menempatkannya dalam status error. Fungsi ini dapat membersihkan resource yang disimpan, seperticlose()
, tetapiabort()
akan dipanggil meskipun operasi tulis diantre. Potongan tersebut akan dibuang. Jika proses ini asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parameterreason
berisiDOMString
yang menjelaskan alasan streaming dibatalkan.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Antarmuka
WritableStreamDefaultController
Streams API mewakili pengontrol yang memungkinkan kontrol status WritableStream
selama penyiapan, karena lebih banyak bagian dikirim untuk ditulis, atau di akhir penulisan. Saat membuat
WritableStream
, sink yang mendasarinya diberi instance WritableStreamDefaultController
yang sesuai untuk dimanipulasi. WritableStreamDefaultController
hanya memiliki satu metode:
WritableStreamDefaultController.error()
,
yang menyebabkan interaksi mendatang dengan aliran data terkait mengalami error.
WritableStreamDefaultController
juga mendukung properti signal
yang menampilkan instance
AbortSignal
,
sehingga operasi WritableStream
dapat dihentikan jika diperlukan.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Argumen kedua, yang juga bersifat opsional, dari konstruktor WritableStream()
adalah queuingStrategy
.
Ini adalah objek yang secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua
parameter:
highWaterMark
: Angka non-negatif yang menunjukkan nilai maksimum aliran data menggunakan strategi antrean ini.size(chunk)
: Fungsi yang menghitung dan menampilkan ukuran non-negatif yang terbatas dari nilai bagian yang diberikan. Hasilnya digunakan untuk menentukan backpressure, yang ditampilkan melalui propertiWritableStreamDefaultWriter.desiredSize
yang sesuai.
Metode getWriter()
dan write()
Untuk menulis ke aliran yang dapat ditulis, Anda memerlukan penulis, yang akan menjadi
WritableStreamDefaultWriter
. Metode getWriter()
antarmuka WritableStream
menampilkan
instance WritableStreamDefaultWriter
baru dan mengunci streaming ke instance tersebut. Saat
aliran terkunci, tidak ada penulis lain yang dapat diperoleh hingga penulis saat ini dirilis.
Metode write()
antarmuka
WritableStreamDefaultWriter
menulis bagian data yang diteruskan ke WritableStream
dan sink yang mendasarinya, lalu menampilkan
promise yang diselesaikan untuk menunjukkan keberhasilan atau kegagalan operasi tulis. Perhatikan bahwa arti
"sukses" bergantung pada sink yang mendasarinya; hal ini mungkin menunjukkan bahwa bagian telah diterima,
dan tidak selalu disimpan dengan aman ke tujuan akhirnya.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Properti locked
Anda dapat memeriksa apakah streaming yang dapat ditulis dikunci dengan mengakses
properti
WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Contoh kode streaming yang dapat ditulis
Contoh kode di bawah menunjukkan semua langkah yang sedang berjalan.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Mengalirkan streaming yang dapat dibaca ke streaming yang dapat ditulis
Stream yang dapat dibaca dapat disalurkan ke stream yang dapat ditulis melalui metode
pipeTo()
stream yang dapat dibaca.
ReadableStream.pipeTo()
menyalurkan ReadableStream
saat ini ke WritableStream
tertentu dan menampilkan promise yang terpenuhi saat proses pemipaan berhasil diselesaikan, atau menolak jika terjadi error.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Membuat streaming transformasi
Antarmuka TransformStream
Streams API mewakili sekumpulan data yang dapat ditransformasi. Anda
membuat aliran transformasi dengan memanggil TransformStream()
konstruktornya, yang membuat dan menampilkan
objek aliran transformasi dari pengendali tertentu. Konstruktor TransformStream()
menerima sebagai argumen pertamanya objek JavaScript opsional yang mewakili transformer
. Objek tersebut dapat
berisi salah satu metode berikut:
transformer
start(controller)
: Metode ini langsung dipanggil saat objek dibuat. Biasanya, hal ini digunakan untuk mengantrekan potongan awalan, menggunakancontroller.enqueue()
. Potongan tersebut akan dibaca dari sisi yang dapat dibaca, tetapi tidak bergantung pada penulisan apa pun ke sisi yang dapat ditulis. Jika proses awal ini bersifat asinkron, misalnya karena perlu beberapa upaya untuk mendapatkan potongan awalan, fungsi dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan; promise yang ditolak akan membuat error pada streaming. Setiap pengecualian yang ditampilkan akan ditampilkan kembali oleh konstruktorTransformStream()
.transform(chunk, controller)
: Metode ini dipanggil saat bagian baru yang awalnya ditulis ke sisi yang dapat ditulis siap diubah. Implementasi streaming menjamin bahwa fungsi ini hanya akan dipanggil setelah transformasi sebelumnya berhasil, dan tidak pernah sebelumstart()
selesai atau setelahflush()
dipanggil. Fungsi ini melakukan pekerjaan transformasi yang sebenarnya dari aliran transformasi. Fungsi ini dapat mengantrekan hasil menggunakancontroller.enqueue()
. Hal ini memungkinkan satu bagian yang ditulis ke sisi yang dapat ditulis menghasilkan nol atau beberapa bagian di sisi yang dapat dibaca, bergantung pada frekuensicontroller.enqueue()
dipanggil. Jika proses transformasi bersifat asinkron, fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan transformasi. Promise yang ditolak akan menampilkan error pada sisi streaming transformasi yang dapat dibaca dan ditulis. Jika tidak ada metodetransform()
yang disediakan, transformasi identitas akan digunakan, yang menambahkan chunk ke antrean tanpa perubahan dari sisi yang dapat ditulis ke sisi yang dapat dibaca.flush(controller)
: Metode ini dipanggil setelah semua potongan yang ditulis ke sisi yang dapat ditulis telah ditransformasi dengan berhasil melewatitransform()
, dan sisi yang dapat ditulis akan ditutup. Biasanya, ini digunakan untuk mengantrekan potongan akhiran ke sisi yang dapat dibaca, sebelum ditutup juga. Jika proses penghapusan data bersifat asinkron, fungsi dapat menampilkan promise untuk memberi sinyal keberhasilan atau kegagalan; hasilnya akan disampaikan kepada pemanggilstream.writable.write()
. Selain itu, promise yang ditolak akan menimbulkan error pada sisi streaming yang dapat dibaca dan dapat ditulis. Menampilkan pengecualian diperlakukan sama seperti menampilkan promise yang ditolak.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Strategi antrean writableStrategy
dan readableStrategy
Parameter opsional kedua dan ketiga dari konstruktor TransformStream()
adalah strategi antrean writableStrategy
dan readableStrategy
opsional. Keduanya ditentukan seperti yang diuraikan di bagian streaming
dapat dibaca dan dapat ditulis.
Contoh kode streaming transformasi
Contoh kode berikut menunjukkan cara kerja streaming transformasi sederhana.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Mengalirkan streaming yang dapat dibaca melalui streaming transformasi
Metode pipeThrough()
antarmuka ReadableStream
menyediakan cara yang dapat dirantai untuk menyalurkan streaming saat ini
melalui streaming transformasi atau pasangan lain yang dapat ditulis/dibaca. Pemipaan aliran data biasanya akan menguncinya selama durasi pemipaan, sehingga mencegah pembaca lain menguncinya.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Contoh kode berikutnya (agak dibuat-buat) menunjukkan cara menerapkan versi fetch()
"berteriak"
yang menggunakan huruf besar untuk semua teks dengan menggunakan promise respons yang ditampilkan
sebagai streaming
dan menggunakan huruf besar untuk setiap bagian. Keuntungan dari pendekatan ini adalah Anda tidak perlu menunggu
seluruh dokumen didownload, yang dapat membuat perbedaan besar saat menangani file berukuran besar.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
Demo di bawah menunjukkan cara kerja streaming yang dapat dibaca, ditulis, dan diubah. Contoh ini juga mencakup contoh
rantai pipa pipeThrough()
dan pipeTo()
, serta menunjukkan tee()
. Anda dapat menjalankan
demo di jendelanya sendiri atau melihat
kode sumber.
Streaming yang berguna tersedia di browser
Ada sejumlah aliran data berguna yang terintegrasi langsung ke dalam browser. Anda dapat dengan mudah membuat
ReadableStream
dari blob. Metode stream() antarmuka
Blob
menampilkan
ReadableStream
yang setelah dibaca akan menampilkan data yang terdapat dalam blob. Ingat juga bahwa objek
File
adalah jenis khusus dari
Blob
, dan dapat digunakan dalam konteks apa pun yang dapat digunakan blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Varian streaming TextDecoder.decode()
dan TextEncoder.encode()
masing-masing disebut
TextDecoderStream
dan
TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Mengompresi atau mendekompresi file menjadi mudah dengan
aliran transformasi
CompressionStream
dan
DecompressionStream
. Contoh kode di bawah menunjukkan cara mendownload spesifikasi Streams, mengompresi (gzip)
langsung di browser, dan menulis file yang dikompresi langsung ke disk.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
FileSystemWritableFileStream
dan aliran permintaan fetch()
eksperimental File System Access API
adalah
contoh streaming yang dapat ditulis pada umumnya.
Serial API banyak menggunakan streaming yang dapat dibaca dan ditulis.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Terakhir, WebSocketStream
API mengintegrasikan streaming dengan WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Referensi yang berguna
- Spesifikasi aliran data
- Demo pengiring
- Polis streaming
- 2016—tahun aliran data web
- Iterator dan generator asinkron
- Visualizer Aliran Data
Ucapan terima kasih
Artikel ini ditinjau oleh Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley, dan Adam Rice. Postingan blog Jake Archibald sangat membantu saya dalam memahami aliran data. Beberapa contoh kode terinspirasi oleh eksplorasi pengguna GitHub @bellbind dan bagian prosa yang sangat bergantung pada MDN Web Docs on Streams. Penulis Streams Standard telah melakukan pekerjaan yang luar biasa dalam menulis spesifikasi ini. Banner besar oleh Ryan Lara di Unsplash.