ดูวิธีใช้สตรีมที่อ่านได้ เขียนได้ และเปลี่ยนรูปแบบด้วย Streams API
Streams API ช่วยให้คุณเข้าถึงสตรีมข้อมูลที่รับผ่านเครือข่ายหรือสร้างขึ้นด้วยวิธีใดก็ได้ในเครื่องแบบเป็นโปรแกรม และประมวลผลด้วย JavaScript สตรีมมิงเกี่ยวข้องกับการแบ่งทรัพยากรที่คุณต้องการรับ ส่ง หรือเปลี่ยนรูปแบบเป็นกลุ่มเล็กๆ แล้วประมวลผลกลุ่มเหล่านี้ทีละส่วน แม้ว่าการสตรีมเป็นสิ่งที่เบราว์เซอร์ทำอยู่แล้วเมื่อได้รับชิ้นงาน เช่น HTML หรือวิดีโอเพื่อแสดงในหน้าเว็บ แต่ JavaScript ไม่เคยมีความสามารถนี้มาก่อนจนกระทั่งมีการเปิดตัว fetch
ที่มีสตรีมในปี 2015
ก่อนหน้านี้ หากต้องการประมวลผลทรัพยากรบางประเภท (ไม่ว่าจะเป็นวิดีโอหรือไฟล์ข้อความ ฯลฯ) คุณจะต้องดาวน์โหลดไฟล์ทั้งไฟล์ รอให้ไฟล์ได้รับการแปลงเป็นรูปแบบที่เหมาะสม แล้วจึงประมวลผล เมื่อสตรีมพร้อมใช้งานสำหรับ JavaScript ทุกอย่างก็เปลี่ยนไป ตอนนี้คุณสามารถประมวลผลข้อมูลดิบด้วย JavaScript แบบเป็นขั้นเป็นตอนได้ทันทีที่ข้อมูลพร้อมใช้งานบนไคลเอ็นต์ โดยไม่ต้องสร้างบัฟเฟอร์ สตรีง หรือบล็อก ซึ่งจะเปิดโอกาสให้ใช้ Use Case หลายรายการดังที่ระบุไว้ด้านล่าง
- เอฟเฟกต์วิดีโอ: การส่งผ่านสตรีมวิดีโอที่อ่านได้ผ่านสตรีมการเปลี่ยนรูปแบบที่ใช้เอฟเฟกต์แบบเรียลไทม์
- การบีบอัด (และเลิก) ข้อมูล: การส่งผ่านสตรีมไฟล์ผ่านสตรีมการเปลี่ยนรูปแบบที่ (และเลิก) บีบอัดข้อมูลโดยเลือก
- การถอดรหัสรูปภาพ: การส่งผ่านสตรีมการตอบกลับ HTTP ผ่านสตรีมการเปลี่ยนรูปแบบซึ่งถอดรหัสไบต์เป็นข้อมูลบิตแมป จากนั้นส่งผ่านสตรีมการเปลี่ยนรูปแบบอีกรายการหนึ่งซึ่งแปลบิตแมปเป็น PNG หากติดตั้งภายในตัวแฮนเดิล
fetch
ของ Service Worker จะช่วยให้คุณโพลีฟีลรูปแบบรูปภาพใหม่อย่าง AVIF ได้อย่างชัดเจน
การสนับสนุนเบราว์เซอร์
ReadableStream และ WritableStream
TransformStream
แนวคิดหลัก
ก่อนจะลงรายละเอียดเกี่ยวกับสตรีมประเภทต่างๆ เราขอแนะนำแนวคิดหลักๆ สักเล็กน้อย
ชิ้นส่วน
ข้อมูลเป็นข้อมูลชิ้นเดียวที่เขียนไปยังหรืออ่านจากสตรีม โดยอาจเป็นข้อมูลประเภทใดก็ได้ สตรีมอาจมีข้อมูลหลายประเภทรวมอยู่ด้วย ส่วนใหญ่แล้ว ข้อมูลส่วนนี้จะไม่ใช่หน่วยข้อมูลย่อยที่สุดสําหรับสตรีมหนึ่งๆ เช่น สตรีมไบต์อาจมีกลุ่มที่ประกอบด้วยหน่วย 16 กิไบต์ Uint8Array
แทนที่จะเป็นไบต์เดี่ยว
สตรีมที่อ่านได้
สตรีมที่อ่านได้แสดงแหล่งข้อมูลที่คุณอ่านได้ กล่าวคือ ข้อมูลมาจากสตรีมที่อ่านได้ กล่าวอย่างเป็นรูปธรรมคือ สตรีมที่อ่านได้คืออินสแตนซ์ของReadableStream
คลาส
สตรีมที่เขียนได้
สตรีมที่เขียนได้แสดงปลายทางสำหรับข้อมูลที่คุณสามารถเขียนได้ กล่าวคือ ข้อมูลจะเข้าไปในสตรีมที่เขียนได้ กล่าวอย่างเจาะจงคือ สตรีมแบบเขียนได้คืออินสแตนซ์ของคลาส WritableStream
เปลี่ยนรูปแบบสตรีม
สตรีมการเปลี่ยนรูปแบบประกอบด้วยสตรีม 2 รายการ ได้แก่ สตรีมที่เขียนได้ ซึ่งเรียกว่าฝั่งที่เขียนได้ และสตรีมที่อ่านได้ ซึ่งเรียกว่าฝั่งที่อ่านได้
อุปมาชีวิตจริงสำหรับฟีเจอร์นี้ก็คือล่ามแบบแปลพร้อมกันที่แปลจากภาษาหนึ่งเป็นภาษาอื่นขณะพูด
ในกรณีที่เฉพาะสตรีมการเปลี่ยนรูปแบบ การเขียนข้อมูลไปยังฝั่งที่เขียนได้จะทำให้ข้อมูลใหม่พร้อมใช้งานสำหรับการอ่านจากฝั่งที่อ่านได้ กล่าวโดยละเอียดคือ ออบเจ็กต์ใดก็ตามที่มีพร็อพเพอร์ตี้ writable
และพร็อพเพอร์ตี้ readable
สามารถใช้เป็นสตรีมการเปลี่ยนรูปแบบได้ อย่างไรก็ตาม คลาส TransformStream
มาตรฐานช่วยให้สร้างคู่ดังกล่าวที่พันกันได้อย่างถูกต้องได้ง่ายขึ้น
โซ่สำหรับท่อ
สตรีมส่วนใหญ่จะใช้เพื่อส่งผ่านสตรีมไปยังสตรีมอื่น สตรีมที่อ่านได้สามารถส่งผ่านไปยังสตรีมที่เขียนได้โดยตรงโดยใช้เมธอด pipeTo()
ของสตรีมที่อ่านได้ หรือจะส่งผ่านสตรีมการเปลี่ยนรูปแบบอย่างน้อย 1 รายการก่อนก็ได้โดยใช้เมธอด pipeThrough()
ของสตรีมที่อ่านได้ ชุดสตรีมที่มีการต่อท่อเข้าด้วยกันในลักษณะนี้เรียกว่าเชนไปป์
แรงดันย้อนกลับ
เมื่อสร้างเชนไปป์แล้ว เชนจะส่งสัญญาณเกี่ยวกับความเร็วที่ควรส่งผ่านข้อมูลผ่านเชน หากขั้นตอนใดในเชนยังไม่สามารถรับข้อมูลโค้ดได้ ระบบจะส่งสัญญาณย้อนกลับผ่านเชนไปเรื่อยๆ จนกว่าแหล่งที่มาเดิมจะได้รับแจ้งให้หยุดสร้างข้อมูลโค้ดอย่างรวดเร็ว กระบวนการปรับกระแสให้เป็นไปตามปกตินี้เรียกว่าแรงดันย้อนกลับ
การทำที
สตรีมที่อ่านได้สามารถแยกออกเป็น 2 ท่อ (ตั้งชื่อตามรูปร่างของ "T" ตัวพิมพ์ใหญ่) โดยใช้เมธอด tee()
ซึ่งจะล็อกสตรีม นั่นคือทำให้ใช้สตรีมโดยตรงไม่ได้อีกต่อไป แต่ระบบจะสร้างสตรีมใหม่ 2 รายการที่เรียกว่าสาขา ซึ่งจะใช้แยกกันได้
การเริ่มสตรีมก็สำคัญเช่นกันเนื่องจากสตรีมจะกรอกลับหรือเริ่มใหม่ไม่ได้ เราจะอธิบายเรื่องนี้เพิ่มเติมในภายหลัง
กลไกของสตรีมที่อ่านได้
สตรีมที่อ่านได้คือแหล่งข้อมูลที่แสดงใน JavaScript โดยออบเจ็กต์ ReadableStream
ที่มาจากแหล่งที่มาพื้นฐาน ตัวสร้างของ
ReadableStream()
จะสร้างและแสดงผลออบเจ็กต์สตรีมที่อ่านได้จากตัวแฮนเดิลที่ระบุ แหล่งที่มาพื้นฐานมี 2 ประเภท ได้แก่
- แหล่งข้อมูล Push จะส่งข้อมูลให้คุณอย่างต่อเนื่องเมื่อคุณเข้าถึงแหล่งข้อมูลดังกล่าว และคุณเป็นผู้เลือกว่าจะเริ่ม หยุดชั่วคราว หรือยกเลิกการเข้าถึงสตรีม เช่น สตรีมวิดีโอสด เหตุการณ์ที่เซิร์ฟเวอร์ส่ง หรือ WebSocket
- แหล่งที่มาแบบพุลกำหนดให้คุณต้องขอข้อมูลจากแหล่งที่มาอย่างชัดเจนเมื่อเชื่อมต่อแล้ว ตัวอย่าง ได้แก่ การดำเนินการ HTTP ผ่านการเรียก
fetch()
หรือXMLHttpRequest
ระบบจะอ่านข้อมูลสตรีมตามลําดับเป็นชิ้นเล็กๆ ที่เรียกว่าข้อมูลส่วน ข้อมูลโค้ดที่วางในสตรีมจะเรียกว่าจัดคิว ซึ่งหมายความว่าคำสั่งซื้อดังกล่าวกำลังรออยู่ในคิวพร้อมที่จะได้รับการอ่าน คิวภายในจะติดตามข้อมูลส่วนที่ยังไม่ได้อ่าน
กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กําหนดวิธีที่สตรีมควรส่งสัญญาณแรงดันย้อนกลับตามสถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละกลุ่ม และเปรียบเทียบขนาดทั้งหมดของกลุ่มทั้งหมดในคิวกับจำนวนที่ระบุ ซึ่งเรียกว่าจำนวนสูงสุด
โปรแกรมอ่านจะอ่านข้อมูลในสตริง เครื่องอ่านนี้จะดึงข้อมูลทีละกลุ่ม ซึ่งช่วยให้คุณดำเนินการใดก็ได้กับข้อมูล เครื่องอ่านและรหัสการประมวลผลอื่นๆ ที่มาพร้อมกับเครื่องอ่านเรียกว่าผู้บริโภค
คอนสตรัคต์ถัดไปในบริบทนี้เรียกว่า คอนโทรลเลอร์ สตรีมที่อ่านได้แต่ละรายการจะมีตัวควบคุมที่เชื่อมโยงกัน ซึ่งช่วยให้คุณควบคุมสตรีมได้ดังชื่อที่ระบุ
มีเพียงผู้อ่านเพียงคนเดียวเท่านั้นที่อ่านสตรีมได้พร้อมกัน เมื่อสร้างผู้อ่านและเริ่มอ่านสตรีม (นั่นคือ กลายเป็นผู้อ่านที่ใช้งานอยู่) ระบบจะล็อกผู้อ่านดังกล่าวไว้กับสตรีมนั้น หากต้องการให้ผู้อ่านคนอื่นอ่านสตรีมต่อ โดยทั่วไปคุณต้องปล่อยผู้อ่านคนแรกก่อนดำเนินการอื่นๆ (แต่คุณส่งต่อสตรีมได้)
การสร้างสตรีมที่อ่านได้
คุณสร้างสตรีมที่อ่านได้โดยการเรียกคอนสตรัคเตอร์ของมัน
ReadableStream()
ตัวสร้างคอนสตรัคเตอร์มีพารามิเตอร์ underlyingSource
ที่ไม่บังคับ ซึ่งแสดงถึงออบเจ็กต์ที่มีเมธอดและพร็อพเพอร์ตี้ซึ่งกำหนดลักษณะการทํางานของอินสแตนซ์สตรีมที่สร้างขึ้น
underlyingSource
ซึ่งจะใช้วิธีการต่อไปนี้ที่นักพัฒนาแอปกำหนดไว้ (ไม่บังคับ)
start(controller)
: เรียกใช้ทันทีเมื่อมีการสร้างออบเจ็กต์ วิธีการนี้สามารถเข้าถึงแหล่งที่มาของสตรีมและทำสิ่งอื่นๆ ที่จำเป็นในการตั้งค่าฟังก์ชันการทำงานของสตรีม หากต้องดำเนินการนี้แบบไม่พร้อมกัน เมธอดจะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือไม่สำเร็จ พารามิเตอร์controller
ที่ส่งไปยังเมธอดนี้คือ aReadableStreamDefaultController
pull(controller)
: ใช้เพื่อควบคุมสตรีมเมื่อมีการดึงข้อมูลกลุ่มเพิ่มเติม ระบบจะเรียกใช้ซ้ำๆ ตราบใดที่คิวภายในของกลุ่มของข้อมูลสตรีมยังไม่เต็ม จนกว่าคิวจะถึงจุดสูงสุด หากผลลัพธ์ของการเรียกpull()
เป็นสัญญา ระบบจะไม่เรียกpull()
อีกจนกว่าสัญญาดังกล่าวจะสำเร็จ หาก Promise ปฏิเสธ สตรีมจะแสดงข้อผิดพลาดcancel(reason)
: เรียกใช้เมื่อผู้บริโภคสตรีมยกเลิกสตรีม
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
รองรับวิธีการต่อไปนี้
ReadableStreamDefaultController.close()
ปิดสตรีมที่เกี่ยวข้องReadableStreamDefaultController.enqueue()
จัดคิวข้อมูลส่วนหนึ่งที่ระบุในสตรีมที่เกี่ยวข้องReadableStreamDefaultController.error()
ทําให้การทำงานกับสตรีมที่เกี่ยวข้องในอนาคตเกิดข้อผิดพลาด
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
อาร์กิวเมนต์ที่ 2 ของคอนสตรัคเตอร์ ReadableStream()
ซึ่งไม่บังคับเช่นกันคือ queuingStrategy
ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสําหรับสตรีม (ไม่บังคับ) ซึ่งใช้พารามิเตอร์ 2 รายการดังนี้
highWaterMark
: ตัวเลขที่ไม่ใช่ค่าลบซึ่งระบุจำนวนสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้size(chunk)
: ฟังก์ชันที่คำนวณและแสดงผลขนาดที่ไม่เป็นลบแบบจำกัดของค่าข้อมูลโค้ดที่ระบุ ระบบจะใช้ผลลัพธ์นี้เพื่อกำหนดแรงดันย้อนกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ReadableStreamDefaultController.desiredSize
ที่เหมาะสม และยังควบคุมเมื่อมีการคําเรียกเมธอดpull()
ของแหล่งที่มาพื้นฐานด้วย
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
getReader()
และread()
หากต้องการอ่านจากสตรีมที่อ่านได้ คุณต้องมีโปรแกรมอ่าน ซึ่งจะเป็น ReadableStreamDefaultReader
เมธอด getReader()
ของอินเทอร์เฟซ ReadableStream
จะสร้างโปรแกรมอ่านและล็อกสตรีมไว้กับโปรแกรมอ่านนั้น ขณะที่สตรีมล็อกอยู่ คุณจะไม่สามารถรับสิทธิ์อ่านจากเครื่องอ่านเครื่องอื่นได้จนกว่าจะปล่อยเครื่องอ่านนี้
เมธอด read()
ของอินเทอร์เฟซ ReadableStreamDefaultReader
จะแสดงผลพรอมต์ที่ให้สิทธิ์เข้าถึงข้อมูลส่วนถัดไปในคิวภายในของสตรีม โดยจะยอมรับหรือปฏิเสธโดยขึ้นอยู่กับสถานะของสตรีม โอกาสต่างๆ มีดังนี้
- หากมีข้อมูลโค้ดพร้อมใช้งาน ระบบจะดำเนินการตามสัญญาด้วยออบเจ็กต์ของรูปแบบ
{ value: chunk, done: false }
- หากสตรีมปิดอยู่ ระบบจะดำเนินการตามสัญญาด้วยออบเจ็กต์ของรูปแบบ
{ value: undefined, done: true }
- หากสตรีมเกิดข้อผิดพลาด ระบบจะปฏิเสธการสัญญาพร้อมข้อผิดพลาดที่เกี่ยวข้อง
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
พร็อพเพอร์ตี้ locked
คุณสามารถตรวจสอบว่าสตรีมที่อ่านได้ถูกล็อกหรือไม่โดยไปที่พร็อพเพอร์ตี้ของ
ReadableStream.locked
สตรีมนั้น
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
ตัวอย่างโค้ดสตรีมที่อ่านได้
โค้ดตัวอย่างด้านล่างแสดงขั้นตอนทั้งหมดที่ใช้งาน ก่อนอื่นให้สร้าง ReadableStream
ที่กําหนดวิธีการ start()
ในอาร์กิวเมนต์ underlyingSource
(นั่นคือคลาส TimestampSource
)
วิธีนี้จะบอก controller
ของสตรีมให้ประทับเวลาทุกวินาทีในช่วง 10 วินาทีenqueue()
สุดท้าย อุปกรณ์จะบอกให้ตัวควบคุมclose()
สตรีม คุณใช้สตรีมนี้ได้โดยสร้างโปรแกรมอ่านผ่านเมธอด getReader()
และเรียกใช้ read()
จนกว่าสตรีมจะdone
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
การทำซ้ำแบบอะซิงโครนัส
การตรวจสอบว่าสตรีมเป็น done
หรือไม่ในแต่ละรอบของ read()
อาจไม่ใช่ API ที่สะดวกที่สุด
แต่โชคดีที่จะมีวิธีที่ดีกว่านี้ในเร็วๆ นี้ ซึ่งก็คือการทำซ้ำแบบไม่พร้อมกัน
for await (const chunk of stream) {
console.log(chunk);
}
การวนซ้ำแบบไม่พร้อมกัน
วิธีแก้ปัญหาชั่วคราวในการใช้การทำซ้ำแบบอะซิงโครนัสในปัจจุบันคือการใช้ลักษณะการทำงานด้วย polyfill
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
การสร้างสตรีมที่อ่านได้
เมธอด tee()
ของอินเทอร์เฟซ ReadableStream
จะแยกสตรีมที่อ่านได้ปัจจุบันออก โดยจะแสดงผลอาร์เรย์ 2 องค์ประกอบที่มีสาขาที่เป็นผลลัพธ์ 2 รายการเป็นอินสแตนซ์ ReadableStream
ใหม่ ซึ่งจะช่วยให้ผู้อ่าน 2 คนอ่านสตรีมพร้อมกันได้ คุณอาจทำเช่นนี้ใน Service Worker ในกรณีที่ต้องการดึงข้อมูลการตอบกลับจากเซิร์ฟเวอร์และสตรีมไปยังเบราว์เซอร์ รวมถึงสตรีมไปยังแคช Service Worker ด้วย เนื่องจากใช้เนื้อหาการตอบกลับได้เพียงครั้งเดียว คุณจึงต้องใช้สำเนา 2 รายการเพื่อทำเช่นนี้ หากต้องการยกเลิกสตรีม คุณต้องยกเลิกทั้ง 2 สาขาที่เกิดขึ้น โดยทั่วไปแล้ว การเปิดใช้สตรีมจะล็อกสตรีมนั้นไว้ตลอดระยะเวลาที่เปิดใช้ ซึ่งจะป้องกันไม่ให้ผู้อ่านรายอื่นล็อกสตรีมได้
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
สตรีมไบต์ที่อ่านได้
สําหรับสตรีมที่แสดงไบต์ ระบบจะจัดเตรียมสตรีมที่อ่านได้เวอร์ชันขยายเพื่อจัดการไบต์อย่างมีประสิทธิภาพ โดยเฉพาะอย่างยิ่งด้วยการลดการคัดลอก สตรีมไบต์ช่วยให้สามารถรับผู้อ่านแบบนำบัฟเฟอร์ของคุณเองมา (BYOB) การใช้งานเริ่มต้นจะให้เอาต์พุตที่หลากหลาย เช่น สตริงหรือบัฟเฟอร์อาร์เรย์ในกรณีของ WebSockets ส่วนสตรีมไบต์จะรับประกันเอาต์พุตไบต์ นอกจากนี้ ผู้อ่าน BYOB ยังได้รับสิทธิประโยชน์ด้านความเสถียรด้วย เนื่องจากหากบัฟเฟอร์แยกออก การดำเนินการนี้จะรับประกันว่าไม่มีการเขียนลงในบัฟเฟอร์เดียวกัน 2 ครั้ง จึงหลีกเลี่ยงเงื่อนไขการแข่งขันได้ เครื่องอ่าน BYOB สามารถลดจำนวนครั้งที่เบราว์เซอร์ต้องเรียกใช้การเก็บขยะได้ เนื่องจากสามารถนําบัฟเฟอร์มาใช้ซ้ำได้
การสร้างสตรีมไบต์ที่อ่านได้
คุณสามารถสร้างสตรีมไบต์ที่อ่านได้โดยการส่งพารามิเตอร์ type
เพิ่มเติมไปยังคอนสตรคเตอร์ ReadableStream()
new ReadableStream({ type: 'bytes' });
underlyingSource
แหล่งที่มาของไบต์สตรีมที่อ่านได้จะได้รับ ReadableByteStreamController
เพื่อใช้ดำเนินการ เมธอด ReadableByteStreamController.enqueue()
ของคลาสนี้ใช้อาร์กิวเมนต์ chunk
ที่มีค่าเป็น ArrayBufferView
พร็อพเพอร์ตี้ ReadableByteStreamController.byobRequest
จะแสดงคำขอดึง BYOB ในปัจจุบัน หรือแสดงค่าว่างหากไม่มี สุดท้าย พร็อพเพอร์ตี้ ReadableByteStreamController.desiredSize
จะแสดงผลขนาดที่ต้องการเพื่อเติมคิวภายในของสตรีมที่ควบคุม
queuingStrategy
อาร์กิวเมนต์ที่ 2 ของคอนสตรัคเตอร์ ReadableStream()
ซึ่งไม่บังคับเช่นกันคือ queuingStrategy
ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสําหรับสตรีม (ไม่บังคับ) ซึ่งใช้พารามิเตอร์เดียว ดังนี้
highWaterMark
: จำนวนไบต์ที่ไม่ใช่ค่าลบซึ่งระบุจำนวนสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้ ข้อมูลนี้ใช้เพื่อกำหนดแรงดันย้อนกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ReadableByteStreamController.desiredSize
ที่เหมาะสม และยังควบคุมเมื่อมีการคําเรียกเมธอดpull()
ของแหล่งที่มาพื้นฐานด้วย
getReader()
และread()
จากนั้นคุณจะได้รับสิทธิ์เข้าถึง ReadableStreamBYOBReader
โดยการตั้งค่าพารามิเตอร์ mode
ดังนี้
ReadableStream.getReader({ mode: "byob" })
วิธีนี้ช่วยให้ควบคุมการจัดสรรบัฟเฟอร์ได้แม่นยำยิ่งขึ้นเพื่อหลีกเลี่ยงการคัดลอก หากต้องการอ่านจากสตรีมไบต์ คุณต้องเรียกใช้ ReadableStreamBYOBReader.read(view)
โดยที่ view
คือ ArrayBufferView
ตัวอย่างโค้ดสตรีมไบต์ที่อ่านได้
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
ฟังก์ชันต่อไปนี้จะแสดงผลสตรีมไบต์ที่อ่านได้ ซึ่งช่วยให้อ่านอาร์เรย์ที่สร้างขึ้นแบบสุ่มแบบไม่คัดลอกได้อย่างมีประสิทธิภาพ แทนที่จะใช้ขนาดข้อมูลขนาด 1,024 ที่กําหนดไว้ล่วงหน้า ระบบจะพยายามกรอกข้อมูลในบัฟเฟอร์ที่นักพัฒนาแอประบุ ซึ่งช่วยให้ควบคุมได้อย่างเต็มที่
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
กลไกของสตรีมที่เขียนได้
สตรีมที่เขียนได้คือปลายทางที่คุณเขียนข้อมูลได้ ซึ่งแสดงใน JavaScript โดยออบเจ็กต์ WritableStream
ซึ่งทำหน้าที่เป็นแอบสแตรกชันเหนือซิงค์ที่อยู่เบื้องหลัง ซึ่งเป็นซิงค์ I/O ระดับล่างที่เขียนข้อมูลดิบ
ระบบจะเขียนข้อมูลลงในสตรีมผ่านโปรแกรมเขียนทีละกลุ่ม ข้อมูลโค้ดมีได้หลายรูปแบบ เช่นเดียวกับข้อมูลโค้ดในโปรแกรมอ่าน คุณใช้โค้ดใดก็ได้เพื่อสร้างข้อมูลที่จะเขียนได้ โดยโปรแกรมเขียนโค้ดและโค้ดที่เกี่ยวข้องเรียกว่าโปรแกรมสร้าง
เมื่อสร้างผู้เขียนและเริ่มเขียนลงในสตรีม (ผู้เขียนที่ทำงานอยู่) ระบบจะถือว่าผู้เขียนล็อกอยู่กับสตรีมนั้น มีผู้เขียนได้เพียงคนเดียวเท่านั้นที่เขียนลงในสตรีมแบบเขียนได้พร้อมกัน หากต้องการให้ผู้เขียนคนอื่นๆ เริ่มเขียนในสตรีมของคุณ โดยทั่วไปคุณจะต้องเผยแพร่สตรีมก่อน จากนั้นจึงจะแนบผู้เขียนคนอื่นๆ ในสตรีมได้
คิวภายในจะติดตามข้อมูลส่วนที่เขียนลงในสตรีมแล้วแต่ยังไม่ได้ประมวลผลโดยซิงค์พื้นฐาน
กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กําหนดวิธีที่สตรีมควรส่งสัญญาณแรงดันย้อนกลับตามสถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละกลุ่ม และเปรียบเทียบขนาดทั้งหมดของกลุ่มทั้งหมดในคิวกับจำนวนที่ระบุ ซึ่งเรียกว่าจำนวนสูงสุด
โครงสร้างสุดท้ายเรียกว่าตัวควบคุม สตรีมแบบเขียนได้แต่ละรายการจะมีตัวควบคุมที่เชื่อมโยงซึ่งช่วยให้คุณควบคุมสตรีมได้ (เช่น เพื่อหยุดกลางคัน)
การสร้างสตรีมที่เขียนได้
อินเทอร์เฟซ WritableStream
ของ Streams API ให้การแยกแยะมาตรฐานสำหรับการเขียนข้อมูลสตรีมไปยังปลายทาง ซึ่งเรียกว่าซิงค์ ออบเจ็กต์นี้มีแรงดันย้อนกลับและการจัดคิวในตัว คุณสร้างสตรีมแบบเขียนได้โดยเรียกคอนสตรัคเตอร์ของ WritableStream()
โดยมีพารามิเตอร์ underlyingSink
ที่ไม่บังคับ ซึ่งแสดงถึงออบเจ็กต์ที่มีเมธอดและพร็อพเพอร์ตี้ซึ่งกำหนดลักษณะการทํางานของอินสแตนซ์สตรีมที่สร้างขึ้น
underlyingSink
underlyingSink
สามารถรวมวิธีการที่ไม่บังคับซึ่งนักพัฒนาแอปกำหนดไว้ดังต่อไปนี้ พารามิเตอร์ controller
ที่ส่งไปยังเมธอดบางรายการคือ WritableStreamDefaultController
start(controller)
: ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อมีการสร้างออบเจ็กต์ เนื้อหาของเมธอดนี้ควรมีจุดมุ่งหมายเพื่อเข้าถึงซิงค์ที่อยู่เบื้องหลัง หากต้องการดำเนินการนี้แบบไม่พร้อมกัน การดำเนินการจะแสดงผลลัพธ์สำเร็จหรือล้มเหลวwrite(chunk, controller)
: ระบบจะเรียกใช้เมธอดนี้เมื่อข้อมูลกลุ่มใหม่ (ที่ระบุไว้ในพารามิเตอร์chunk
) พร้อมที่จะเขียนลงในซิงค์ที่อยู่เบื้องหลัง การดำเนินการนี้จะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลวของการดำเนินการเขียน ระบบจะเรียกใช้เมธอดนี้หลังจากที่การเขียนก่อนหน้านี้สำเร็จเท่านั้น และจะไม่เรียกใช้หลังจากที่สตรีมปิดหรือยกเลิกclose(controller)
: ระบบจะเรียกใช้เมธอดนี้หากแอปส่งสัญญาณว่าเขียนข้อมูลไปยังสตรีมเสร็จแล้ว เนื้อหาควรทําทุกอย่างที่จําเป็นเพื่อเขียนข้อมูลไปยังที่เก็บข้อมูลที่อยู่เบื้องหลังให้เสร็จสมบูรณ์ และยกเลิกสิทธิ์เข้าถึง หากกระบวนการนี้เป็นแบบไม่พร้อมกัน ระบบจะแสดงผลลัพธ์เพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลว ระบบจะเรียกใช้เมธอดนี้หลังจากที่การเขียนทั้งหมดที่อยู่ในคิวดำเนินการเสร็จสมบูรณ์แล้วเท่านั้นabort(reason)
: ระบบจะเรียกใช้เมธอดนี้หากแอปส่งสัญญาณว่าต้องการปิดสตรีมอย่างกะทันหันและทำให้สตรีมอยู่ในสถานะที่มีข้อผิดพลาด การดำเนินการนี้จะล้างทรัพยากรที่ถืออยู่ได้เช่นเดียวกับclose()
แต่ระบบจะเรียกabort()
แม้จะจัดคิวการเขียนไว้แล้วก็ตาม ระบบจะทิ้งข้อมูลเหล่านั้น หากกระบวนการนี้เป็นแบบไม่พร้อมกัน ระบบจะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลว พารามิเตอร์reason
มีDOMString
ที่อธิบายสาเหตุที่หยุดสตรีม
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
อินเทอร์เฟซของ Streams API WritableStreamDefaultController
แสดงตัวควบคุมที่อนุญาตให้ควบคุมสถานะของ WritableStream
ในระหว่างการตั้งค่า เมื่อส่งข้อมูลจำนวนมากขึ้นสำหรับการเขียน หรือเมื่อสิ้นสุดการเขียน เมื่อสร้าง WritableStream
ไปป์ไลน์ที่ฝังอยู่จะได้รับอินสแตนซ์ WritableStreamDefaultController
ที่เกี่ยวข้องเพื่อดำเนินการ WritableStreamDefaultController
มีเพียงเมธอดเดียวเท่านั้น ซึ่งก็คือ WritableStreamDefaultController.error()
ซึ่งทําให้การทำงานกับสตรีมที่เกี่ยวข้องในอนาคตเกิดข้อผิดพลาด
WritableStreamDefaultController
ยังรองรับพร็อพเพอร์ตี้ signal
ซึ่งจะแสดงผลอินสแตนซ์ของ AbortSignal
ซึ่งช่วยให้หยุดการดำเนินการ WritableStream
ได้หากจำเป็น
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
อาร์กิวเมนต์ที่ 2 ของคอนสตรัคเตอร์ WritableStream()
ซึ่งไม่บังคับเช่นเดียวกันคือ queuingStrategy
ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสําหรับสตรีม (ไม่บังคับ) ซึ่งใช้พารามิเตอร์ 2 รายการดังนี้
highWaterMark
: ตัวเลขที่ไม่ใช่ค่าลบซึ่งระบุจำนวนสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้size(chunk)
: ฟังก์ชันที่คำนวณและแสดงผลขนาดที่ไม่เป็นลบแบบจำกัดของค่าข้อมูลโค้ดที่ระบุ ระบบจะใช้ผลลัพธ์นี้เพื่อกำหนดแรงดันย้อนกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้WritableStreamDefaultWriter.desiredSize
ที่เหมาะสม
getWriter()
และwrite()
หากต้องการเขียนลงในสตรีมที่เขียนได้ คุณต้องมีโปรแกรมเขียน ซึ่งจะเป็น WritableStreamDefaultWriter
เมธอด getWriter()
ของอินเทอร์เฟซ WritableStream
จะแสดงผลอินสแตนซ์ใหม่ของ WritableStreamDefaultWriter
และล็อกสตรีมไปยังอินสแตนซ์นั้น ขณะที่สตรีมถูกล็อก คุณจะไม่สามารถรับผู้เขียนคนใหม่ได้จนกว่าผู้เขียนคนปัจจุบันจะได้รับการปล่อยตัว
เมธอด write()
ของอินเตอร์เฟซ WritableStreamDefaultWriter
จะเขียนข้อมูลกลุ่มที่ส่งไปยัง WritableStream
และซิงค์ที่อยู่เบื้องหลัง จากนั้นจะแสดงผลลัพธ์เป็นสัญญาว่าจะดำเนินการเพื่อระบุความสำเร็จหรือความล้มเหลวของการดำเนินการเขียน โปรดทราบว่าความหมายของ "สำเร็จ" ขึ้นอยู่กับซิงค์ที่อยู่เบื้องหลัง ซึ่งอาจบ่งบอกว่าระบบยอมรับข้อมูลดังกล่าวแล้ว แต่ไม่จําเป็นว่าข้อมูลดังกล่าวจะได้รับการบันทึกอย่างปลอดภัยไปยังปลายทาง
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
พร็อพเพอร์ตี้ locked
คุณสามารถตรวจสอบว่าสตรีมที่ใช้เขียนได้ถูกล็อกหรือไม่โดยไปที่พร็อพเพอร์ตี้ WritableStream.locked
ของสตรีมนั้น
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
ตัวอย่างโค้ดสตรีมแบบเขียนได้
โค้ดตัวอย่างด้านล่างแสดงขั้นตอนทั้งหมดที่ใช้งาน
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
การส่งผ่านสตรีมที่อ่านได้ไปยังสตรีมที่เขียนได้
สตรีมที่อ่านได้สามารถส่งผ่านไปยังสตรีมที่เขียนได้ผ่านเมธอด pipeTo()
ของสตรีมที่อ่านได้
ReadableStream.pipeTo()
จะส่งผ่าน ReadableStream
ในปัจจุบันไปยัง WritableStream
ที่ระบุและแสดงผลลัพธ์เป็น Promise ที่ดำเนินการเมื่อกระบวนการส่งผ่านเสร็จสมบูรณ์ หรือปฏิเสธหากพบข้อผิดพลาด
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
การสร้างสตรีมการเปลี่ยนรูปแบบ
อินเทอร์เฟซ TransformStream
ของ Streams API แสดงชุดข้อมูลที่เปลี่ยนรูปแบบได้ คุณสร้างสตรีมการเปลี่ยนรูปแบบได้โดยเรียกคอนสตรัคเตอร์ TransformStream()
ซึ่งจะสร้างและแสดงผลออบเจ็กต์สตรีมการเปลี่ยนรูปแบบจากตัวแฮนเดิลที่ระบุ ตัวสร้าง TransformStream()
จะยอมรับออบเจ็กต์ JavaScript ที่ไม่บังคับซึ่งแสดงถึง transformer
เป็นอาร์กิวเมนต์แรก ออบเจ็กต์ดังกล่าวอาจมีเมธอดใดก็ได้ต่อไปนี้
transformer
start(controller)
: ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อมีการสร้างออบเจ็กต์ โดยปกติแล้วจะใช้เพื่อจัดคิวกลุ่มคำนำหน้าโดยใช้controller.enqueue()
ระบบจะอ่านข้อมูลดังกล่าวจากฝั่งที่อ่านได้ แต่ไม่ขึ้นอยู่กับการเขียนไปยังฝั่งที่เขียนได้ หากกระบวนการเริ่มต้นนี้เป็นแบบไม่พร้อมกัน เช่น เนื่องจากต้องใช้เวลาในการรับข้อมูลโค้ดนำหน้า ฟังก์ชันจะแสดงผลพรอมต์เพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลวได้ โดยพรอมต์ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดในสตรีม ตัวสร้างTransformStream()
จะโยนข้อยกเว้นที่โยนtransform(chunk, controller)
: ระบบจะเรียกใช้เมธอดนี้เมื่อข้อมูลใหม่ซึ่งเขียนไปยังฝั่งที่เขียนได้พร้อมที่จะเปลี่ยนรูปแบบแล้ว การใช้งานสตรีมจะรับประกันว่าฟังก์ชันนี้จะเรียกใช้หลังจากที่การแปลงก่อนหน้าสําเร็จเท่านั้น และจะไม่เรียกใช้ก่อนstart()
เสร็จสมบูรณ์หรือหลังจากเรียกใช้flush()
ฟังก์ชันนี้จะทํางานเปลี่ยนรูปแบบจริงของสตรีมการเปลี่ยนรูปแบบ โดยสามารถจัดคิวผลลัพธ์ได้โดยใช้controller.enqueue()
ซึ่งอนุญาตให้เขียนข้อมูลไปยังฝั่งที่เขียนได้เพียงครั้งเดียว แต่ฝั่งที่อ่านได้อาจมีข้อมูลหลายกลุ่มหรือไม่มีเลย ทั้งนี้ขึ้นอยู่กับจำนวนครั้งที่เรียกcontroller.enqueue()
หากกระบวนการเปลี่ยนรูปแบบเป็นแบบไม่พร้อมกัน ฟังก์ชันนี้จะแสดงผลพรอมต์เพื่อบ่งบอกความสําเร็จหรือความล้มเหลวของการเปลี่ยนรูปแบบ พรอมต์ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดทั้งฝั่งที่อ่านได้และเขียนได้ของสตรีมการเปลี่ยนรูปแบบ หากไม่ได้ระบุเมธอดtransform()
ระบบจะใช้การเปลี่ยนรูปแบบข้อมูลประจำตัว ซึ่งจะจัดคิวข้อมูลส่วนที่ไม่มีการแก้ไขจากฝั่งที่เขียนได้ไปยังฝั่งที่อ่านได้flush(controller)
: ระบบจะเรียกใช้เมธอดนี้หลังจากที่เปลี่ยนรูปแบบข้อมูลทั้งหมดที่เขียนลงในฝั่งที่เขียนได้โดยการผ่านtransform()
เรียบร้อยแล้ว และฝั่งที่เขียนได้กำลังจะปิด โดยปกติแล้วจะใช้เพื่อจัดคิวกลุ่มส่วนต่อท้ายไปยังฝั่งที่อ่านได้ ก่อนที่ฝั่งนั้นจะปิดลงด้วย หากการล้างข้อมูลเป็นแบบไม่เป็นเชิงเวลา ฟังก์ชันจะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลว และระบบจะสื่อสารผลลัพธ์ไปยังผู้เรียกใช้stream.writable.write()
นอกจากนี้ พรอมต์ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดทั้งฝั่งที่อ่านได้และเขียนได้ของสตรีม การยกเว้นข้อยกเว้นจะถือว่าเหมือนกับการคืนค่า Promise ที่ปฏิเสธ
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
กลยุทธ์การจัดคิว writableStrategy
และ readableStrategy
พารามิเตอร์ที่ไม่บังคับลำดับที่ 2 และ 3 ของคอนสตรคเตอร์ TransformStream()
จะใช้หรือไม่ก็ได้
writableStrategy
และกลยุทธ์การจัดคิว readableStrategy
โดยมีการกําหนดไว้ดังที่ระบุไว้ในส่วนสตรีมที่อ่านได้และที่เขียนได้ตามลําดับ
ตัวอย่างโค้ดสตรีมการเปลี่ยนรูปแบบ
ตัวอย่างโค้ดต่อไปนี้แสดงสตรีมการเปลี่ยนรูปแบบแบบง่ายที่ทำงานอยู่
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
การส่งผ่านสตรีมที่อ่านได้ผ่านสตรีมการเปลี่ยนรูปแบบ
เมธอด pipeThrough()
ของอินเทอร์เฟซ ReadableStream
มีวิธีต่อเชื่อมสตรีมปัจจุบันผ่านสตรีมการเปลี่ยนรูปแบบหรือคู่อื่นๆ ที่เขียนได้/อ่านได้ โดยทั่วไปแล้ว การส่งผ่านสตรีมจะล็อกสตรีมนั้นไว้ตลอดระยะเวลาการส่งผ่าน ซึ่งจะป้องกันไม่ให้ผู้อ่านรายอื่นล็อกสตรีม
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
ตัวอย่างโค้ดถัดไป (ค่อนข้างซับซ้อน) แสดงวิธีใช้ fetch()
เวอร์ชัน "ตะโกน" ซึ่งเปลี่ยนข้อความทั้งหมดเป็นตัวพิมพ์ใหญ่โดยใช้ Promise การตอบกลับที่แสดงผลเป็นสตรีม และเปลี่ยนเป็นตัวพิมพ์ใหญ่ทีละกลุ่ม ข้อดีของวิธีการนี้คือคุณไม่จําเป็นต้องรอให้ดาวน์โหลดเอกสารทั้งฉบับ ซึ่งจะทําให้เกิดความแตกต่างอย่างมากเมื่อจัดการกับไฟล์ขนาดใหญ่
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
สาธิต
ตัวอย่างด้านล่างแสดงสตรีมที่อ่านได้ เขียนได้ และเปลี่ยนรูปแบบ รวมถึงมีตัวอย่างโซ่ท่อ pipeThrough()
และ pipeTo()
รวมถึงสาธิต tee()
ด้วย คุณสามารถเลือกที่จะเรียกใช้เดโมในหน้าต่างของตัวเองหรือดูซอร์สโค้ดก็ได้
สตรีมมีประโยชน์ที่พร้อมใช้งานในเบราว์เซอร์
เบราว์เซอร์มีสตรีมที่มีประโยชน์หลายรายการที่ฝังไว้ คุณสามารถสร้าง
ReadableStream
จาก Blob ได้อย่างง่ายดาย วิธีการ stream() ของอินเทอร์เฟซ Blob
จะแสดงผล ReadableStream
ซึ่งจะแสดงข้อมูลที่อยู่ใน Blob เมื่ออ่าน นอกจากนี้ โปรดทราบว่าออบเจ็กต์ File
เป็น Blob
ประเภทหนึ่งๆ และสามารถใช้ในบริบทใดก็ได้ที่ Blob ใช้ได้
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
ตัวแปรสตรีมมิงของ TextDecoder.decode()
และ TextEncoder.encode()
จะเรียกว่า TextDecoderStream
และ TextEncoderStream
ตามลำดับ
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
การบีบอัดหรือคลายการบีบอัดไฟล์นั้นทำได้ง่ายโดยใช้สตรีมการเปลี่ยนรูปแบบ CompressionStream
และ DecompressionStream
ตามลำดับ ตัวอย่างโค้ดด้านล่างแสดงวิธีดาวน์โหลดข้อกำหนดของ Streams, บีบอัด (gzip) ไฟล์ในเบราว์เซอร์โดยตรง และเขียนไฟล์ที่บีบอัดไปยังดิสก์โดยตรง
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
FileSystemWritableFileStream
ของ File System Access API และfetch()
สตรีมคำขอเวอร์ชันทดลองเป็นตัวอย่างของสตรีมที่เขียนได้ในโลกแห่งความเป็นจริง
Serial API ใช้สตรีมที่อ่านและเขียนได้เป็นอย่างมาก
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
สุดท้าย WebSocketStream
API จะผสานรวมสตรีมกับ WebSocket API
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
แหล่งข้อมูลที่มีประโยชน์
- ข้อกำหนดเฉพาะของสตรีม
- เดโมประกอบ
- Streams polyfill
- 2016 - ปีแห่งสตรีมเว็บ
- ตัวดำเนินการและตัวสร้างแบบไม่พร้อมกัน
- โปรแกรมแสดงภาพสตรีม
ขอขอบคุณ
บทความนี้ได้รับการตรวจสอบโดย Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley และ Adam Rice บล็อกโพสต์ของ Jake Archibald ช่วยฉันได้มากในการทําความเข้าใจสตรีม ตัวอย่างโค้ดบางส่วนได้รับแรงบันดาลใจจากการสํารวจของผู้ใช้ GitHub @bellbind และบางส่วนของข้อความเขียนขึ้นโดยอิงตามเอกสารประกอบบนเว็บ MDN สําหรับสตรีม ผู้เขียนมาตรฐานสตรีมทํางานได้อย่างยอดเยี่ยมในการเขียนข้อกําหนดนี้ รูปภาพหลักโดย Ryan Lara จาก Unsplash