สตรีม—คู่มือฉบับสมบูรณ์

ดูวิธีใช้สตรีมที่อ่านได้ เขียนได้ และเปลี่ยนรูปแบบด้วย Streams API

Streams API ช่วยให้คุณเข้าถึงสตรีมข้อมูลที่รับผ่านเครือข่ายหรือสร้างขึ้นด้วยวิธีใดก็ได้ในเครื่องแบบเป็นโปรแกรม และประมวลผลด้วย JavaScript สตรีมมิงเกี่ยวข้องกับการแบ่งทรัพยากรที่คุณต้องการรับ ส่ง หรือเปลี่ยนรูปแบบเป็นกลุ่มเล็กๆ แล้วประมวลผลกลุ่มเหล่านี้ทีละส่วน แม้ว่าการสตรีมเป็นสิ่งที่เบราว์เซอร์ทำอยู่แล้วเมื่อได้รับชิ้นงาน เช่น HTML หรือวิดีโอเพื่อแสดงในหน้าเว็บ แต่ JavaScript ไม่เคยมีความสามารถนี้มาก่อนจนกระทั่งมีการเปิดตัว fetch ที่มีสตรีมในปี 2015

ก่อนหน้านี้ หากต้องการประมวลผลทรัพยากรบางประเภท (ไม่ว่าจะเป็นวิดีโอหรือไฟล์ข้อความ ฯลฯ) คุณจะต้องดาวน์โหลดไฟล์ทั้งไฟล์ รอให้ไฟล์ได้รับการแปลงเป็นรูปแบบที่เหมาะสม แล้วจึงประมวลผล เมื่อสตรีมพร้อมใช้งานสำหรับ JavaScript ทุกอย่างก็เปลี่ยนไป ตอนนี้คุณสามารถประมวลผลข้อมูลดิบด้วย JavaScript แบบเป็นขั้นเป็นตอนได้ทันทีที่ข้อมูลพร้อมใช้งานบนไคลเอ็นต์ โดยไม่ต้องสร้างบัฟเฟอร์ สตรีง หรือบล็อก ซึ่งจะเปิดโอกาสให้ใช้ Use Case หลายรายการดังที่ระบุไว้ด้านล่าง

  • เอฟเฟกต์วิดีโอ: การส่งผ่านสตรีมวิดีโอที่อ่านได้ผ่านสตรีมการเปลี่ยนรูปแบบที่ใช้เอฟเฟกต์แบบเรียลไทม์
  • การบีบอัด (และเลิก) ข้อมูล: การส่งผ่านสตรีมไฟล์ผ่านสตรีมการเปลี่ยนรูปแบบที่ (และเลิก) บีบอัดข้อมูลโดยเลือก
  • การถอดรหัสรูปภาพ: การส่งผ่านสตรีมการตอบกลับ HTTP ผ่านสตรีมการเปลี่ยนรูปแบบซึ่งถอดรหัสไบต์เป็นข้อมูลบิตแมป จากนั้นส่งผ่านสตรีมการเปลี่ยนรูปแบบอีกรายการหนึ่งซึ่งแปลบิตแมปเป็น PNG หากติดตั้งภายในตัวแฮนเดิล fetch ของ Service Worker จะช่วยให้คุณโพลีฟีลรูปแบบรูปภาพใหม่อย่าง AVIF ได้อย่างชัดเจน

การสนับสนุนเบราว์เซอร์

ReadableStream และ WritableStream

การรองรับเบราว์เซอร์

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65
  • Safari: 10.1

แหล่งที่มา

TransformStream

การรองรับเบราว์เซอร์

  • Chrome: 67
  • Edge: 79
  • Firefox: 102
  • Safari: 14.1

แหล่งที่มา

แนวคิดหลัก

ก่อนจะลงรายละเอียดเกี่ยวกับสตรีมประเภทต่างๆ เราขอแนะนำแนวคิดหลักๆ สักเล็กน้อย

ชิ้นส่วน

ข้อมูลเป็นข้อมูลชิ้นเดียวที่เขียนไปยังหรืออ่านจากสตรีม โดยอาจเป็นข้อมูลประเภทใดก็ได้ สตรีมอาจมีข้อมูลหลายประเภทรวมอยู่ด้วย ส่วนใหญ่แล้ว ข้อมูลส่วนนี้จะไม่ใช่หน่วยข้อมูลย่อยที่สุดสําหรับสตรีมหนึ่งๆ เช่น สตรีมไบต์อาจมีกลุ่มที่ประกอบด้วยหน่วย 16 กิไบต์ Uint8Array แทนที่จะเป็นไบต์เดี่ยว

สตรีมที่อ่านได้

สตรีมที่อ่านได้แสดงแหล่งข้อมูลที่คุณอ่านได้ กล่าวคือ ข้อมูลมาจากสตรีมที่อ่านได้ กล่าวอย่างเป็นรูปธรรมคือ สตรีมที่อ่านได้คืออินสแตนซ์ของReadableStream คลาส

สตรีมที่เขียนได้

สตรีมที่เขียนได้แสดงปลายทางสำหรับข้อมูลที่คุณสามารถเขียนได้ กล่าวคือ ข้อมูลจะเข้าไปในสตรีมที่เขียนได้ กล่าวอย่างเจาะจงคือ สตรีมแบบเขียนได้คืออินสแตนซ์ของคลาส WritableStream

เปลี่ยนรูปแบบสตรีม

สตรีมการเปลี่ยนรูปแบบประกอบด้วยสตรีม 2 รายการ ได้แก่ สตรีมที่เขียนได้ ซึ่งเรียกว่าฝั่งที่เขียนได้ และสตรีมที่อ่านได้ ซึ่งเรียกว่าฝั่งที่อ่านได้ อุปมาชีวิตจริงสำหรับฟีเจอร์นี้ก็คือล่ามแบบแปลพร้อมกันที่แปลจากภาษาหนึ่งเป็นภาษาอื่นขณะพูด ในกรณีที่เฉพาะสตรีมการเปลี่ยนรูปแบบ การเขียนข้อมูลไปยังฝั่งที่เขียนได้จะทำให้ข้อมูลใหม่พร้อมใช้งานสำหรับการอ่านจากฝั่งที่อ่านได้ กล่าวโดยละเอียดคือ ออบเจ็กต์ใดก็ตามที่มีพร็อพเพอร์ตี้ writable และพร็อพเพอร์ตี้ readable สามารถใช้เป็นสตรีมการเปลี่ยนรูปแบบได้ อย่างไรก็ตาม คลาส TransformStream มาตรฐานช่วยให้สร้างคู่ดังกล่าวที่พันกันได้อย่างถูกต้องได้ง่ายขึ้น

โซ่สำหรับท่อ

สตรีมส่วนใหญ่จะใช้เพื่อส่งผ่านสตรีมไปยังสตรีมอื่น สตรีมที่อ่านได้สามารถส่งผ่านไปยังสตรีมที่เขียนได้โดยตรงโดยใช้เมธอด pipeTo() ของสตรีมที่อ่านได้ หรือจะส่งผ่านสตรีมการเปลี่ยนรูปแบบอย่างน้อย 1 รายการก่อนก็ได้โดยใช้เมธอด pipeThrough() ของสตรีมที่อ่านได้ ชุดสตรีมที่มีการต่อท่อเข้าด้วยกันในลักษณะนี้เรียกว่าเชนไปป์

แรงดันย้อนกลับ

เมื่อสร้างเชนไปป์แล้ว เชนจะส่งสัญญาณเกี่ยวกับความเร็วที่ควรส่งผ่านข้อมูลผ่านเชน หากขั้นตอนใดในเชนยังไม่สามารถรับข้อมูลโค้ดได้ ระบบจะส่งสัญญาณย้อนกลับผ่านเชนไปเรื่อยๆ จนกว่าแหล่งที่มาเดิมจะได้รับแจ้งให้หยุดสร้างข้อมูลโค้ดอย่างรวดเร็ว กระบวนการปรับกระแสให้เป็นไปตามปกตินี้เรียกว่าแรงดันย้อนกลับ

การทำที

สตรีมที่อ่านได้สามารถแยกออกเป็น 2 ท่อ (ตั้งชื่อตามรูปร่างของ "T" ตัวพิมพ์ใหญ่) โดยใช้เมธอด tee() ซึ่งจะล็อกสตรีม นั่นคือทำให้ใช้สตรีมโดยตรงไม่ได้อีกต่อไป แต่ระบบจะสร้างสตรีมใหม่ 2 รายการที่เรียกว่าสาขา ซึ่งจะใช้แยกกันได้ การเริ่มสตรีมก็สำคัญเช่นกันเนื่องจากสตรีมจะกรอกลับหรือเริ่มใหม่ไม่ได้ เราจะอธิบายเรื่องนี้เพิ่มเติมในภายหลัง

แผนภาพเชนไปป์ประกอบด้วยสตรีมที่อ่านได้ซึ่งมาจากการเรียกใช้ fetch API จากนั้นส่งผ่านสตรีมการเปลี่ยนรูปแบบที่มีเอาต์พุตแยกออกมา แล้วส่งไปยังเบราว์เซอร์สำหรับสตรีมที่อ่านได้ซึ่งแสดงผลเป็นอันดับแรก และส่งไปยังแคช Service Worker สำหรับสตรีมที่อ่านได้ซึ่งแสดงผลเป็นอันดับที่ 2
เชนท่อ

กลไกของสตรีมที่อ่านได้

สตรีมที่อ่านได้คือแหล่งข้อมูลที่แสดงใน JavaScript โดยออบเจ็กต์ ReadableStream ที่มาจากแหล่งที่มาพื้นฐาน ตัวสร้างของ ReadableStream() จะสร้างและแสดงผลออบเจ็กต์สตรีมที่อ่านได้จากตัวแฮนเดิลที่ระบุ แหล่งที่มาพื้นฐานมี 2 ประเภท ได้แก่

  • แหล่งข้อมูล Push จะส่งข้อมูลให้คุณอย่างต่อเนื่องเมื่อคุณเข้าถึงแหล่งข้อมูลดังกล่าว และคุณเป็นผู้เลือกว่าจะเริ่ม หยุดชั่วคราว หรือยกเลิกการเข้าถึงสตรีม เช่น สตรีมวิดีโอสด เหตุการณ์ที่เซิร์ฟเวอร์ส่ง หรือ WebSocket
  • แหล่งที่มาแบบพุลกำหนดให้คุณต้องขอข้อมูลจากแหล่งที่มาอย่างชัดเจนเมื่อเชื่อมต่อแล้ว ตัวอย่าง ได้แก่ การดำเนินการ HTTP ผ่านการเรียก fetch() หรือ XMLHttpRequest

ระบบจะอ่านข้อมูลสตรีมตามลําดับเป็นชิ้นเล็กๆ ที่เรียกว่าข้อมูลส่วน ข้อมูลโค้ดที่วางในสตรีมจะเรียกว่าจัดคิว ซึ่งหมายความว่าคำสั่งซื้อดังกล่าวกำลังรออยู่ในคิวพร้อมที่จะได้รับการอ่าน คิวภายในจะติดตามข้อมูลส่วนที่ยังไม่ได้อ่าน

กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กําหนดวิธีที่สตรีมควรส่งสัญญาณแรงดันย้อนกลับตามสถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละกลุ่ม และเปรียบเทียบขนาดทั้งหมดของกลุ่มทั้งหมดในคิวกับจำนวนที่ระบุ ซึ่งเรียกว่าจำนวนสูงสุด

โปรแกรมอ่านจะอ่านข้อมูลในสตริง เครื่องอ่านนี้จะดึงข้อมูลทีละกลุ่ม ซึ่งช่วยให้คุณดำเนินการใดก็ได้กับข้อมูล เครื่องอ่านและรหัสการประมวลผลอื่นๆ ที่มาพร้อมกับเครื่องอ่านเรียกว่าผู้บริโภค

คอนสตรัคต์ถัดไปในบริบทนี้เรียกว่า คอนโทรลเลอร์ สตรีมที่อ่านได้แต่ละรายการจะมีตัวควบคุมที่เชื่อมโยงกัน ซึ่งช่วยให้คุณควบคุมสตรีมได้ดังชื่อที่ระบุ

มีเพียงผู้อ่านเพียงคนเดียวเท่านั้นที่อ่านสตรีมได้พร้อมกัน เมื่อสร้างผู้อ่านและเริ่มอ่านสตรีม (นั่นคือ กลายเป็นผู้อ่านที่ใช้งานอยู่) ระบบจะล็อกผู้อ่านดังกล่าวไว้กับสตรีมนั้น หากต้องการให้ผู้อ่านคนอื่นอ่านสตรีมต่อ โดยทั่วไปคุณต้องปล่อยผู้อ่านคนแรกก่อนดำเนินการอื่นๆ (แต่คุณส่งต่อสตรีมได้)

การสร้างสตรีมที่อ่านได้

คุณสร้างสตรีมที่อ่านได้โดยการเรียกคอนสตรัคเตอร์ของมัน ReadableStream() ตัวสร้างคอนสตรัคเตอร์มีพารามิเตอร์ underlyingSource ที่ไม่บังคับ ซึ่งแสดงถึงออบเจ็กต์ที่มีเมธอดและพร็อพเพอร์ตี้ซึ่งกำหนดลักษณะการทํางานของอินสแตนซ์สตรีมที่สร้างขึ้น

underlyingSource

ซึ่งจะใช้วิธีการต่อไปนี้ที่นักพัฒนาแอปกำหนดไว้ (ไม่บังคับ)

  • start(controller): เรียกใช้ทันทีเมื่อมีการสร้างออบเจ็กต์ วิธีการนี้สามารถเข้าถึงแหล่งที่มาของสตรีมและทำสิ่งอื่นๆ ที่จำเป็นในการตั้งค่าฟังก์ชันการทำงานของสตรีม หากต้องดำเนินการนี้แบบไม่พร้อมกัน เมธอดจะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือไม่สำเร็จ พารามิเตอร์ controller ที่ส่งไปยังเมธอดนี้คือ a ReadableStreamDefaultController
  • pull(controller): ใช้เพื่อควบคุมสตรีมเมื่อมีการดึงข้อมูลกลุ่มเพิ่มเติม ระบบจะเรียกใช้ซ้ำๆ ตราบใดที่คิวภายในของกลุ่มของข้อมูลสตรีมยังไม่เต็ม จนกว่าคิวจะถึงจุดสูงสุด หากผลลัพธ์ของการเรียก pull() เป็นสัญญา ระบบจะไม่เรียก pull() อีกจนกว่าสัญญาดังกล่าวจะสำเร็จ หาก Promise ปฏิเสธ สตรีมจะแสดงข้อผิดพลาด
  • cancel(reason): เรียกใช้เมื่อผู้บริโภคสตรีมยกเลิกสตรีม
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController รองรับวิธีการต่อไปนี้

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

อาร์กิวเมนต์ที่ 2 ของคอนสตรัคเตอร์ ReadableStream() ซึ่งไม่บังคับเช่นกันคือ queuingStrategy ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสําหรับสตรีม (ไม่บังคับ) ซึ่งใช้พารามิเตอร์ 2 รายการดังนี้

  • highWaterMark: ตัวเลขที่ไม่ใช่ค่าลบซึ่งระบุจำนวนสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้
  • size(chunk): ฟังก์ชันที่คำนวณและแสดงผลขนาดที่ไม่เป็นลบแบบจำกัดของค่าข้อมูลโค้ดที่ระบุ ระบบจะใช้ผลลัพธ์นี้เพื่อกำหนดแรงดันย้อนกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ ReadableStreamDefaultController.desiredSize ที่เหมาะสม และยังควบคุมเมื่อมีการคําเรียกเมธอด pull() ของแหล่งที่มาพื้นฐานด้วย
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader() และread()

หากต้องการอ่านจากสตรีมที่อ่านได้ คุณต้องมีโปรแกรมอ่าน ซึ่งจะเป็น ReadableStreamDefaultReader เมธอด getReader() ของอินเทอร์เฟซ ReadableStream จะสร้างโปรแกรมอ่านและล็อกสตรีมไว้กับโปรแกรมอ่านนั้น ขณะที่สตรีมล็อกอยู่ คุณจะไม่สามารถรับสิทธิ์อ่านจากเครื่องอ่านเครื่องอื่นได้จนกว่าจะปล่อยเครื่องอ่านนี้

เมธอด read() ของอินเทอร์เฟซ ReadableStreamDefaultReader จะแสดงผลพรอมต์ที่ให้สิทธิ์เข้าถึงข้อมูลส่วนถัดไปในคิวภายในของสตรีม โดยจะยอมรับหรือปฏิเสธโดยขึ้นอยู่กับสถานะของสตรีม โอกาสต่างๆ มีดังนี้

  • หากมีข้อมูลโค้ดพร้อมใช้งาน ระบบจะดำเนินการตามสัญญาด้วยออบเจ็กต์ของรูปแบบ
    { value: chunk, done: false }
  • หากสตรีมปิดอยู่ ระบบจะดำเนินการตามสัญญาด้วยออบเจ็กต์ของรูปแบบ
    { value: undefined, done: true }
  • หากสตรีมเกิดข้อผิดพลาด ระบบจะปฏิเสธการสัญญาพร้อมข้อผิดพลาดที่เกี่ยวข้อง
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

พร็อพเพอร์ตี้ locked

คุณสามารถตรวจสอบว่าสตรีมที่อ่านได้ถูกล็อกหรือไม่โดยไปที่พร็อพเพอร์ตี้ของ ReadableStream.locked สตรีมนั้น

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

ตัวอย่างโค้ดสตรีมที่อ่านได้

โค้ดตัวอย่างด้านล่างแสดงขั้นตอนทั้งหมดที่ใช้งาน ก่อนอื่นให้สร้าง ReadableStream ที่กําหนดวิธีการ start() ในอาร์กิวเมนต์ underlyingSource (นั่นคือคลาส TimestampSource) วิธีนี้จะบอก controller ของสตรีมให้ประทับเวลาทุกวินาทีในช่วง 10 วินาทีenqueue() สุดท้าย อุปกรณ์จะบอกให้ตัวควบคุมclose()สตรีม คุณใช้สตรีมนี้ได้โดยสร้างโปรแกรมอ่านผ่านเมธอด getReader() และเรียกใช้ read() จนกว่าสตรีมจะdone

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

การทำซ้ำแบบอะซิงโครนัส

การตรวจสอบว่าสตรีมเป็น done หรือไม่ในแต่ละรอบของ read() อาจไม่ใช่ API ที่สะดวกที่สุด แต่โชคดีที่จะมีวิธีที่ดีกว่านี้ในเร็วๆ นี้ ซึ่งก็คือการทำซ้ำแบบไม่พร้อมกัน

for await (const chunk of stream) {
  console.log(chunk);
}
การวนซ้ำแบบไม่พร้อมกัน

วิธีแก้ปัญหาชั่วคราวในการใช้การทำซ้ำแบบอะซิงโครนัสในปัจจุบันคือการใช้ลักษณะการทำงานด้วย polyfill

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

การสร้างสตรีมที่อ่านได้

เมธอด tee() ของอินเทอร์เฟซ ReadableStream จะแยกสตรีมที่อ่านได้ปัจจุบันออก โดยจะแสดงผลอาร์เรย์ 2 องค์ประกอบที่มีสาขาที่เป็นผลลัพธ์ 2 รายการเป็นอินสแตนซ์ ReadableStream ใหม่ ซึ่งจะช่วยให้ผู้อ่าน 2 คนอ่านสตรีมพร้อมกันได้ คุณอาจทำเช่นนี้ใน Service Worker ในกรณีที่ต้องการดึงข้อมูลการตอบกลับจากเซิร์ฟเวอร์และสตรีมไปยังเบราว์เซอร์ รวมถึงสตรีมไปยังแคช Service Worker ด้วย เนื่องจากใช้เนื้อหาการตอบกลับได้เพียงครั้งเดียว คุณจึงต้องใช้สำเนา 2 รายการเพื่อทำเช่นนี้ หากต้องการยกเลิกสตรีม คุณต้องยกเลิกทั้ง 2 สาขาที่เกิดขึ้น โดยทั่วไปแล้ว การเปิดใช้สตรีมจะล็อกสตรีมนั้นไว้ตลอดระยะเวลาที่เปิดใช้ ซึ่งจะป้องกันไม่ให้ผู้อ่านรายอื่นล็อกสตรีมได้

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

สตรีมไบต์ที่อ่านได้

สําหรับสตรีมที่แสดงไบต์ ระบบจะจัดเตรียมสตรีมที่อ่านได้เวอร์ชันขยายเพื่อจัดการไบต์อย่างมีประสิทธิภาพ โดยเฉพาะอย่างยิ่งด้วยการลดการคัดลอก สตรีมไบต์ช่วยให้สามารถรับผู้อ่านแบบนำบัฟเฟอร์ของคุณเองมา (BYOB) การใช้งานเริ่มต้นจะให้เอาต์พุตที่หลากหลาย เช่น สตริงหรือบัฟเฟอร์อาร์เรย์ในกรณีของ WebSockets ส่วนสตรีมไบต์จะรับประกันเอาต์พุตไบต์ นอกจากนี้ ผู้อ่าน BYOB ยังได้รับสิทธิประโยชน์ด้านความเสถียรด้วย เนื่องจากหากบัฟเฟอร์แยกออก การดำเนินการนี้จะรับประกันว่าไม่มีการเขียนลงในบัฟเฟอร์เดียวกัน 2 ครั้ง จึงหลีกเลี่ยงเงื่อนไขการแข่งขันได้ เครื่องอ่าน BYOB สามารถลดจำนวนครั้งที่เบราว์เซอร์ต้องเรียกใช้การเก็บขยะได้ เนื่องจากสามารถนําบัฟเฟอร์มาใช้ซ้ำได้

การสร้างสตรีมไบต์ที่อ่านได้

คุณสามารถสร้างสตรีมไบต์ที่อ่านได้โดยการส่งพารามิเตอร์ type เพิ่มเติมไปยังคอนสตรคเตอร์ ReadableStream()

new ReadableStream({ type: 'bytes' });

underlyingSource

แหล่งที่มาของไบต์สตรีมที่อ่านได้จะได้รับ ReadableByteStreamController เพื่อใช้ดำเนินการ เมธอด ReadableByteStreamController.enqueue() ของคลาสนี้ใช้อาร์กิวเมนต์ chunk ที่มีค่าเป็น ArrayBufferView พร็อพเพอร์ตี้ ReadableByteStreamController.byobRequest จะแสดงคำขอดึง BYOB ในปัจจุบัน หรือแสดงค่าว่างหากไม่มี สุดท้าย พร็อพเพอร์ตี้ ReadableByteStreamController.desiredSize จะแสดงผลขนาดที่ต้องการเพื่อเติมคิวภายในของสตรีมที่ควบคุม

queuingStrategy

อาร์กิวเมนต์ที่ 2 ของคอนสตรัคเตอร์ ReadableStream() ซึ่งไม่บังคับเช่นกันคือ queuingStrategy ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสําหรับสตรีม (ไม่บังคับ) ซึ่งใช้พารามิเตอร์เดียว ดังนี้

  • highWaterMark: จำนวนไบต์ที่ไม่ใช่ค่าลบซึ่งระบุจำนวนสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้ ข้อมูลนี้ใช้เพื่อกำหนดแรงดันย้อนกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ ReadableByteStreamController.desiredSize ที่เหมาะสม และยังควบคุมเมื่อมีการคําเรียกเมธอด pull() ของแหล่งที่มาพื้นฐานด้วย

getReader() และread()

จากนั้นคุณจะได้รับสิทธิ์เข้าถึง ReadableStreamBYOBReader โดยการตั้งค่าพารามิเตอร์ mode ดังนี้ ReadableStream.getReader({ mode: "byob" }) วิธีนี้ช่วยให้ควบคุมการจัดสรรบัฟเฟอร์ได้แม่นยำยิ่งขึ้นเพื่อหลีกเลี่ยงการคัดลอก หากต้องการอ่านจากสตรีมไบต์ คุณต้องเรียกใช้ ReadableStreamBYOBReader.read(view) โดยที่ view คือ ArrayBufferView

ตัวอย่างโค้ดสตรีมไบต์ที่อ่านได้

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

ฟังก์ชันต่อไปนี้จะแสดงผลสตรีมไบต์ที่อ่านได้ ซึ่งช่วยให้อ่านอาร์เรย์ที่สร้างขึ้นแบบสุ่มแบบไม่คัดลอกได้อย่างมีประสิทธิภาพ แทนที่จะใช้ขนาดข้อมูลขนาด 1,024 ที่กําหนดไว้ล่วงหน้า ระบบจะพยายามกรอกข้อมูลในบัฟเฟอร์ที่นักพัฒนาแอประบุ ซึ่งช่วยให้ควบคุมได้อย่างเต็มที่

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

กลไกของสตรีมที่เขียนได้

สตรีมที่เขียนได้คือปลายทางที่คุณเขียนข้อมูลได้ ซึ่งแสดงใน JavaScript โดยออบเจ็กต์ WritableStream ซึ่งทำหน้าที่เป็นแอบสแตรกชันเหนือซิงค์ที่อยู่เบื้องหลัง ซึ่งเป็นซิงค์ I/O ระดับล่างที่เขียนข้อมูลดิบ

ระบบจะเขียนข้อมูลลงในสตรีมผ่านโปรแกรมเขียนทีละกลุ่ม ข้อมูลโค้ดมีได้หลายรูปแบบ เช่นเดียวกับข้อมูลโค้ดในโปรแกรมอ่าน คุณใช้โค้ดใดก็ได้เพื่อสร้างข้อมูลที่จะเขียนได้ โดยโปรแกรมเขียนโค้ดและโค้ดที่เกี่ยวข้องเรียกว่าโปรแกรมสร้าง

เมื่อสร้างผู้เขียนและเริ่มเขียนลงในสตรีม (ผู้เขียนที่ทำงานอยู่) ระบบจะถือว่าผู้เขียนล็อกอยู่กับสตรีมนั้น มีผู้เขียนได้เพียงคนเดียวเท่านั้นที่เขียนลงในสตรีมแบบเขียนได้พร้อมกัน หากต้องการให้ผู้เขียนคนอื่นๆ เริ่มเขียนในสตรีมของคุณ โดยทั่วไปคุณจะต้องเผยแพร่สตรีมก่อน จากนั้นจึงจะแนบผู้เขียนคนอื่นๆ ในสตรีมได้

คิวภายในจะติดตามข้อมูลส่วนที่เขียนลงในสตรีมแล้วแต่ยังไม่ได้ประมวลผลโดยซิงค์พื้นฐาน

กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กําหนดวิธีที่สตรีมควรส่งสัญญาณแรงดันย้อนกลับตามสถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละกลุ่ม และเปรียบเทียบขนาดทั้งหมดของกลุ่มทั้งหมดในคิวกับจำนวนที่ระบุ ซึ่งเรียกว่าจำนวนสูงสุด

โครงสร้างสุดท้ายเรียกว่าตัวควบคุม สตรีมแบบเขียนได้แต่ละรายการจะมีตัวควบคุมที่เชื่อมโยงซึ่งช่วยให้คุณควบคุมสตรีมได้ (เช่น เพื่อหยุดกลางคัน)

การสร้างสตรีมที่เขียนได้

อินเทอร์เฟซ WritableStream ของ Streams API ให้การแยกแยะมาตรฐานสำหรับการเขียนข้อมูลสตรีมไปยังปลายทาง ซึ่งเรียกว่าซิงค์ ออบเจ็กต์นี้มีแรงดันย้อนกลับและการจัดคิวในตัว คุณสร้างสตรีมแบบเขียนได้โดยเรียกคอนสตรัคเตอร์ของ WritableStream() โดยมีพารามิเตอร์ underlyingSink ที่ไม่บังคับ ซึ่งแสดงถึงออบเจ็กต์ที่มีเมธอดและพร็อพเพอร์ตี้ซึ่งกำหนดลักษณะการทํางานของอินสแตนซ์สตรีมที่สร้างขึ้น

underlyingSink

underlyingSink สามารถรวมวิธีการที่ไม่บังคับซึ่งนักพัฒนาแอปกำหนดไว้ดังต่อไปนี้ พารามิเตอร์ controller ที่ส่งไปยังเมธอดบางรายการคือ WritableStreamDefaultController

  • start(controller): ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อมีการสร้างออบเจ็กต์ เนื้อหาของเมธอดนี้ควรมีจุดมุ่งหมายเพื่อเข้าถึงซิงค์ที่อยู่เบื้องหลัง หากต้องการดำเนินการนี้แบบไม่พร้อมกัน การดำเนินการจะแสดงผลลัพธ์สำเร็จหรือล้มเหลว
  • write(chunk, controller): ระบบจะเรียกใช้เมธอดนี้เมื่อข้อมูลกลุ่มใหม่ (ที่ระบุไว้ในพารามิเตอร์ chunk) พร้อมที่จะเขียนลงในซิงค์ที่อยู่เบื้องหลัง การดำเนินการนี้จะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลวของการดำเนินการเขียน ระบบจะเรียกใช้เมธอดนี้หลังจากที่การเขียนก่อนหน้านี้สำเร็จเท่านั้น และจะไม่เรียกใช้หลังจากที่สตรีมปิดหรือยกเลิก
  • close(controller): ระบบจะเรียกใช้เมธอดนี้หากแอปส่งสัญญาณว่าเขียนข้อมูลไปยังสตรีมเสร็จแล้ว เนื้อหาควรทําทุกอย่างที่จําเป็นเพื่อเขียนข้อมูลไปยังที่เก็บข้อมูลที่อยู่เบื้องหลังให้เสร็จสมบูรณ์ และยกเลิกสิทธิ์เข้าถึง หากกระบวนการนี้เป็นแบบไม่พร้อมกัน ระบบจะแสดงผลลัพธ์เพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลว ระบบจะเรียกใช้เมธอดนี้หลังจากที่การเขียนทั้งหมดที่อยู่ในคิวดำเนินการเสร็จสมบูรณ์แล้วเท่านั้น
  • abort(reason): ระบบจะเรียกใช้เมธอดนี้หากแอปส่งสัญญาณว่าต้องการปิดสตรีมอย่างกะทันหันและทำให้สตรีมอยู่ในสถานะที่มีข้อผิดพลาด การดำเนินการนี้จะล้างทรัพยากรที่ถืออยู่ได้เช่นเดียวกับ close() แต่ระบบจะเรียก abort() แม้จะจัดคิวการเขียนไว้แล้วก็ตาม ระบบจะทิ้งข้อมูลเหล่านั้น หากกระบวนการนี้เป็นแบบไม่พร้อมกัน ระบบจะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลว พารามิเตอร์ reason มี DOMString ที่อธิบายสาเหตุที่หยุดสตรีม
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

อินเทอร์เฟซของ Streams API WritableStreamDefaultController แสดงตัวควบคุมที่อนุญาตให้ควบคุมสถานะของ WritableStream ในระหว่างการตั้งค่า เมื่อส่งข้อมูลจำนวนมากขึ้นสำหรับการเขียน หรือเมื่อสิ้นสุดการเขียน เมื่อสร้าง WritableStream ไปป์ไลน์ที่ฝังอยู่จะได้รับอินสแตนซ์ WritableStreamDefaultController ที่เกี่ยวข้องเพื่อดำเนินการ WritableStreamDefaultController มีเพียงเมธอดเดียวเท่านั้น ซึ่งก็คือ WritableStreamDefaultController.error() ซึ่งทําให้การทำงานกับสตรีมที่เกี่ยวข้องในอนาคตเกิดข้อผิดพลาด WritableStreamDefaultController ยังรองรับพร็อพเพอร์ตี้ signal ซึ่งจะแสดงผลอินสแตนซ์ของ AbortSignal ซึ่งช่วยให้หยุดการดำเนินการ WritableStream ได้หากจำเป็น

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

อาร์กิวเมนต์ที่ 2 ของคอนสตรัคเตอร์ WritableStream() ซึ่งไม่บังคับเช่นเดียวกันคือ queuingStrategy ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสําหรับสตรีม (ไม่บังคับ) ซึ่งใช้พารามิเตอร์ 2 รายการดังนี้

  • highWaterMark: ตัวเลขที่ไม่ใช่ค่าลบซึ่งระบุจำนวนสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้
  • size(chunk): ฟังก์ชันที่คำนวณและแสดงผลขนาดที่ไม่เป็นลบแบบจำกัดของค่าข้อมูลโค้ดที่ระบุ ระบบจะใช้ผลลัพธ์นี้เพื่อกำหนดแรงดันย้อนกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ WritableStreamDefaultWriter.desiredSize ที่เหมาะสม

getWriter() และwrite()

หากต้องการเขียนลงในสตรีมที่เขียนได้ คุณต้องมีโปรแกรมเขียน ซึ่งจะเป็น WritableStreamDefaultWriter เมธอด getWriter() ของอินเทอร์เฟซ WritableStream จะแสดงผลอินสแตนซ์ใหม่ของ WritableStreamDefaultWriter และล็อกสตรีมไปยังอินสแตนซ์นั้น ขณะที่สตรีมถูกล็อก คุณจะไม่สามารถรับผู้เขียนคนใหม่ได้จนกว่าผู้เขียนคนปัจจุบันจะได้รับการปล่อยตัว

เมธอด write() ของอินเตอร์เฟซ WritableStreamDefaultWriter จะเขียนข้อมูลกลุ่มที่ส่งไปยัง WritableStream และซิงค์ที่อยู่เบื้องหลัง จากนั้นจะแสดงผลลัพธ์เป็นสัญญาว่าจะดำเนินการเพื่อระบุความสำเร็จหรือความล้มเหลวของการดำเนินการเขียน โปรดทราบว่าความหมายของ "สำเร็จ" ขึ้นอยู่กับซิงค์ที่อยู่เบื้องหลัง ซึ่งอาจบ่งบอกว่าระบบยอมรับข้อมูลดังกล่าวแล้ว แต่ไม่จําเป็นว่าข้อมูลดังกล่าวจะได้รับการบันทึกอย่างปลอดภัยไปยังปลายทาง

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

พร็อพเพอร์ตี้ locked

คุณสามารถตรวจสอบว่าสตรีมที่ใช้เขียนได้ถูกล็อกหรือไม่โดยไปที่พร็อพเพอร์ตี้ WritableStream.locked ของสตรีมนั้น

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

ตัวอย่างโค้ดสตรีมแบบเขียนได้

โค้ดตัวอย่างด้านล่างแสดงขั้นตอนทั้งหมดที่ใช้งาน

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

การส่งผ่านสตรีมที่อ่านได้ไปยังสตรีมที่เขียนได้

สตรีมที่อ่านได้สามารถส่งผ่านไปยังสตรีมที่เขียนได้ผ่านเมธอด pipeTo() ของสตรีมที่อ่านได้ ReadableStream.pipeTo() จะส่งผ่าน ReadableStream ในปัจจุบันไปยัง WritableStream ที่ระบุและแสดงผลลัพธ์เป็น Promise ที่ดำเนินการเมื่อกระบวนการส่งผ่านเสร็จสมบูรณ์ หรือปฏิเสธหากพบข้อผิดพลาด

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

การสร้างสตรีมการเปลี่ยนรูปแบบ

อินเทอร์เฟซ TransformStream ของ Streams API แสดงชุดข้อมูลที่เปลี่ยนรูปแบบได้ คุณสร้างสตรีมการเปลี่ยนรูปแบบได้โดยเรียกคอนสตรัคเตอร์ TransformStream() ซึ่งจะสร้างและแสดงผลออบเจ็กต์สตรีมการเปลี่ยนรูปแบบจากตัวแฮนเดิลที่ระบุ ตัวสร้าง TransformStream() จะยอมรับออบเจ็กต์ JavaScript ที่ไม่บังคับซึ่งแสดงถึง transformer เป็นอาร์กิวเมนต์แรก ออบเจ็กต์ดังกล่าวอาจมีเมธอดใดก็ได้ต่อไปนี้

transformer

  • start(controller): ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อมีการสร้างออบเจ็กต์ โดยปกติแล้วจะใช้เพื่อจัดคิวกลุ่มคำนำหน้าโดยใช้ controller.enqueue() ระบบจะอ่านข้อมูลดังกล่าวจากฝั่งที่อ่านได้ แต่ไม่ขึ้นอยู่กับการเขียนไปยังฝั่งที่เขียนได้ หากกระบวนการเริ่มต้นนี้เป็นแบบไม่พร้อมกัน เช่น เนื่องจากต้องใช้เวลาในการรับข้อมูลโค้ดนำหน้า ฟังก์ชันจะแสดงผลพรอมต์เพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลวได้ โดยพรอมต์ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดในสตรีม ตัวสร้าง TransformStream() จะโยนข้อยกเว้นที่โยน
  • transform(chunk, controller): ระบบจะเรียกใช้เมธอดนี้เมื่อข้อมูลใหม่ซึ่งเขียนไปยังฝั่งที่เขียนได้พร้อมที่จะเปลี่ยนรูปแบบแล้ว การใช้งานสตรีมจะรับประกันว่าฟังก์ชันนี้จะเรียกใช้หลังจากที่การแปลงก่อนหน้าสําเร็จเท่านั้น และจะไม่เรียกใช้ก่อน start() เสร็จสมบูรณ์หรือหลังจากเรียกใช้ flush() ฟังก์ชันนี้จะทํางานเปลี่ยนรูปแบบจริงของสตรีมการเปลี่ยนรูปแบบ โดยสามารถจัดคิวผลลัพธ์ได้โดยใช้ controller.enqueue() ซึ่งอนุญาตให้เขียนข้อมูลไปยังฝั่งที่เขียนได้เพียงครั้งเดียว แต่ฝั่งที่อ่านได้อาจมีข้อมูลหลายกลุ่มหรือไม่มีเลย ทั้งนี้ขึ้นอยู่กับจำนวนครั้งที่เรียก controller.enqueue() หากกระบวนการเปลี่ยนรูปแบบเป็นแบบไม่พร้อมกัน ฟังก์ชันนี้จะแสดงผลพรอมต์เพื่อบ่งบอกความสําเร็จหรือความล้มเหลวของการเปลี่ยนรูปแบบ พรอมต์ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดทั้งฝั่งที่อ่านได้และเขียนได้ของสตรีมการเปลี่ยนรูปแบบ หากไม่ได้ระบุเมธอด transform() ระบบจะใช้การเปลี่ยนรูปแบบข้อมูลประจำตัว ซึ่งจะจัดคิวข้อมูลส่วนที่ไม่มีการแก้ไขจากฝั่งที่เขียนได้ไปยังฝั่งที่อ่านได้
  • flush(controller): ระบบจะเรียกใช้เมธอดนี้หลังจากที่เปลี่ยนรูปแบบข้อมูลทั้งหมดที่เขียนลงในฝั่งที่เขียนได้โดยการผ่าน transform() เรียบร้อยแล้ว และฝั่งที่เขียนได้กำลังจะปิด โดยปกติแล้วจะใช้เพื่อจัดคิวกลุ่มส่วนต่อท้ายไปยังฝั่งที่อ่านได้ ก่อนที่ฝั่งนั้นจะปิดลงด้วย หากการล้างข้อมูลเป็นแบบไม่เป็นเชิงเวลา ฟังก์ชันจะแสดงผลลัพธ์เป็นสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลว และระบบจะสื่อสารผลลัพธ์ไปยังผู้เรียกใช้ stream.writable.write() นอกจากนี้ พรอมต์ที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดทั้งฝั่งที่อ่านได้และเขียนได้ของสตรีม การยกเว้นข้อยกเว้นจะถือว่าเหมือนกับการคืนค่า Promise ที่ปฏิเสธ
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

กลยุทธ์การจัดคิว writableStrategy และ readableStrategy

พารามิเตอร์ที่ไม่บังคับลำดับที่ 2 และ 3 ของคอนสตรคเตอร์ TransformStream() จะใช้หรือไม่ก็ได้ writableStrategy และกลยุทธ์การจัดคิว readableStrategy โดยมีการกําหนดไว้ดังที่ระบุไว้ในส่วนสตรีมที่อ่านได้และที่เขียนได้ตามลําดับ

ตัวอย่างโค้ดสตรีมการเปลี่ยนรูปแบบ

ตัวอย่างโค้ดต่อไปนี้แสดงสตรีมการเปลี่ยนรูปแบบแบบง่ายที่ทำงานอยู่

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

การส่งผ่านสตรีมที่อ่านได้ผ่านสตรีมการเปลี่ยนรูปแบบ

เมธอด pipeThrough() ของอินเทอร์เฟซ ReadableStream มีวิธีต่อเชื่อมสตรีมปัจจุบันผ่านสตรีมการเปลี่ยนรูปแบบหรือคู่อื่นๆ ที่เขียนได้/อ่านได้ โดยทั่วไปแล้ว การส่งผ่านสตรีมจะล็อกสตรีมนั้นไว้ตลอดระยะเวลาการส่งผ่าน ซึ่งจะป้องกันไม่ให้ผู้อ่านรายอื่นล็อกสตรีม

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

ตัวอย่างโค้ดถัดไป (ค่อนข้างซับซ้อน) แสดงวิธีใช้ fetch() เวอร์ชัน "ตะโกน" ซึ่งเปลี่ยนข้อความทั้งหมดเป็นตัวพิมพ์ใหญ่โดยใช้ Promise การตอบกลับที่แสดงผลเป็นสตรีม และเปลี่ยนเป็นตัวพิมพ์ใหญ่ทีละกลุ่ม ข้อดีของวิธีการนี้คือคุณไม่จําเป็นต้องรอให้ดาวน์โหลดเอกสารทั้งฉบับ ซึ่งจะทําให้เกิดความแตกต่างอย่างมากเมื่อจัดการกับไฟล์ขนาดใหญ่

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

สาธิต

ตัวอย่างด้านล่างแสดงสตรีมที่อ่านได้ เขียนได้ และเปลี่ยนรูปแบบ รวมถึงมีตัวอย่างโซ่ท่อ pipeThrough() และ pipeTo() รวมถึงสาธิต tee() ด้วย คุณสามารถเลือกที่จะเรียกใช้เดโมในหน้าต่างของตัวเองหรือดูซอร์สโค้ดก็ได้

สตรีมมีประโยชน์ที่พร้อมใช้งานในเบราว์เซอร์

เบราว์เซอร์มีสตรีมที่มีประโยชน์หลายรายการที่ฝังไว้ คุณสามารถสร้าง ReadableStream จาก Blob ได้อย่างง่ายดาย วิธีการ stream() ของอินเทอร์เฟซ Blob จะแสดงผล ReadableStream ซึ่งจะแสดงข้อมูลที่อยู่ใน Blob เมื่ออ่าน นอกจากนี้ โปรดทราบว่าออบเจ็กต์ File เป็น Blob ประเภทหนึ่งๆ และสามารถใช้ในบริบทใดก็ได้ที่ Blob ใช้ได้

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

ตัวแปรสตรีมมิงของ TextDecoder.decode() และ TextEncoder.encode() จะเรียกว่า TextDecoderStream และ TextEncoderStream ตามลำดับ

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

การบีบอัดหรือคลายการบีบอัดไฟล์นั้นทำได้ง่ายโดยใช้สตรีมการเปลี่ยนรูปแบบ CompressionStream และ DecompressionStream ตามลำดับ ตัวอย่างโค้ดด้านล่างแสดงวิธีดาวน์โหลดข้อกำหนดของ Streams, บีบอัด (gzip) ไฟล์ในเบราว์เซอร์โดยตรง และเขียนไฟล์ที่บีบอัดไปยังดิสก์โดยตรง

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream ของ File System Access API และfetch()สตรีมคำขอเวอร์ชันทดลองเป็นตัวอย่างของสตรีมที่เขียนได้ในโลกแห่งความเป็นจริง

Serial API ใช้สตรีมที่อ่านและเขียนได้เป็นอย่างมาก

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

สุดท้าย WebSocketStream API จะผสานรวมสตรีมกับ WebSocket API

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

แหล่งข้อมูลที่มีประโยชน์

ขอขอบคุณ

บทความนี้ได้รับการตรวจสอบโดย Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley และ Adam Rice บล็อกโพสต์ของ Jake Archibald ช่วยฉันได้มากในการทําความเข้าใจสตรีม ตัวอย่างโค้ดบางส่วนได้รับแรงบันดาลใจจากการสํารวจของผู้ใช้ GitHub @bellbind และบางส่วนของข้อความเขียนขึ้นโดยอิงตามเอกสารประกอบบนเว็บ MDN สําหรับสตรีม ผู้เขียนมาตรฐานสตรีมทํางานได้อย่างยอดเยี่ยมในการเขียนข้อกําหนดนี้ รูปภาพหลักโดย Ryan Lara จาก Unsplash