สตรีม—คู่มือฉบับสมบูรณ์

ดูวิธีใช้สตรีมที่อ่านได้ เขียนได้ และเปลี่ยนรูปแบบสตรีมด้วย Streams API

Streams API ช่วยให้คุณเข้าถึงสตรีมข้อมูลที่ได้รับผ่านเครือข่ายแบบเป็นโปรแกรมหรือสร้างขึ้นด้วยวิธีใดก็ตามภายในเครื่องและประมวลผลด้วย JavaScript ได้ สตรีมมิงคือการแบ่งทรัพยากรที่คุณต้องการรับ ส่ง หรือเปลี่ยนรูปแบบให้เป็นส่วนเล็กๆ แล้วประมวลผลทีละส่วน แม้ว่าสตรีมมิงจะเป็นสิ่งที่เบราว์เซอร์ทำอยู่แล้วเมื่อได้รับเนื้อหาอย่าง HTML หรือวิดีโอเพื่อแสดงในหน้าเว็บ แต่ความสามารถนี้ไม่เคยพร้อมใช้งานกับ JavaScript มาก่อนที่ fetch ซึ่งมีการเริ่มใช้สตรีมในปี 2015

ก่อนหน้านี้ หากคุณต้องการประมวลผลทรัพยากรบางประเภท (เช่น วิดีโอหรือไฟล์ข้อความ ฯลฯ) คุณต้องดาวน์โหลดทั้งไฟล์ รอให้ระบบดีซีเรียลไลซ์ให้อยู่ในรูปแบบที่เหมาะสม จากนั้นประมวลผล การที่สตรีมใช้งานกับ JavaScript ได้ การเปลี่ยนแปลงทั้งหมดนี้จะเปลี่ยนแปลงไป ตอนนี้คุณประมวลผลข้อมูลดิบด้วย JavaScript ได้อย่างต่อเนื่องทันทีที่พร้อมใช้งานในไคลเอ็นต์ โดยไม่ต้องสร้างบัฟเฟอร์ สตริง หรือ BLOB ซึ่งช่วยปลดล็อก Use Case ได้จำนวนหนึ่ง ซึ่งตัวอย่างที่เราระบุไว้ด้านล่างมีดังนี้

  • เอฟเฟกต์วิดีโอ: เชื่อมต่อสตรีมวิดีโอที่อ่านได้ผ่านสตรีมการเปลี่ยนรูปแบบซึ่งนำเอฟเฟกต์ไปใช้แบบเรียลไทม์
  • การบีบอัดข้อมูล (de): การเชื่อมต่อสตรีมไฟล์ผ่านสตรีมการแปลงที่เลือก (de) บีบอัดสตรีม
  • การถอดรหัสรูปภาพ: การเชื่อมสตรีมการตอบสนอง HTTP ผ่านสตรีมการแปลงที่ถอดรหัสไบต์เป็นข้อมูลบิตแมป จากนั้นผ่านสตรีมการเปลี่ยนรูปแบบอื่นที่แปลบิตแมปเป็น PNG หากติดตั้งไว้ในตัวแฮนเดิล fetch ของ Service Worker จะช่วยให้คุณสร้าง Polyfill รูปแบบรูปภาพใหม่อย่าง AVIF ได้แบบโปร่งใส

การสนับสนุนเบราว์เซอร์

ReadableStream และ WritableStream

การสนับสนุนเบราว์เซอร์

  • 43
  • 14
  • 65
  • 10.1

แหล่งที่มา

TransformStream

การสนับสนุนเบราว์เซอร์

  • 67
  • 79
  • 102
  • 14.1

แหล่งที่มา

แนวคิดหลัก

ก่อนที่จะเจาะลึกรายละเอียดของสตรีมประเภทต่างๆ ผมขอแนะนำแนวคิดหลักไปก่อน

แบบเปียก

กลุ่มข้อมูลคือข้อมูลชิ้นเดียวที่เขียนไปยังหรืออ่านจากสตรีม เป็นสตรีมประเภทใดก็ได้ หรืออาจมีสตรีมหลายประเภท โดยส่วนใหญ่แล้ว กลุ่มข้อมูลจะไม่ใช่หน่วยอะตอมของข้อมูลของสตรีมหนึ่งๆ เช่น สตรีมแบบไบต์อาจมีชิ้นส่วนที่ประกอบด้วย 16 KiB Uint8Array หน่วย แทนที่จะเป็นไบต์เดี่ยว

สตรีมที่อ่านได้

สตรีมที่อ่านได้จะแสดงถึงแหล่งข้อมูลที่อ่านได้ กล่าวคือ ข้อมูลมาจากสตรีมที่อ่านได้ แน่นอนว่าสตรีมที่อ่านได้คืออินสแตนซ์ของคลาส ReadableStream

สตรีมที่เขียนได้

สตรีมที่เขียนได้จะแสดงปลายทางของข้อมูลที่คุณสามารถเขียนได้ กล่าวคือ ข้อมูลจะเข้าสู่สตรีมที่เขียนได้ โดยเฉพาะอย่างยิ่ง สตรีมที่เขียนได้คืออินสแตนซ์ของคลาส WritableStream

เปลี่ยนรูปแบบสตรีม

สตรีมการเปลี่ยนรูปแบบประกอบด้วยสตรีม 1 คู่ ได้แก่ สตรีมที่เขียนได้ หรือที่เรียกว่าด้านที่เขียนได้ และสตรีมที่อ่านได้ หรือที่เรียกว่าด้านที่อ่านได้ คำเปรียบเทียบในโลกแห่งความเป็นจริงก็คือล่ามแบบพร้อมกันที่แปลจากภาษาหนึ่งเป็นอีกภาษาหนึ่งได้ทันที ในลักษณะที่เจาะจงสำหรับสตรีมการเปลี่ยนรูปแบบ การเขียนไปยังด้านที่เขียนได้จะทำให้มีข้อมูลใหม่สำหรับการอ่านจากด้านที่อ่านได้ ดังนั้น ออบเจ็กต์ใดก็ตามที่มีพร็อพเพอร์ตี้ writable และพร็อพเพอร์ตี้ readable จะทำหน้าที่เป็นสตรีมการเปลี่ยนรูปแบบได้ อย่างไรก็ตาม คลาส TransformStream มาตรฐานจะช่วยให้คุณสร้างการจับคู่ที่เชื่อมโยงกันอย่างเหมาะสมได้ง่ายขึ้น

โซ่ท่อ

สตรีมใช้โดยการเชื่อมต่อสตรีมเข้าด้วยกันเป็นหลัก สตรีมที่อ่านได้จะเชื่อมต่อไปยังสตรีมที่เขียนได้โดยตรงโดยใช้เมธอด pipeTo() ของสตรีมที่อ่านได้ หรืออาจเชื่อมต่อผ่านสตรีมการเปลี่ยนรูปแบบอย่างน้อย 1 รายการก่อน โดยใช้เมธอด pipeThrough() ของสตรีมที่อ่านได้ ชุดของสตรีมที่เชื่อมต่อกันในลักษณะนี้เรียกว่าเชนไปป์

แรงกดดัน

เมื่อสร้างห่วงโซ่ท่อ จะมีการถ่ายทอดสัญญาณว่าชิ้นส่วนควรไหลผ่านเร็วเพียงใด หากขั้นตอนใดก็ตามในห่วงโซ่ยังไม่สามารถรับชิ้นส่วนได้ ระบบจะถ่ายทอดสัญญาณถอยหลังผ่านโซ่ท่อ จนกระทั่งในท้ายที่สุดแหล่งที่มาเดิมจะได้รับคำสั่งให้หยุดการผลิตท่อนสั้นๆ อย่างรวดเร็ว กระบวนการของการทำให้โฟลว์ให้เป็นมาตรฐานเรียกว่า Backpressure

เสื้อยืด

สตรีมที่อ่านได้ (ตั้งชื่อตามรูปร่าง "T") ตัวพิมพ์ใหญ่) ได้โดยใช้เมธอด tee() การดำเนินการนี้จะล็อกสตรีม กล่าวคือทำให้ใช้สตรีมโดยตรงไม่ได้อีกต่อไป แต่จะสร้างสตรีมใหม่ 2 รายการที่เรียกว่า Branch ซึ่งสามารถใช้ได้อย่างอิสระ การตีลูกก็มีความสำคัญเช่นกันเนื่องจากระบบไม่สามารถกรอกลับหรือรีสตาร์ทสตรีมได้ ส่วนเพิ่มเติมเกี่ยวกับเรื่องนี้ในภายหลัง

แผนภาพของเชนไปป์ที่ประกอบด้วยสตรีมที่อ่านได้ซึ่งมาจากการเรียกไปยัง API การดึงข้อมูล จากนั้นส่งผ่านสตรีมการแปลงที่มีเอาต์พุต จากนั้นส่งไปยังเบราว์เซอร์สำหรับสตรีมที่อ่านได้รายการแรกและส่งไปยังแคชของโปรแกรมทำงานของบริการสำหรับสตรีมที่อ่านได้สตรีมที่ 2
โซ่ไปป์

กลไกของสตรีมที่อ่านได้

สตรีมที่อ่านได้คือแหล่งข้อมูลที่แสดงใน JavaScript โดยออบเจ็กต์ ReadableStream ที่ไหลมาจากแหล่งที่มาที่สำคัญ เครื่องมือสร้าง ReadableStream() จะสร้างและแสดงผลออบเจ็กต์สตรีมที่อ่านได้จากเครื่องจัดการที่ระบุ แหล่งที่มาที่สำคัญมี 2 ประเภท ได้แก่

  • พุชแหล่งที่มาจะพุชข้อมูลถึงคุณอย่างสม่ำเสมอเมื่อคุณเข้าถึงแหล่งข้อมูลเหล่านั้น และคุณสามารถเริ่ม หยุดชั่วคราว หรือยกเลิกการเข้าถึงสตรีมได้ ตัวอย่างเช่น สตรีมวิดีโอสด เหตุการณ์ที่เซิร์ฟเวอร์ส่ง หรือ WebSocket
  • แหล่งที่มาของการดึงกำหนดให้คุณต้องขอข้อมูลจากแหล่งที่มาดังกล่าวอย่างชัดแจ้งเมื่อเชื่อมต่อแล้ว ตัวอย่างเช่น การดำเนินการ HTTP ผ่านการเรียก fetch() หรือ XMLHttpRequest

ระบบจะอ่านข้อมูลของสตรีมตามลำดับเป็นส่วนเล็กๆ ที่เรียกว่าส่วน ส่วนเนื้อหาที่อยู่ในสตรีมจะต้องอยู่ในคิว ซึ่งหมายความว่าผู้อ่านกำลังรอ อยู่ในคิวเพื่อรออ่าน คิวภายในจะติดตามกลุ่มที่ยังไม่ได้อ่าน

กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กำหนดวิธีที่สตรีมควรส่งสัญญาณ Backpress ตามสถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละส่วน และเปรียบเทียบขนาดของกลุ่มทั้งหมดในคิวกับจำนวนที่ระบุ ซึ่งเรียกว่าเครื่องหมายปริมาณน้ำสูง

ผู้อ่านจะอ่านเนื้อหาในสตรีมเป็นส่วนๆ เครื่องอ่านนี้จะรวบรวมข้อมูลครั้งละ 1 ส่วน ทำให้คุณทำงานในแบบใดก็ได้ที่ต้องการ เครื่องอ่านและโค้ดประมวลผลอื่นๆ ที่ทำงานไปพร้อมกับโค้ดจะเรียกว่าผู้บริโภค

โครงสร้างถัดไปในบริบทนี้เรียกว่าตัวควบคุม สตรีมที่อ่านได้แต่ละสตรีมจะมีตัวควบคุมที่เกี่ยวข้องซึ่งช่วยให้คุณควบคุมสตรีมได้ตามชื่อ

ในตอนหนึ่งๆ ผู้อ่านอ่านสตรีมได้ทีละ 1 คนเท่านั้น เมื่อสร้างผู้อ่านและเริ่มอ่านสตรีม (กล่าวคือจะกลายเป็นผู้อ่านที่ใช้งานอยู่) ระบบจะล็อกสตรีมไว้ หากต้องการให้ผู้อ่านคนอื่นอ่านสตรีมของคุณต่อได้ โดยทั่วไปแล้วคุณจะต้องปล่อยผู้อ่านคนแรกก่อนที่จะดำเนินการใดๆ เพิ่มเติม (แต่คุณสามารถทีสตรีมได้)

การสร้างสตรีมที่อ่านได้

คุณสร้างสตรีมที่อ่านได้โดยเรียกใช้ตัวสร้างของสตรีม ReadableStream() ตัวสร้างมีอาร์กิวเมนต์ underlyingSource ที่ไม่บังคับ ซึ่งแสดงถึงออบเจ็กต์พร้อมด้วยเมธอดและพร็อพเพอร์ตี้ที่กำหนดลักษณะการทำงานของอินสแตนซ์สตรีมที่สร้างขึ้น

underlyingSource

วิธีนี้อาจใช้วิธีการที่นักพัฒนาแอปกำหนด (ไม่บังคับ) ดังต่อไปนี้

  • start(controller): เรียกทันทีเมื่อมีการสร้างวัตถุ เมธอดนี้จะเข้าถึงแหล่งที่มาของสตรีมและทำสิ่งอื่นๆ ที่จำเป็นในการตั้งค่าฟังก์ชันการทำงานของสตรีม หากการดำเนินการนี้ไม่พร้อมกัน เมธอดอาจแสดงผลสัญญาเป็นสัญญาณว่าประสบความสำเร็จหรือล้มเหลว พารามิเตอร์ controller ที่ส่งไปยังเมธอดนี้คือ ReadableStreamDefaultController
  • pull(controller): สามารถใช้ควบคุมสตรีมเมื่อมีการดึงส่วนเนื้อหาจำนวนมากขึ้น โดยจะมีการเรียกใช้ซ้ำๆ ตราบใดที่คิวภายในของสตรีมไม่เต็ม จนกระทั่งคิวมีลำดับน้ำสูงสุด หากผลของการโทรไปยัง pull() เป็นไปตามสัญญา ระบบจะไม่โทรหา pull() อีกจนกว่าข้อความดังกล่าวจะเป็นไปตามสัญญา หากคำสัญญาปฏิเสธ สตรีมจะเกิดข้อผิดพลาด
  • cancel(reason): เรียกใช้เมื่อผู้บริโภคของสตรีมยกเลิกสตรีม
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController รองรับเมธอดต่อไปนี้

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

อาร์กิวเมนต์ที่ 2 ในทํานองเดียวกัน อาร์กิวเมนต์ของตัวสร้าง ReadableStream() คือ queuingStrategy ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสำหรับสตรีม (ไม่บังคับ) ซึ่งจะใช้พารามิเตอร์ 2 ตัวดังนี้

  • highWaterMark: ตัวเลขที่ไม่ติดลบซึ่งระบุค่าน้ำที่สูงของลำธารโดยใช้กลยุทธ์การจัดคิวนี้
  • size(chunk): ฟังก์ชันที่คํานวณและแสดงผลขนาดที่ไม่ใช่ลบจํานวนจํากัดของค่ากลุ่มที่ระบุ ผลลัพธ์จะใช้เพื่อระบุความกดดันย้อนกลับ โดยแสดงผ่านพร็อพเพอร์ตี้ ReadableStreamDefaultController.desiredSize ที่เหมาะสม นอกจากนี้ ยังควบคุมเวลาที่เรียกใช้เมธอด pull() ของแหล่งที่มาที่สำคัญด้วย
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

เมธอด getReader() และ read()

หากต้องการอ่านจากสตรีมที่อ่านได้ คุณจะต้องมีโปรแกรมอ่าน ซึ่งก็คือ ReadableStreamDefaultReader เมธอด getReader() ของอินเทอร์เฟซ ReadableStream จะสร้างเครื่องอ่านและล็อกสตรีมไว้ ขณะที่สตรีมล็อกอยู่ คุณจะไม่สามารถรับผู้อ่านรายอื่นได้จนกว่าจะเปิดตัวเครื่องนี้

เมธอด read() ของอินเทอร์เฟซ ReadableStreamDefaultReader จะแสดงสัญญาที่ให้สิทธิ์เข้าถึงส่วนถัดไปในคิวภายในของสตรีม โดยจะตอบสนองหรือปฏิเสธด้วยผลลัพธ์โดยขึ้นอยู่กับสถานะของสตรีม ความเป็นไปได้ต่างๆ มีดังนี้

  • หากมีชิ้นงาน 1 ท่อน ระบบจะดำเนินการตามคําสัญญาด้วยออบเจ็กต์ในแบบฟอร์ม
    { value: chunk, done: false }
  • หากสตรีมจบลง ระบบจะดำเนินการตามสัญญาด้วยออบเจ็กต์ในแบบฟอร์ม
    { value: undefined, done: true }
  • หากสตรีมเกิดข้อผิดพลาด ระบบจะปฏิเสธสัญญาเนื่องจากข้อผิดพลาดที่เกี่ยวข้อง
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

พร็อพเพอร์ตี้ locked

คุณตรวจสอบได้ว่าสตรีมที่อ่านได้ล็อกอยู่หรือไม่โดยเข้าถึงพร็อพเพอร์ตี้ ReadableStream.locked ของสตรีมนั้น

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

ตัวอย่างสตรีมโค้ดที่อ่านได้

ตัวอย่างโค้ดด้านล่างแสดงขั้นตอนทั้งหมดในการใช้งาน ก่อนอื่น คุณสร้าง ReadableStream ที่อาร์กิวเมนต์ underlyingSource (ซึ่งก็คือคลาส TimestampSource) กำหนดเมธอด start() เมธอดนี้จะแจ้งการประทับเวลาของ controller ถึง enqueue() ทุกๆ วินาทีในช่วง 10 วินาที สุดท้ายจะบอกให้ตัวควบคุมclose()สตรีม คุณใช้สตรีมนี้ได้โดยการสร้างผู้อ่านผ่านเมธอด getReader() และเรียกใช้ read() จนกว่าสตรีมจะเป็นเวลา done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

การทำซ้ำแบบไม่พร้อมกัน

การตรวจสอบการวนซ้ำ read() แต่ละครั้งหากสตรีมเป็น done อาจไม่ใช่ API ที่สะดวกที่สุด โชคดีที่จะมีวิธีที่ดีกว่าในกระบวนการนี้ในเร็วๆ นี้ นั่นคือ การทำงานซ้ำไม่พร้อมกัน

for await (const chunk of stream) {
  console.log(chunk);
}

วิธีแก้ปัญหาชั่วคราวในการใช้การทำซ้ำแบบไม่พร้อมกันในวันนี้คือการใช้ลักษณะการทำงานกับ Polyfill

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

การเลือกสตรีมให้อ่านได้สบาย

เมธอด tee() ของอินเทอร์เฟซ ReadableStream จะใช้สตรีมที่อ่านได้ในปัจจุบัน โดยแสดงผลอาร์เรย์ 2 องค์ประกอบที่มีสาขาที่ได้ 2 สาขาเป็นอินสแตนซ์ ReadableStream ใหม่ วิธีนี้ช่วยให้ผู้อ่าน 2 คนอ่านสตรีมพร้อมกันได้ เช่น คุณอาจทำแบบนี้ใน Service Worker หากต้องการดึงข้อมูลการตอบกลับจากเซิร์ฟเวอร์และสตรีมไปยังเบราว์เซอร์ แต่จะสตรีมไปยังแคชของโปรแกรมทำงานของบริการด้วย เนื่องจากระบบใช้เนื้อหาการตอบกลับมากกว่า 1 ครั้งไม่ได้ คุณจึงต้องใช้สำเนา 2 ชุด หากต้องการยกเลิกสตรีม คุณจะต้องยกเลิกทั้ง 2 สาขาที่ได้ โดยทั่วไปแล้ว การเริ่มสตรีมจะล็อกสตรีมไว้เป็นระยะเวลานานเพื่อป้องกันไม่ให้ผู้อ่านรายอื่นล็อกสตรีม

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

สตรีมไบต์ที่อ่านได้

สำหรับสตรีมที่มีค่าไบต์ จะมีการนำเสนอเวอร์ชันขยายของสตรีมที่อ่านได้เพื่อจัดการไบต์อย่างมีประสิทธิภาพ โดยเฉพาะการลดขนาดสำเนา สตรีมไบต์ทำให้ได้ผู้อ่านที่ใช้บัฟเฟอร์ด้วยตนเอง (BYOB) ได้ การใช้งานเริ่มต้นจะให้เอาต์พุตที่หลากหลาย เช่น สตริงหรือบัฟเฟอร์อาร์เรย์ในกรณีของ WebSockets ในขณะที่สตรีมแบบไบต์รับประกันจำนวนเอาต์พุตไบต์ นอกจากนี้ เครื่องอ่าน BYOB ยังมีข้อดีในด้านความเสถียร เพราะหากบัฟเฟอร์หลุดออกจากกัน รับประกันได้ว่าจะไม่มีการเขียนลงในบัฟเฟอร์เดียวกัน 2 ครั้ง ซึ่งจะหลีกเลี่ยงสภาพการแข่งขัน เครื่องอ่าน BYOB สามารถลดจำนวนครั้งที่เบราว์เซอร์ต้องเรียกใช้การรวบรวมขยะ เนื่องจากโปรแกรมจะนำบัฟเฟอร์มาใช้ซ้ำได้

การสร้างสตรีมไบต์ที่อ่านได้

คุณสร้างสตรีมไบต์ที่อ่านได้โดยการส่งพารามิเตอร์ type เพิ่มเติมไปยังเครื่องมือสร้าง ReadableStream()

new ReadableStream({ type: 'bytes' });

underlyingSource

แหล่งที่มาเบื้องหลังของสตรีมไบต์ที่อ่านได้จะได้รับ ReadableByteStreamController เพื่อจัดการ เมธอด ReadableByteStreamController.enqueue() ของจะใช้อาร์กิวเมนต์ chunk ซึ่งมีค่าเป็น ArrayBufferView พร็อพเพอร์ตี้ ReadableByteStreamController.byobRequest จะแสดงผลคำขอพุล BYOB ปัจจุบัน หรือ Null หากไม่มี สุดท้าย พร็อพเพอร์ตี้ ReadableByteStreamController.desiredSize จะแสดงผลขนาดที่ต้องการเพื่อเติมคิวภายในของสตรีมที่มีการควบคุม

queuingStrategy

อาร์กิวเมนต์ที่ 2 ในทํานองเดียวกัน อาร์กิวเมนต์ของตัวสร้าง ReadableStream() คือ queuingStrategy นี่เป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสำหรับสตรีม (ไม่บังคับ) ซึ่งจะใช้ 1 พารามิเตอร์ดังนี้

  • highWaterMark: จำนวนไบต์ที่ไม่ติดลบที่ระบุค่าน้ำที่สูงของสตรีมโดยใช้กลยุทธ์การจัดคิวนี้ ค่านี้ใช้เพื่อระบุความกดดันของระบบ โดยแสดงผ่านพร็อพเพอร์ตี้ ReadableByteStreamController.desiredSize ที่เหมาะสม นอกจากนี้ ยังควบคุมเวลาที่เรียกใช้เมธอด pull() ของแหล่งที่มาที่สำคัญด้วย

เมธอด getReader() และ read()

จากนั้นคุณจะมีสิทธิ์เข้าถึง ReadableStreamBYOBReader ได้โดยตั้งค่าพารามิเตอร์ mode ให้สอดคล้องกันดังนี้ ReadableStream.getReader({ mode: "byob" }) วิธีนี้จะช่วยให้ควบคุมการจัดสรรบัฟเฟอร์ได้ละเอียดยิ่งขึ้นเพื่อหลีกเลี่ยงสำเนา หากต้องการอ่านจากสตรีมไบต์ คุณต้องเรียกใช้ ReadableStreamBYOBReader.read(view) โดยที่ view เป็น ArrayBufferView

ตัวอย่างโค้ดสตรีมไบต์ที่อ่านได้

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

ฟังก์ชันต่อไปนี้จะแสดงผลสตรีมไบต์ที่อ่านได้ ซึ่งช่วยให้อ่านอาร์เรย์ที่สร้างขึ้นแบบสุ่มได้อย่างมีประสิทธิภาพ แทนที่จะใช้ขนาด 1,024 ที่กำหนดไว้ล่วงหน้า อุปกรณ์จะพยายามเติมบัฟเฟอร์ที่นักพัฒนาซอฟต์แวร์จัดหาให้ ทำให้ควบคุมได้อย่างเต็มที่

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

กลไกของสตรีมที่เขียนได้

สตรีมที่เขียนได้คือปลายทางที่คุณเขียนข้อมูลซึ่งแสดงอยู่ใน JavaScript โดยออบเจ็กต์ WritableStream ได้ วิธีนี้ทำหน้าที่เป็นนามธรรมเหนือซิงก์รอง ซึ่งเป็นซิงก์ I/O ระดับล่างที่มีการเขียนข้อมูลดิบ

ระบบจะเขียนข้อมูลไปยังสตรีมผ่านผู้เขียน โดยเขียนครั้งละ 1 ส่วน ชิ้นส่วนที่มี 1 ชิ้นสามารถอยู่ในรูปแบบได้หลากหลาย เหมือนข้อความชิ้นส่วนที่อยู่ในเครื่องอ่าน คุณสามารถใช้โค้ดใดก็ได้ที่ต้องการเพื่อสร้างส่วนที่พร้อมเขียน ผู้เขียนบวกโค้ดที่เกี่ยวข้องเรียกว่า Producer

เมื่อมีการสร้างผู้เขียนและเริ่มเขียนสตรีม (ผู้เขียนที่ใช้งานอยู่) ระบบจะถือว่าผู้เขียนล็อกไว้ จะมีเพียงนักเขียน 1 คนเท่านั้นที่สามารถเขียนในสตรีมที่เขียนได้ในเวลาเดียวกัน หากต้องการให้นักเขียนอีกคนเริ่มเขียนสตรีมของคุณ โดยปกติคุณจะต้องปล่อยสตรีมดังกล่าวก่อนที่จะแนบผู้เขียนคนอื่นกับสตรีม

คิวภายในจะติดตามส่วนที่ถูกเขียนลงในสตรีมแต่ยังไม่ได้รับการประมวลผลโดยซิงก์ที่สำคัญ

กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กำหนดวิธีที่สตรีมควรส่งสัญญาณ Backpress ตามสถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละส่วน และเปรียบเทียบขนาดของกลุ่มทั้งหมดในคิวกับจำนวนที่ระบุ ซึ่งเรียกว่าเครื่องหมายปริมาณน้ำสูง

โครงสร้างขั้นสุดท้ายเรียกว่าตัวควบคุม สตรีมที่เขียนได้แต่ละรายการจะมีตัวควบคุมที่เชื่อมโยงอยู่ ซึ่งช่วยให้คุณควบคุมสตรีมได้ (เช่น ล้มเลิกสตรีม)

การสร้างสตรีมที่เขียนได้

อินเทอร์เฟซ WritableStream ของ Streams API มี Abstraction มาตรฐานสำหรับการเขียนข้อมูลสตรีมมิงไปยังปลายทางที่เรียกว่าซิงก์ ออบเจ็กต์นี้มาพร้อมระบบดันกลับและการจัดคิวในตัว คุณสร้างสตรีมที่เขียนได้โดยเรียกใช้ตัวสร้างของสตรีม WritableStream() โดยมีพารามิเตอร์ underlyingSink ที่ไม่บังคับ ซึ่งแสดงถึงออบเจ็กต์ที่มีเมธอดและพร็อพเพอร์ตี้ซึ่งกำหนดลักษณะการทำงานของอินสแตนซ์สตรีมที่สร้างขึ้น

underlyingSink

underlyingSink อาจมีวิธีการที่นักพัฒนาแอปกำหนด (ไม่บังคับ) ดังต่อไปนี้ พารามิเตอร์ controller ที่ส่งไปยังเมธอดบางรายการคือ WritableStreamDefaultController

  • start(controller): ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อมีการสร้างวัตถุ เนื้อหาของวิธีการนี้ควรมีเป้าหมายเพื่อเข้าถึงซิงก์ที่เกี่ยวข้อง หากการประมวลผลนี้ไม่พร้อมกัน อาจทำให้ระบบแสดงสัญญาที่ส่งสัญญาณว่าประสบความสำเร็จหรือล้มเหลว
  • write(chunk, controller): ระบบจะเรียกใช้เมธอดนี้เมื่อมีกลุ่มข้อมูลใหม่ (ที่ระบุในพารามิเตอร์ chunk) พร้อมที่จะเขียนไปยังซิงก์ที่สำคัญ โดยสามารถส่งกลับคำสัญญาเพื่อบ่งบอกถึงความสำเร็จหรือความล้มเหลวของการเขียน ระบบจะเรียกใช้เมธอดนี้หลังจากที่การเขียนก่อนหน้านี้เสร็จสมบูรณ์แล้วเท่านั้น และจะไม่หลังจากที่สตรีมปิดหรือล้มเลิก
  • close(controller): ระบบจะเรียกใช้เมธอดนี้หากแอปส่งสัญญาณว่าเขียนส่วนเนื้อหาลงในสตรีมเสร็จแล้ว เนื้อหาควรทำทุกวิถีทางที่จำเป็นเพื่อสรุปการเขียนในซิงก์ที่มีและปล่อยสิทธิ์เข้าถึงเนื้อหานั้น หากกระบวนการนี้ไม่พร้อมกัน อาจทำให้เกิดสัญญาณว่าสำเร็จหรือล้มเหลว ระบบจะเรียกใช้เมธอดนี้หลังจากที่การเขียนในคิวทั้งหมดเสร็จสมบูรณ์แล้วเท่านั้น
  • abort(reason): เราจะเรียกใช้เมธอดนี้หากแอปส่งสัญญาณว่าต้องการปิดสตรีมอย่างกะทันหันและทำให้สตรีมอยู่ในสถานะข้อผิดพลาด โดยสามารถล้างทรัพยากรที่ถูกระงับได้ เช่นเดียวกับ close() แต่จะมีการเรียกใช้ abort() แม้ว่าการเขียนจะอยู่ในคิวก็ตาม ส่วนนั้นก็จะหายไป หากกระบวนการนี้ไม่พร้อมกัน อาจทำให้ระบบแสดงสัญญาที่บ่งบอกถึงความสำเร็จหรือความล้มเหลวได้ พารามิเตอร์ reason มี DOMString ที่อธิบายเหตุผลในการล้มเลิกสตรีม
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

อินเทอร์เฟซ WritableStreamDefaultController ของ Streams API เป็นตัวควบคุมที่ช่วยให้ควบคุมสถานะของ WritableStream ได้ในระหว่างการตั้งค่า เมื่อมีการส่งส่วนเพิ่มเติมเข้ามาเขียนหรือเมื่อเขียนเสร็จแล้ว เมื่อสร้าง WritableStream ซิงก์ที่ใช้อยู่เบื้องหลังจะได้รับอินสแตนซ์ WritableStreamDefaultController ที่สอดคล้องกันเพื่อจัดการ WritableStreamDefaultController มีเพียงวิธีเดียวคือ WritableStreamDefaultController.error() ซึ่งทำให้การโต้ตอบในอนาคตกับสตรีมที่เกี่ยวข้องเกิดข้อผิดพลาด WritableStreamDefaultController ยังรองรับพร็อพเพอร์ตี้ signal ซึ่งแสดงผลอินสแตนซ์ของ AbortSignal ด้วย ซึ่งทำให้การดำเนินการ WritableStream หยุดลงได้หากจำเป็น

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

อาร์กิวเมนต์ที่ 2 ในทํานองเดียวกัน อาร์กิวเมนต์ของตัวสร้าง WritableStream() คือ queuingStrategy ซึ่งเป็นออบเจ็กต์ที่กำหนดกลยุทธ์การจัดคิวสำหรับสตรีม (ไม่บังคับ) ซึ่งจะใช้พารามิเตอร์ 2 ตัวดังนี้

  • highWaterMark: ตัวเลขที่ไม่ติดลบซึ่งระบุค่าน้ำที่สูงของลำธารโดยใช้กลยุทธ์การจัดคิวนี้
  • size(chunk): ฟังก์ชันที่คํานวณและแสดงผลขนาดที่ไม่ใช่ลบจํานวนจํากัดของค่ากลุ่มที่ระบุ ผลลัพธ์จะใช้เพื่อระบุความกดดันย้อนกลับ โดยแสดงผ่านพร็อพเพอร์ตี้ WritableStreamDefaultWriter.desiredSize ที่เหมาะสม

เมธอด getWriter() และ write()

หากต้องการเขียนไปยังสตรีมที่เขียนได้ คุณต้องมีผู้เขียน ซึ่งจะเป็น WritableStreamDefaultWriter เมธอด getWriter() ของอินเทอร์เฟซ WritableStream จะส่งคืนอินสแตนซ์ใหม่ของ WritableStreamDefaultWriter และล็อกสตรีมไปยังอินสแตนซ์ดังกล่าว แม้ว่าสตรีมจะล็อกอยู่ คุณก็จะหาผู้เขียนคนอื่นไม่ได้จนกว่าผู้เขียนปัจจุบันจะเปิดตัว

เมธอด write() ของอินเทอร์เฟซ WritableStreamDefaultWriter จะเขียนข้อมูลที่ส่งผ่านไปยัง WritableStream และซิงก์ที่สำคัญ จากนั้นจะส่งกลับคำสัญญาที่บ่งบอกถึงความสำเร็จหรือความล้มเหลวของการดำเนินการเขียน โปรดทราบว่า "ความสำเร็จ" มีความหมายว่าอะไรก็ขึ้นอยู่กับซิงก์ที่เกี่ยวข้อง แต่อาจบ่งบอกว่ากลุ่มได้รับการยอมรับแล้ว และไม่จำเป็นต้องบันทึกไว้ในปลายทางสุดท้ายอย่างปลอดภัย

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

พร็อพเพอร์ตี้ locked

คุณตรวจสอบได้ว่าสตรีมที่เขียนได้ถูกล็อกอยู่หรือไม่โดยเข้าถึงพร็อพเพอร์ตี้ WritableStream.locked ของสตรีมนั้น

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

ตัวอย่างโค้ดสตรีมที่เขียนได้

ตัวอย่างโค้ดด้านล่างแสดงขั้นตอนทั้งหมดในการใช้งาน

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

การเชื่อมต่อสตรีมที่อ่านได้ไปยังสตรีมที่เขียนได้

ระบบจะเชื่อมต่อสตรีมที่อ่านได้ไปยังสตรีมที่เขียนได้ผ่านทางเมธอด pipeTo() ของสตรีมที่อ่านได้ ReadableStream.pipeTo() จะไปป์ไลน์ ReadableStream ปัจจุบันไปยัง WritableStream ที่ระบุ และส่งคืนสัญญาที่ดำเนินการตามคำสั่งซื้อเมื่อกระบวนการไปป์ไลน์เสร็จสมบูรณ์ หรือปฏิเสธเมื่อพบข้อผิดพลาด

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

การสร้างสตรีมการเปลี่ยนรูปแบบ

อินเทอร์เฟซ TransformStream ของ Streams API แสดงชุดข้อมูลที่เปลี่ยนรูปแบบได้ คุณสร้างสตรีมการเปลี่ยนรูปแบบได้โดยเรียกใช้ตัวสร้างของ TransformStream() ซึ่งจะสร้างและแสดงผลออบเจ็กต์สตรีมการเปลี่ยนรูปแบบจากตัวแฮนเดิลที่ระบุ ตัวสร้าง TransformStream() จะยอมรับออบเจ็กต์ JavaScript ที่ไม่บังคับแทน transformer เป็นอาร์กิวเมนต์แรก วัตถุดังกล่าวอาจมีวิธีการอย่างใดอย่างหนึ่งต่อไปนี้

transformer

  • start(controller): ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อมีการสร้างวัตถุ โดยปกติแล้วจะใช้เพื่อจัดคิวส่วนคำนำหน้าโดยใช้ controller.enqueue() ชิ้นส่วนเหล่านั้นจะได้รับการอ่านจากด้านที่อ่านได้ แต่ไม่ได้ขึ้นอยู่กับการเขียนในด้านที่เขียนได้ เช่น หากกระบวนการระยะแรกไม่พร้อมกัน เช่น เนื่องจากต้องใช้ความพยายามอย่างมากในการรับส่วนคำนำหน้า ฟังก์ชันดังกล่าวอาจแสดงผลคำสัญญาเป็นสัญญาณว่าประสบความสำเร็จหรือล้มเหลว คำสัญญาที่ถูกปฏิเสธจะทำให้สตรีมผิดพลาด ตัวสร้าง TransformStream() จะแสดงผลข้อยกเว้นทั้งหมดอีกครั้ง
  • transform(chunk, controller): ระบบจะเรียกใช้เมธอดนี้เมื่อส่วนใหม่ที่เขียนไปยังด้านที่เขียนได้แต่เดิมพร้อมที่จะเปลี่ยนรูปแบบ การใช้สตรีมจะรับประกันว่าฟังก์ชันนี้จะมีการเรียกใช้หลังจากการเปลี่ยนรูปแบบก่อนหน้านี้สําเร็จแล้วเท่านั้น และไม่เคยมีการเรียกใช้ start() จนเสร็จสมบูรณ์หรือหลังจาก flush() มีการเรียกใช้ ฟังก์ชันนี้จะดำเนินการเปลี่ยนรูปแบบ ของสตรีมการเปลี่ยนรูปแบบจริง จัดลำดับผลการค้นหาได้โดยใช้ controller.enqueue() การดำเนินการนี้จะทำให้มีประโยค 1 ท่อนที่เขียนไว้ในด้านที่เขียนได้ เพื่อแสดงเป็น 0 หรือหลายท่อนในฝั่งที่อ่านได้ โดยขึ้นอยู่กับจำนวนครั้งที่เรียกใช้ controller.enqueue() หากกระบวนการเปลี่ยนรูปแบบเป็นแบบไม่พร้อมกัน ฟังก์ชันนี้จะแสดงสัญญาที่บ่งบอกถึงความสำเร็จหรือความล้มเหลวของการเปลี่ยนรูปแบบ สัญญาการปฏิเสธที่จะเกิดข้อผิดพลาดทั้งด้านที่อ่านได้และเขียนได้ของสตรีมการเปลี่ยนรูปแบบ หากไม่ได้ระบุเมธอด transform() ระบบจะใช้การเปลี่ยนรูปแบบข้อมูลประจำตัว ซึ่งจะจัดคิวข้อมูลไม่เปลี่ยนแปลงจากด้านที่เขียนได้ไปยังด้านที่อ่านได้
  • flush(controller): ระบบจะเรียกเมธอดนี้หลังจากกลุ่มทั้งหมดที่เขียนไปยังด้านที่เขียนได้ได้รับการเปลี่ยนรูปแบบเมื่อส่งผ่าน transform() สำเร็จ และด้านที่เขียนได้กำลังจะปิด โดยปกติจะใช้การจัดลำดับส่วนต่อท้ายไปยังด้านที่อ่านได้ ก่อนที่ส่วนนั้นจะปิดลง หากกระบวนการล้างไม่พร้อมกัน ฟังก์ชันอาจแสดงสัญญาว่าสำเร็จหรือล้มเหลว ระบบจะส่งผลลัพธ์ไปยังผู้โทรของ stream.writable.write() นอกจากนี้ คำสัญญาที่ถูกปฏิเสธจะเกิดข้อผิดพลาดทั้งด้านที่อ่านได้และที่เขียนได้ของสตรีม การส่งข้อยกเว้นจะได้รับการปฏิบัติเหมือนกับการส่งคืนคำสัญญาที่ถูกปฏิเสธ
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

กลยุทธ์การจัดคิว writableStrategy และ readableStrategy

พารามิเตอร์ที่ไม่บังคับรายการที่ 2 และ 3 ของตัวสร้าง TransformStream() เป็นกลยุทธ์การจัดคิว writableStrategy และ readableStrategy ที่ไม่บังคับ ซึ่งจะได้รับการระบุไว้ในส่วนสตรีมที่อ่านได้และที่เขียนได้ตามลำดับ

ตัวอย่างโค้ดสตรีมการเปลี่ยนรูปแบบ

ตัวอย่างโค้ดต่อไปนี้แสดงสตรีมการเปลี่ยนรูปแบบอย่างง่ายในการทำงานจริง

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

การสร้างสตรีมที่อ่านได้ผ่านสตรีมการเปลี่ยนรูปแบบ

เมธอด pipeThrough() ของอินเทอร์เฟซ ReadableStream เป็นวิธีที่เชื่อมโยงสตรีมปัจจุบันผ่านสตรีมการเปลี่ยนรูปแบบหรือคู่อื่นๆ ที่เขียนได้/อ่านได้ โดยทั่วไป การเชื่อมต่อท่อสตรีมจะล็อกสตรีมไว้ตามระยะเวลาของไปป์ไลน์ เพื่อป้องกันไม่ให้ผู้อ่านรายอื่นล็อกสตรีม

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

ตัวอย่างโค้ดถัดไป (ต่อแล้วเล็กน้อย) จะแสดงวิธีใช้ fetch() เวอร์ชัน "การตะโกน" ที่เพิ่มข้อความทั้งหมดเป็นตัวพิมพ์ใหญ่โดยนำการตอบกลับที่ตอบกลับมาไปใช้เป็นสตรีม และใช้ตัวพิมพ์ใหญ่โดยเรียงเป็นตัวพิมพ์ใหญ่ ข้อดีของวิธีนี้คือคุณไม่ต้องรอให้ดาวน์โหลดเอกสารทั้งฉบับ ซึ่งจะสร้างความแตกต่างได้อย่างมากเมื่อต้องจัดการกับไฟล์ขนาดใหญ่

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

ข้อมูลประชากร

การสาธิตด้านล่างแสดงให้เห็นการทำงานจริงของสตรีมที่อ่านได้ เขียนได้ และเปลี่ยนรูปแบบ นอกจากนี้ยังรวมถึงตัวอย่างของเชนไปป์ pipeThrough() และ pipeTo() รวมถึง tee() ด้วย โดยคุณจะเรียกใช้การสาธิตในหน้าต่างแยกต่างหากหรือดูซอร์สโค้ดก็ได้

สตรีมที่มีประโยชน์ที่พร้อมใช้งานในเบราว์เซอร์

มีสตรีมที่เป็นประโยชน์มากมายติดตั้งมาในเบราว์เซอร์แล้ว คุณสร้าง ReadableStream จาก Blob ได้ง่ายๆ เมธอด stream() ของอินเทอร์เฟซ Blob จะแสดงผล ReadableStream ซึ่งเมื่ออ่านแล้วจะแสดงผลข้อมูลที่อยู่ใน Blob โปรดทราบด้วยว่าออบเจ็กต์ File เป็น Blob ประเภทหนึ่ง และใช้ในบริบทใดก็ได้ที่ Blob ทำได้

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

ตัวแปรสตรีมมิงของ TextDecoder.decode() และ TextEncoder.encode() เรียกว่า TextDecoderStream และ TextEncoderStream ตามลำดับ

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

การบีบอัดหรือการขยายไฟล์ทำได้ง่ายๆ ด้วยการเปลี่ยนรูปแบบสตรีมของ CompressionStream และ DecompressionStream ตามลำดับ ตัวอย่างโค้ดด้านล่างแสดงวิธีดาวน์โหลดข้อกำหนดของสตรีม บีบอัด (gzip) ในเบราว์เซอร์โดยตรง และเขียนไฟล์ที่บีบอัดลงในดิสก์โดยตรง

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

ของ File System Access API FileSystemWritableFileStream และfetch()สตรีมคำขอรุ่นทดลองคือตัวอย่างของสตรีมที่เขียนได้โดยทั่วไป

Serial API ใช้ทั้งสตรีมที่อ่านได้และเขียนได้เป็นจำนวนมาก

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

สุดท้าย WebSocketStream API จะผสานรวมสตรีมกับ WebSocket API

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

แหล่งข้อมูลที่มีประโยชน์

ข้อความแสดงการยอมรับ

บทความนี้ได้รับการตรวจสอบโดย Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley และ Adam Rice บล็อกโพสต์ของ Jake Archibald ช่วยให้ผมเข้าใจสตรีมได้มาก ตัวอย่างโค้ดบางส่วนได้รับแรงบันดาลใจจากการสำรวจของผู้ใช้ GitHub @bellbind และบทร้อยแก้วที่สร้างขึ้นอย่างหนักใน MDN Web Docs ในสตรีม นักเขียน Streams Standard ได้เขียนข้อกำหนดนี้ ได้อย่างยอดเยี่ยม โดยรูปภาพหลักของ Ryan Lara ใน Unsplash