สตรีม—คู่มือฉบับสมบูรณ์

ดูวิธีใช้สตรีมที่อ่านได้ เขียนได้ และเปลี่ยนรูปแบบด้วย Streams API

Streams API ช่วยให้คุณเข้าถึงสตรีมข้อมูลที่ได้รับผ่านเครือข่ายแบบเป็นโปรแกรมได้ หรือสร้างด้วยวิธีการใดๆ ก็ตามภายในท้องถิ่นและ ประมวลผลด้วย JavaScript สตรีมมิงเกี่ยวข้องกับการแบ่งทรัพยากรที่คุณต้องการรับ ส่ง หรือเปลี่ยนรูปแบบ มาแบ่งเป็นท่อนสั้นๆ แล้วค่อยๆ ประมวลผลทีละน้อย ขณะกำลังสตรีม เบราว์เซอร์ก็ยังคงได้รับเนื้อหาอย่าง HTML หรือวิดีโอที่จะแสดงบนหน้าเว็บ ไม่เคยใช้กับ JavaScript มาก่อน fetch ที่มีการเปิดตัวสตรีมในปี 2015

ก่อนหน้านี้ในกรณีที่คุณต้องการประมวลผลทรัพยากรบางประเภท (ไม่ว่าจะเป็นวิดีโอหรือไฟล์ข้อความ เป็นต้น) คุณต้องดาวน์โหลดทั้งไฟล์ รอให้ระบบดีซีเรียลไลซ์เป็นรูปแบบที่เหมาะสม แล้วประมวลผล เมื่อมีสตรีมพร้อมใช้งานสำหรับ JavaScript การเปลี่ยนแปลงทั้งหมดนี้ ตอนนี้คุณสามารถประมวลผลข้อมูลดิบด้วย JavaScript ค่อยๆ ทยอยเป็นลำดับ ทันทีที่พร้อมใช้งานในไคลเอ็นต์ โดยไม่ต้องสร้างบัฟเฟอร์, สตริง หรือ BLOB วิธีนี้ช่วยปลดล็อก Use Case ต่างๆ ได้บางส่วน ซึ่งเราแสดงไว้ที่ด้านล่าง

  • เอฟเฟกต์วิดีโอ: การเชื่อมต่อสตรีมวิดีโอที่อ่านได้ผ่านสตรีมการเปลี่ยนรูปแบบที่นำเอฟเฟกต์ไปใช้ แบบเรียลไทม์
  • การบีบอัดข้อมูล (de): การเพียร์สตรีมไฟล์ผ่านสตรีมการแปลงที่เลือก จะบีบอัดไฟล์
  • การถอดรหัสรูปภาพ: การเชื่อมต่อสตรีมการตอบสนอง HTTP ผ่านสตรีมการแปลงที่ถอดรหัสไบต์ เป็นข้อมูลบิตแมป แล้วส่งผ่านสตรีมการเปลี่ยนรูปแบบอีกสตรีมหนึ่งที่แปลบิตแมปเป็น PNG ถ้า ที่ติดตั้งไว้ภายในเครื่องจัดการ fetch ของ Service Worker ซึ่งจะช่วยให้คุณมองเห็น Polyfill แบบโปร่งใส รูปแบบรูปภาพใหม่อย่าง AVIF

การสนับสนุนเบราว์เซอร์

ReadableStream และ WritableStream

การรองรับเบราว์เซอร์

  • Chrome: 43
  • ขอบ: 14
  • Firefox: 65
  • Safari: 10.1

แหล่งที่มา

TransformStream

การรองรับเบราว์เซอร์

  • Chrome: 67
  • ขอบ: 79
  • Firefox: 102
  • Safari: 14.1

แหล่งที่มา

แนวคิดหลัก

ก่อนที่จะลงรายละเอียดเกี่ยวกับสตรีมประเภทต่างๆ ผมขอแนะนำแนวคิดหลักบางประการ

เป็นก้อน

กลุ่มคือข้อมูลชิ้นเดียวที่เขียนไปยังหรืออ่านจากสตรีม อาจเป็นรายการใดก็ได้ type; สตรีมอาจประกอบด้วยเนื้อหา ประเภทที่แตกต่างกัน ส่วนใหญ่แล้ว สักท่อนจะไม่ใช่อะตอมที่สุด หน่วยของข้อมูลสำหรับสตรีมที่ระบุ ตัวอย่างเช่น สตรีมแบบไบต์อาจมีกลุ่มที่ประกอบด้วย 16 KiB Uint8Array หน่วย แทนที่จะเป็นไบต์เดี่ยว

สตรีมที่อ่านได้

สตรีมที่อ่านได้แสดงถึงแหล่งที่มาของข้อมูลซึ่งคุณสามารถอ่านได้ กล่าวอีกนัยหนึ่งคือ ข้อมูลได้มาจาก ออกจากสตรีมที่อ่านได้ กล่าวอย่างชัดเจนคือ สตรีมที่อ่านได้เป็นอินสแตนซ์ของ ReadableStream

สตรีมที่เขียนได้

สตรีมที่เขียนได้แสดงถึงปลายทางของข้อมูลที่คุณสามารถเขียนได้ กล่าวคือ ข้อมูล เข้าไปยังสตรีมที่เขียนได้ อย่างที่เป็นรูปธรรม สตรีมที่เขียนได้เป็นตัวอย่าง WritableStream ชั้นเรียน

เปลี่ยนรูปแบบสตรีม

สตรีมการเปลี่ยนรูปแบบประกอบด้วยคู่สตรีม ซึ่งเป็นสตรีมที่เขียนได้ หรือที่เรียกว่าฝั่งที่เขียนได้ และสตรีมที่อ่านได้ หรือที่เรียกว่าด้านที่อ่านได้ ส่วนในความเป็นจริง คำอุปมาอุปไมยในลักษณะนี้ ล่ามพร้อมกัน ที่แปลจากภาษาหนึ่งเป็นภาษาอื่นได้ทันที ในลักษณะที่เฉพาะเจาะจงสำหรับสตรีมการเปลี่ยนรูปแบบ การเขียน ไว้ด้านที่เขียนได้ ทำให้สามารถ อ่าน ข้อมูลใหม่ได้จาก ด้านที่อ่านได้ ตามจริง ออบเจ็กต์ใดก็ตามที่มีพร็อพเพอร์ตี้ writable และพร็อพเพอร์ตี้ readable จะแสดงได้ เป็นสตรีมการเปลี่ยนรูปแบบ แต่คลาส TransformStream มาตรฐานจะช่วยให้คุณสร้างได้ง่ายขึ้น คู่ที่เชื่อมต่อกันอย่างถูกต้อง

โซ่ท่อ

เราใช้สตรีมโดยการเชื่อมต่อสตรีมเข้าด้วยกันเป็นหลัก คุณสามารถเชื่อมสตรีมที่อ่านได้โดยตรง ไปยังสตรีมที่เขียนได้ โดยใช้เมธอด pipeTo() ของสตรีมที่อ่านได้ หรือจะส่งผ่านรายการเดียวก็ได้ สตรีมการเปลี่ยนรูปแบบอย่างน้อย 1 รายการก่อน โดยใช้เมธอด pipeThrough() ของสตรีมที่อ่านได้ ชุดของ สตรีมที่มีท่อเข้าด้วยกันในลักษณะนี้เรียกว่า "ห่วงโซ่ไปป์"

แรงกดดันด้านหลัง

เมื่อสร้างโซ่ท่อแล้ว ท่อส่งสัญญาณจะส่งสัญญาณเกี่ยวกับความรวดเร็วในการไหลของท่อ ให้ผ่าน หากขั้นตอนใดๆ ในเชนยังยอมรับกลุ่มไม่ได้ ระบบจะส่งต่อสัญญาณย้อนกลับ ผ่านห่วงโซ่ไปป์ จนกระทั่งในที่สุดได้มีการแจ้งให้แหล่งข้อมูลดั้งเดิมหยุดผลิตท่อน อย่างรวดเร็ว กระบวนการในการปรับโฟลว์มาตรฐานนี้เรียกว่า Backpressure

เสื้อยืด

คุณสามารถใส่สตรีมที่อ่านได้ (ตั้งชื่อตามรูปร่างตัว "T") ตัวพิมพ์ใหญ่โดยใช้เมธอด tee() ซึ่งจะเป็นการล็อกสตรีม กล่าวคือ ทำให้สตรีมใช้โดยตรงไม่ได้อีกต่อไป แต่จะสร้างแท็ก สตรีม ที่เรียกว่า Branch ซึ่งสามารถใช้งานได้แบบอิสระ การตีลูกก็มีความสำคัญเช่นกันเพราะจะไม่สามารถรีรอหรือรีสตาร์ทสตรีมได้ รายละเอียดเพิ่มเติมเกี่ยวกับเรื่องนี้ในภายหลัง

วันที่ แผนภาพของห่วงโซ่ไปป์ที่ประกอบด้วยสตรีมที่อ่านได้ซึ่งมาจากการเรียก API การดึงข้อมูล ซึ่งต่อจากนั้นผ่านสตรีมการแปลงซึ่งมีเอาต์พุตต่อจากนั้น และส่งเอาต์พุตไปยังเบราว์เซอร์สำหรับสตรีมที่อ่านได้ผลลัพธ์แรก และไปยังแคชของ Service Worker สำหรับสตรีมที่อ่านได้ผลลัพธ์ที่สอง
โซ่ไปป์

กลไกของสตรีมที่อ่านได้

สตรีมที่อ่านได้คือแหล่งข้อมูลที่แสดงใน JavaScript โดย ReadableStream ที่ มาจากแหล่งที่มาที่สำคัญ ReadableStream() เครื่องมือสร้างสร้างและแสดงผลออบเจ็กต์สตรีมที่อ่านได้จากตัวแฮนเดิลที่ระบุ มี 2 แบบ ของแหล่งที่มาที่สำคัญ

  • แหล่งที่มาของข้อความ Push จะพุชข้อมูลให้คุณอย่างต่อเนื่องเมื่อคุณเข้าถึงแหล่งที่มาแล้ว ทั้งนี้ขึ้นอยู่กับคุณ เริ่ม หยุดชั่วคราว หรือยกเลิกการเข้าถึงสตรีม ตัวอย่างเช่น สตรีมวิดีโอสด เหตุการณ์ที่เซิร์ฟเวอร์ส่ง หรือ WebSockets
  • แหล่งข้อมูลสำหรับพุลกำหนดให้คุณต้องขอข้อมูลจากแหล่งดังกล่าวอย่างชัดแจ้งเมื่อเชื่อมต่อแล้ว ตัวอย่าง รวมการดำเนินการ HTTP ผ่านการเรียก fetch() หรือ XMLHttpRequest

ระบบจะอ่านข้อมูลสตรีมตามลำดับเป็นส่วนเล็กๆ ที่เรียกว่ากลุ่ม คนที่วางในสตรีมจะได้รับการจัดคิว ซึ่งหมายความว่ากำลังรอคิว พร้อมให้อ่านแล้ว คิวภายในจะติดตามเนื้อหาส่วนที่ยังไม่ได้อ่าน

กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กำหนดวิธีที่สตรีมควรส่งสัญญาณย้อนกลับโดยยึดตาม สถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละกลุ่ม และเปรียบเทียบตัวเลือก ขนาดรวมของกลุ่มทั้งหมดในคิวตามจำนวนที่ระบุ หรือที่เรียกว่าเครื่องหมายบนน้ำสูง

ผู้อ่านจะอ่านส่วนเนื้อหาในสตรีมในสตรีม โปรแกรมอ่านนี้จะเรียกข้อมูลครั้งละ 1 กลุ่ม ทำให้คุณสามารถทำงานต่างๆ ได้ตามที่คุณต้องการ ผู้อ่านบวกผู้อ่านคนอื่นๆ โค้ดประมวลผลที่ทำงานคู่กับโค้ดนั้นจะเรียกว่าผู้บริโภค

โครงสร้างถัดไปในบริบทนี้จะเรียกว่าตัวควบคุม สตรีมที่อ่านได้แต่ละรายการจะมีการเชื่อมโยง ตัวควบคุม ที่จะช่วยให้คุณสามารถควบคุมสตรีมได้ตามชื่อ

ผู้อ่านจะอ่านสตรีมได้ทีละ 1 คนเท่านั้น เมื่อมีการสร้างผู้อ่านและเริ่มอ่านสตรีม (กล่าวคือกลายเป็นผู้อ่านที่ใช้งานอยู่) เนื้อหาจะล็อกไว้ หากต้องการให้ผู้อ่านคนอื่นรับหน้าที่ อ่านสตรีม โดยปกติแล้วคุณจะต้องเผยแพร่โปรแกรมอ่านคนแรกก่อนดำเนินการใดๆ (แต่คุณสามารถทีสตรีมได้)

การสร้างสตรีมที่อ่านได้

คุณสร้างสตรีมที่อ่านได้โดยเรียกใช้ตัวสร้าง ReadableStream() ตัวสร้างมีอาร์กิวเมนต์ underlyingSource ที่ไม่บังคับ ซึ่งแทนออบเจ็กต์ พร้อมเมธอดและพร็อพเพอร์ตี้ที่กำหนดลักษณะการทำงานของอินสแตนซ์สตรีมที่สร้างขึ้น

underlyingSource

วิธีนี้สามารถใช้วิธีการที่นักพัฒนาแอปกำหนดหรือไม่ก็ได้ต่อไปนี้

  • start(controller): เรียกใช้ทันทีเมื่อมีการสร้างออบเจ็กต์ สามารถเข้าถึงแหล่งที่มาของสตรีมและดำเนินการอื่นๆ ได้ ซึ่งจำเป็นต่อการตั้งค่าฟังก์ชันการทำงานของสตรีม หากกระบวนการนี้เป็นการดำเนินการแบบไม่พร้อมกัน เมธอดสามารถ ให้คำมั่นสัญญาว่าจะส่งสัญญาณสำเร็จหรือล้มเหลว พารามิเตอร์ controller ที่ส่งไปยังเมธอดนี้คือ CANNOT TRANSLATE ReadableStreamDefaultController
  • pull(controller): ใช้ควบคุมสตรีมเมื่อมีการดึงข้อมูลชิ้นส่วนมากขึ้นได้ ทั้งนี้ จะถูกเรียกซ้ำๆ ตราบใดที่คิวภายในของสตรีมยังไม่เต็ม จนกว่าจะถึงคิว ลงไปถึงผิวน้ำที่อยู่สูง หากผลจากการเรียก pull() เป็นสิ่งที่บอกได้ จะไม่มีการโทรหา pull() อีกจนกว่าสัญญาที่กล่าวไว้จะเป็นไปตามข้อกำหนด หากปฏิเสธสัญญา สตรีมจะเกิดข้อผิดพลาด
  • cancel(reason): เรียกใช้เมื่อผู้บริโภคของสตรีมยกเลิกสตรีม
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController รองรับเมธอดต่อไปนี้

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

อาร์กิวเมนต์ที่สองของตัวสร้าง ReadableStream() ที่ไม่บังคับเช่นกันคือ queuingStrategy ออบเจ็กต์นี้กำหนดกลยุทธ์การจัดคิวสำหรับสตรีม โดยใช้ 2 วิธี ได้แก่

  • highWaterMark: จำนวนที่ไม่เป็นลบที่ระบุน้ำสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้
  • size(chunk): ฟังก์ชันที่คํานวณและแสดงผลขนาดที่ไม่เป็นลบจํากัดของค่ากลุ่มที่ระบุ ผลลัพธ์จะใช้เพื่อระบุความดันกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ ReadableStreamDefaultController.desiredSize ที่เหมาะสม นอกจากนี้ยังควบคุมเวลาที่เรียกใช้เมธอด pull() ของแหล่งที่มาที่สำคัญด้วย
วันที่
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

เมธอด getReader() และ read()

หากต้องการอ่านจากสตรีมที่อ่านได้ คุณจะต้องมีโปรแกรมอ่าน ซึ่งจะ ReadableStreamDefaultReader เมธอด getReader() ของอินเทอร์เฟซ ReadableStream จะสร้างเครื่องอ่านและล็อกสตรีมเป็น ได้ ขณะที่สตรีมล็อกอยู่ จะรับผู้อ่านคนอื่นๆ ไม่ได้จนกว่าจะเผยแพร่เครื่องอ่านนี้

read() ของอินเทอร์เฟซ ReadableStreamDefaultReader แสดงการสัญญาว่าจะให้การเข้าถึง ในคิวภายในของสตรีม โดยจะบรรลุหรือปฏิเสธผลลัพธ์ตามสถานะของ สตรีม ความเป็นไปได้ต่างๆ มีดังนี้

  • หากมีชิ้นส่วนใดพร้อมใช้งาน ก็จะได้รับการดำเนินการตามคำสัญญาด้วยออบเจ็กต์ที่มีรูปแบบ
    { value: chunk, done: false }
  • หากสตรีมปิดแล้ว ระบบจะดำเนินการสัญญาให้เสร็จสิ้นด้วยออบเจ็กต์ที่มีรูปแบบ
    { value: undefined, done: true }
  • หากสตรีมเกิดข้อผิดพลาด ระบบจะปฏิเสธสัญญาพร้อมข้อผิดพลาดที่เกี่ยวข้อง
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

พร็อพเพอร์ตี้ locked

คุณสามารถตรวจสอบว่าสตรีมที่อ่านได้ล็อกอยู่หรือไม่โดยการเข้าถึง ReadableStream.locked

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

ตัวอย่างโค้ดสตรีมที่อ่านได้

ตัวอย่างโค้ดด้านล่างแสดงขั้นตอนทั้งหมดในการใช้งาน ก่อนอื่นคุณสร้าง ReadableStream ที่อยู่ใน อาร์กิวเมนต์ underlyingSource (ซึ่งก็คือคลาส TimestampSource) กำหนดเมธอด start() วิธีนี้จะทำให้ controller ของสตรีม enqueue() การประทับเวลาทุกวินาทีในช่วง 10 วินาที สุดท้าย จะบอกให้ตัวควบคุมclose()สตรีม คุณบริโภคสิ่งนี้ สตรีมโดยการสร้างโปรแกรมอ่านผ่านเมธอด getReader() และเรียกใช้ read() จนกว่าสตรีมจะถึง done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

การทำซ้ำแบบอะซิงโครนัส

การตรวจสอบการวนซ้ำของ read() แต่ละรายการว่าสตรีมเป็น done อาจไม่ใช่ API ที่สะดวกที่สุด โชคดีที่อีกไม่นานจะมีวิธีที่ดีกว่านี้ นั่นคือการทำซ้ำแบบอะซิงโครนัส

for await (const chunk of stream) {
  console.log(chunk);
}

วิธีแก้ปัญหาเฉพาะหน้าสำหรับการทำซ้ำแบบไม่พร้อมกันในปัจจุบันคือการใช้ลักษณะการทำงานกับ Polyfill

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

เพิ่มพลังให้กับสตรีมที่อ่านได้

เมธอด tee() ของ อินเทอร์เฟซ ReadableStream แทนสตรีมที่อ่านได้ในปัจจุบัน โดยแสดงผลอาร์เรย์ 2 องค์ประกอบ ซึ่งมี Branch ที่เป็นผลลัพธ์ 2 รายการเป็นอินสแตนซ์ ReadableStream ใหม่ วิธีนี้ช่วยให้ ผู้อ่านสองคนอ่านสตรีมพร้อมกันได้ เช่น คุณอาจดำเนินการเช่นนี้ในโปรแกรมทำงานของบริการ หาก คุณต้องการดึงข้อมูลการตอบกลับจากเซิร์ฟเวอร์และสตรีมไปยังเบราว์เซอร์ แต่ก็สตรีมไปยังเบราว์เซอร์ด้วย แคชของ Service Worker เนื่องจากใช้เนื้อหาการตอบกลับได้ไม่เกิน 1 ครั้ง คุณจึงต้องมีสำเนา 2 สำเนา ได้ หากต้องการยกเลิกสตรีม คุณจะต้องยกเลิกสาขาผลลัพธ์ทั้ง 2 สาขา การวาดลวดลายบนสตรีม โดยทั่วไปแล้วจะล็อกไว้เพื่อป้องกันไม่ให้ผู้อ่านคนอื่นๆ ล็อกไว้

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

ไบต์สตรีมที่อ่านได้

สำหรับสตรีมที่แสดงไบต์ จะมีเวอร์ชันขยายของสตรีมที่อ่านได้เพื่อจัดการ ไบต์ได้อย่างมีประสิทธิภาพ โดยเฉพาะอย่างยิ่งโดยการลดจำนวนสำเนา การสตรีมไบต์ช่วยให้รับบัฟเฟอร์ของคุณเองได้ (BYOB) ที่จะได้รับผู้อ่าน การใช้งานเริ่มต้น จะให้เอาต์พุตที่หลากหลาย เป็นสตริงหรือบัฟเฟอร์อาร์เรย์ในกรณีของ WebSockets ในขณะที่สตรีมแบบไบต์รับประกันเอาต์พุตของไบต์ นอกจากนี้ เครื่องอ่าน BYOB ยังมีข้อดีในเรื่องความเสถียรด้วย นี่คือ เพราะหากบัฟเฟอร์หลุดออกไป ก็รับประกันได้ว่าจะไม่มีการเขียนลงในบัฟเฟอร์เดียวกัน 2 ครั้ง ดังนั้นจึงเลี่ยงสภาวะการแข่งขัน เครื่องอ่าน BYOB สามารถลดจำนวนครั้งที่เบราว์เซอร์ต้องทำงาน ขยะ เพราะสามารถใช้บัฟเฟอร์ซ้ำได้

การสร้างสตรีมไบต์ที่อ่านได้

คุณสามารถสร้างสตรีมไบต์ที่อ่านได้โดยการส่งพารามิเตอร์ type เพิ่มเติมไปยังพารามิเตอร์ เครื่องมือสร้าง ReadableStream()

new ReadableStream({ type: 'bytes' });

underlyingSource

แหล่งที่มาที่สำคัญของสตรีมไบต์ที่อ่านได้มีการกำหนด ReadableByteStreamController ให้กับ จัดการ เมธอด ReadableByteStreamController.enqueue() จะใช้อาร์กิวเมนต์ chunk ที่มีค่า คือArrayBufferView พร็อพเพอร์ตี้ ReadableByteStreamController.byobRequest จะแสดงพร็อพเพอร์ตี้ คำขอพุล BYOB หรือไม่มีข้อมูลหากไม่มี สุดท้ายนี้ ReadableByteStreamController.desiredSize จะแสดงผลขนาดที่ต้องการเพื่อเติมในคิวภายในของสตรีมที่มีการควบคุม

queuingStrategy

อาร์กิวเมนต์ที่สองของตัวสร้าง ReadableStream() ที่ไม่บังคับเช่นกันคือ queuingStrategy ออบเจ็กต์นี้กำหนดกลยุทธ์การจัดคิวสำหรับสตรีม (ไม่บังคับ) พารามิเตอร์:

  • highWaterMark: จำนวนไบต์ที่ไม่เป็นลบซึ่งบ่งชี้ถึงลายน้ำที่สูงของสตรีมโดยใช้กลยุทธ์การจัดคิวนี้ ค่านี้ใช้เพื่อระบุความดันกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ ReadableByteStreamController.desiredSize ที่เหมาะสม นอกจากนี้ยังควบคุมเวลาที่เรียกใช้เมธอด pull() ของแหล่งที่มาที่สำคัญด้วย

เมธอด getReader() และ read()

จากนั้นคุณจะรับสิทธิ์เข้าถึง ReadableStreamBYOBReader ได้โดยตั้งค่าพารามิเตอร์ mode ให้สอดคล้องกัน ดังนี้ ReadableStream.getReader({ mode: "byob" }) วิธีนี้ทำให้ควบคุมบัฟเฟอร์ได้แม่นยำยิ่งขึ้น เพื่อหลีกเลี่ยงเนื้อหาที่ตรงกัน หากต้องการอ่านจากสตรีมไบต์ คุณต้องเรียก ReadableStreamBYOBReader.read(view) โดยที่ view เป็น ArrayBufferView

ตัวอย่างโค้ดสตรีมไบต์ที่อ่านได้

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

ฟังก์ชันต่อไปนี้จะแสดงผลสตรีมไบต์ที่อ่านได้ ซึ่งช่วยให้อ่านค่า อาร์เรย์ที่สร้างขึ้นแบบสุ่ม แทนที่จะใช้กลุ่มขนาดที่กำหนดไว้ล่วงหน้า 1,024 กลุ่ม ระบบจะพยายามเติมเต็ม บัฟเฟอร์ที่นักพัฒนาซอฟต์แวร์มีให้ ทำให้ควบคุมได้เต็มที่

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

กลไกของสตรีมที่เขียนได้

สตรีมที่สามารถเขียนได้ คือปลายทางที่คุณสามารถเขียนข้อมูลซึ่งแสดงเป็น JavaScript ด้วย WritableStream ช่วงเวลานี้ ทำหน้าที่เป็นบทคัดย่อที่ด้านบนของซิงก์เบื้องหลัง ซึ่งเป็นซิงก์ I/O ระดับล่างที่ จะมีการเขียนข้อมูลดิบ

ระบบจะเขียนข้อมูลไปยังสตรีมผ่านผู้เขียนทีละกลุ่ม การมีพลังเกินที่กำหนด รูปแบบต่างๆ มากมาย เช่นเดียวกับกลุ่มข้อมูลในตัวอ่าน คุณสามารถใช้โค้ดใดก็ได้ที่คุณต้องการสร้าง ส่วนที่พร้อมสำหรับการเขียน ผู้เขียน Plus Codes ที่เกี่ยวข้องกันจะเรียกว่า Producer

เมื่อมีการสร้างนักเขียนและเริ่มเขียนไปยังสตรีม (นักเขียนที่กระตือรือร้น) จะเป็นการเขียนว่า ล็อกไว้ ผู้แต่งเพียง 1 คนเท่านั้นที่สามารถเขียนไปยังสตรีมที่เขียนพร้อมกันได้ต่อครั้ง ถ้าต้องการอีก ผู้เขียนเพื่อเริ่มเขียนสตรีมของคุณ โดยทั่วไปคุณจะต้องเผยแพร่สตรีมก่อนที่จะแนบ มีนักเขียนอีกคน

คิวภายในจะติดตามกลุ่มที่เขียนไปยังสตรีมแต่ยังไม่ได้สร้าง ได้รับการประมวลผลโดยซิงก์ที่สำคัญ

กลยุทธ์การจัดคิวคือออบเจ็กต์ที่กำหนดวิธีที่สตรีมควรส่งสัญญาณย้อนกลับโดยยึดตาม สถานะของคิวภายใน กลยุทธ์การจัดคิวจะกำหนดขนาดให้กับแต่ละกลุ่ม และเปรียบเทียบตัวเลือก ขนาดรวมของกลุ่มทั้งหมดในคิวตามจำนวนที่ระบุ หรือที่เรียกว่าเครื่องหมายบนน้ำสูง

โครงสร้างขั้นสุดท้ายเรียกว่าตัวควบคุม แต่ละสตรีมที่เขียนได้จะมีตัวควบคุมเชื่อมโยงอยู่ซึ่ง ช่วยให้คุณควบคุมสตรีมได้ (เช่น ล้มเลิก)

การสร้างสตรีมที่เขียนได้

อินเทอร์เฟซของ WritableStream ของ Streams API ซึ่งมีนามธรรมมาตรฐานสำหรับการเขียนข้อมูลสตรีมมิงไปยังปลายทางที่รู้จักกัน เหมือนอ่างล้างจาน วัตถุนี้มาพร้อมกับ Backpressure และคิวในตัว คุณสร้างสตรีมที่เขียนได้โดยทำดังนี้ กำลังเรียกตัวสร้าง WritableStream() มีพารามิเตอร์ underlyingSink ที่ไม่บังคับ ซึ่งแสดงถึงออบเจ็กต์ พร้อมเมธอดและพร็อพเพอร์ตี้ที่กำหนดลักษณะการทำงานของอินสแตนซ์สตรีมที่สร้างขึ้น

underlyingSink

underlyingSink อาจมีเมธอดที่นักพัฒนาแอปกำหนดหรือไม่ก็ได้ต่อไปนี้ controller ที่ส่งผ่านไปยังบางเมธอดคือ WritableStreamDefaultController

  • start(controller): ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อมีการสร้างออบเจ็กต์ เนื้อหาของวิธีการนี้ควรมุ่งเข้าถึงซิงก์ที่เกี่ยวข้อง หากกระบวนการนี้ เพราะการทำแบบอะซิงโครนัส อาจทำให้ได้รับสัญญาที่ส่งสัญญาณว่าสำเร็จหรือล้มเหลว
  • write(chunk, controller): ระบบจะเรียกเมธอดนี้เมื่อมีข้อมูลกลุ่มใหม่ (ที่ระบุไว้ใน chunk) พร้อมที่จะเขียนไปยังซิงก์ที่อยู่ด้านล่างแล้ว สามารถให้คำมั่นสัญญาว่าจะ สัญญาณว่าเขียนสำเร็จหรือล้มเหลว ระบบจะเรียกเมธอดนี้หลังจากรายการก่อนหน้าเท่านั้น การเขียนนั้นประสบความสำเร็จ และไม่เคยหลังจากปิดหรือล้มเลิกสตรีม
  • close(controller): ระบบจะเรียกวิธีการนี้หากแอปส่งสัญญาณว่าเขียนเสร็จแล้ว เป็นส่วนๆ ในสตรีม เนื้อหาควรทำทุกอย่างที่จำเป็นเพื่อสรุปการเขียน ซิงก์ที่เกี่ยวข้อง แล้วปล่อยสิทธิ์การเข้าถึง หากกระบวนการนี้เป็นแบบอะซิงโครนัส ระบบอาจแสดงผล สัญญาว่าจะให้สัญญาณว่าประสบความสำเร็จหรือล้มเหลว ระบบจะเรียกเมธอดนี้หลังจากการเขียนที่อยู่ในคิวทั้งหมดแล้วเท่านั้น ประสบความสำเร็จ
  • abort(reason): ระบบจะเรียกวิธีการนี้หากแอปส่งสัญญาณว่าต้องการปิดอย่างกะทันหัน สตรีมและทำให้อยู่ในสถานะมีข้อผิดพลาด ช่วยล้างทรัพยากรที่ระงับไว้ เช่น close() แต่ระบบจะเรียก abort() แม้ว่าการเขียนจะรอคิวอยู่ กลุ่มเหล่านี้จะถูกขว้าง ได้เลย หากกระบวนการนี้เป็นกระบวนการที่ไม่พร้อมกัน กระบวนการดังกล่าวสามารถให้สัญญาณที่บ่งบอกถึงความสำเร็จหรือความล้มเหลว พารามิเตอร์ reason มี DOMString ที่อธิบายสาเหตุที่ทำให้สตรีมถูกล้มเลิก
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

WritableStreamDefaultController อินเทอร์เฟซของ Streams API แสดงถึงตัวควบคุมที่อนุญาตการควบคุมสถานะของ WritableStream ในระหว่างการตั้งค่า เนื่องจากมีการส่งเนื้อหาจำนวนมากมาเขียน หรือในช่วงท้ายของการเขียน เมื่อสร้าง WritableStream ซิงก์ที่รองรับจะได้รับ WritableStreamDefaultController ที่สอดคล้องกัน ที่จะปรับเปลี่ยน WritableStreamDefaultController มีเมธอดเพียงวิธีเดียว ได้แก่ WritableStreamDefaultController.error(), ซึ่งทำให้การโต้ตอบในอนาคตกับสตรีมที่เชื่อมโยงเกิดข้อผิดพลาด WritableStreamDefaultController ยังรองรับพร็อพเพอร์ตี้ signal ซึ่งแสดงผลอินสแตนซ์ของ AbortSignal ทำให้สามารถหยุดการดำเนินการ WritableStream ได้ หากจำเป็น

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

อาร์กิวเมนต์ที่สองของตัวสร้าง WritableStream() ที่ไม่บังคับเช่นกันคือ queuingStrategy ออบเจ็กต์นี้กำหนดกลยุทธ์การจัดคิวสำหรับสตรีม โดยใช้ 2 วิธี ได้แก่

  • highWaterMark: จำนวนที่ไม่เป็นลบที่ระบุน้ำสูงสุดของสตรีมที่ใช้กลยุทธ์การจัดคิวนี้
  • size(chunk): ฟังก์ชันที่คํานวณและแสดงผลขนาดที่ไม่เป็นลบจํากัดของค่ากลุ่มที่ระบุ ผลลัพธ์จะใช้เพื่อระบุความดันกลับ ซึ่งแสดงผ่านพร็อพเพอร์ตี้ WritableStreamDefaultWriter.desiredSize ที่เหมาะสม

เมธอด getWriter() และ write()

ในการเขียนไปยังสตรีมที่เขียนได้ คุณจะต้องมีนักเขียน ซึ่งจะเป็น WritableStreamDefaultWriter เมธอด getWriter() ของอินเทอร์เฟซ WritableStream แสดงผล อินสแตนซ์ใหม่ของ WritableStreamDefaultWriter และล็อกสตรีมไปยังอินสแตนซ์นั้น ในขณะที่ สตรีมล็อกอยู่ และไม่สามารถหาผู้เขียนรายอื่นได้จนกว่าจะมีการเผยแพร่สตรีมปัจจุบัน

write() ของวิธีการ WritableStreamDefaultWriter จะเขียนกลุ่มข้อมูลที่ส่งผ่านไปยัง WritableStream และซิงก์ที่เกี่ยวข้อง จากนั้นจะส่งกลับ คำมั่นสัญญาที่แก้ไขเพื่อระบุความสำเร็จหรือความล้มเหลวของการดำเนินการเขียน โปรดทราบว่า "ความสำเร็จ" หมายถึงกำลังขึ้นอ่างล้างจาน ก็อาจเป็นสัญญาณว่ายอมรับกลุ่มนั้นแล้ว และไม่จำเป็นต้องบันทึก URL นั้นไปยังปลายทางสุดท้ายอย่างปลอดภัย

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

พร็อพเพอร์ตี้ locked

คุณสามารถตรวจสอบว่าสตรีมที่เขียนได้ถูกล็อกหรือไม่โดยการเข้าถึง WritableStream.locked

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

ตัวอย่างรหัสสตรีมที่เขียนได้

ตัวอย่างโค้ดด้านล่างแสดงขั้นตอนทั้งหมดในการใช้งาน

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

เชื่อมโยงสตรีมที่อ่านได้ไปยังสตรีมที่เขียนได้

สตรีมที่สามารถอ่านได้สามารถเชื่อมไปยังสตรีมที่เขียนได้ ผ่าน pipeTo() ReadableStream.pipeTo() เชื่อม ReadableStream ปัจจุบันไปยัง WritableStream ที่ระบุและแสดงผล สัญญาว่าจะทำงานได้จริงเมื่อกระบวนการท่อเสร็จสมบูรณ์ หรือปฏิเสธเมื่อเกิดข้อผิดพลาด ที่พบ

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

การสร้างสตรีมการเปลี่ยนรูปแบบ

อินเทอร์เฟซ TransformStream ของ Streams API แสดงชุดข้อมูลที่เปลี่ยนรูปแบบได้ คุณ สร้างสตรีมการเปลี่ยนรูปแบบโดยเรียกตัวสร้าง TransformStream() ซึ่งจะสร้างและแสดงผล ออบเจ็กต์สตรีมการเปลี่ยนรูปแบบจากเครื่องจัดการที่ระบุ ตัวสร้าง TransformStream() ยอมรับเป็น อาร์กิวเมนต์แรกคือออบเจ็กต์ JavaScript ที่ไม่บังคับที่แสดงถึง transformer วัตถุดังกล่าวสามารถ ประกอบด้วยวิธีการใดก็ได้ต่อไปนี้

transformer

  • start(controller): ระบบจะเรียกใช้เมธอดนี้ทันทีเมื่อมีการสร้างออบเจ็กต์ ราคาปกติ ค่านี้ใช้สำหรับจัดคิวกลุ่มคำนำหน้าโดยใช้ controller.enqueue() ระบบจะอ่านเนื้อหาเหล่านี้ จากด้านที่อ่านได้ แต่ไม่ต้องขึ้นอยู่กับการเขียนในด้านที่เขียนได้ หากชื่อย่อนี้ ไม่พร้อมกัน เช่น เนื่องจากต้องใช้ความพยายามอย่างมากในการรับส่วนคำนำหน้า ฟังก์ชันดังกล่าวสามารถให้คำมั่นสัญญาที่จะบ่งบอกว่าสำเร็จหรือล้มเหลว สัญญาที่ถูกปฏิเสธจะทำให้ สตรีม เครื่องมือสร้าง TransformStream() จะทิ้งข้อยกเว้นทั้งหมดอีกครั้ง
  • transform(chunk, controller): ระบบจะเรียกเมธอดนี้เมื่อมีการเขียนกลุ่มใหม่ลงใน ด้านที่เขียนได้ที่พร้อมจะแปลงโฉม การใช้สตรีมเป็นการรับประกันว่าฟังก์ชันนี้ จะมีการเรียกหลังจากการแปลงครั้งก่อนสำเร็จเท่านั้น และไม่เคยมีมาก่อน start() เสร็จสิ้นหรือหลังจากที่ flush() มีการเรียก ฟังก์ชันนี้จะดำเนินการเปลี่ยนรูปแบบจริง ของสตรีมการเปลี่ยนรูปแบบ กำหนดลำดับของผลลัพธ์โดยใช้ controller.enqueue() ช่วงเวลานี้ อนุญาตให้ส่วนเดียวที่เขียนด้วยด้านที่เขียนได้จะทำให้มี 0 หรือหลายๆ ส่วน ด้านที่อ่านได้ ทั้งนี้ขึ้นอยู่กับจำนวนครั้งที่มีการเรียกใช้ controller.enqueue() หากกระบวนการของ การเปลี่ยนรูปแบบเป็นแบบไม่พร้อมกัน ฟังก์ชันนี้สามารถให้คำสัญญาในการส่งสัญญาณความสำเร็จหรือความล้มเหลว การเปลี่ยนรูปแบบ คำสัญญาที่ถูกปฏิเสธจะทำให้ข้อผิดพลาดทั้งด้านที่อ่านได้และเขียนได้ เปลี่ยนสตรีม ถ้าไม่ได้ระบุเมธอด transform() ระบบจะใช้การแปลงข้อมูลประจำตัว จํานวนช่วงที่ระบุมีการเปลี่ยนแปลงจากด้านที่เขียนได้ไปเป็นด้านที่อ่านได้
  • flush(controller): เราจะเรียกวิธีนี้หลังจากชิ้นส่วนทั้งหมดที่เขียนในด้านที่เขียนได้ เปลี่ยนไปผ่าน transform() สำเร็จและด้านที่เขียนได้กำลังจะ ปิดแล้ว โดยทั่วไปแล้วจะใช้สำหรับจัดคิวส่วนต่อท้ายไว้ด้านที่อ่านได้ ก่อนหน้านั้น ปิดแล้ว หากกระบวนการล้างออกไม่พร้อมกัน ฟังก์ชันอาจให้ผลลัพธ์เป็น สัญญาณว่าเสร็จสมบูรณ์หรือล้มเหลว ผลลัพธ์จะได้รับการแจ้งให้ผู้ที่เรียกใช้ stream.writable.write() นอกจากนี้ สัญญาที่ถูกปฏิเสธจะทำให้เกิดข้อผิดพลาดทั้งในส่วนของข้อมูลที่อ่านได้และ ด้านที่เขียนได้ของสตรีม การส่งข้อยกเว้นจะถือว่าเช่นเดียวกับการส่งคืน สัญญา
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

กลยุทธ์การจัดคิว writableStrategy และ readableStrategy

พารามิเตอร์ที่ไม่บังคับรายการที่ 2 และ 3 ของเครื่องมือสร้าง TransformStream() เป็นพารามิเตอร์ที่ไม่บังคับ กลยุทธ์การจัดคิว writableStrategy และ readableStrategy มีคำจำกัดความตามที่ระบุไว้ใน อ่านอย่างเดียว และสตรีมที่เขียนได้ ตามลำดับ

ตัวอย่างรหัสสตรีมการเปลี่ยนรูปแบบ

ตัวอย่างโค้ดต่อไปนี้แสดงการทำงานของสตรีมการเปลี่ยนรูปแบบแบบง่าย

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

การเชื่อมต่อสตรีมที่อ่านได้ผ่านทางสตรีมการเปลี่ยนรูปแบบ

pipeThrough() ของอินเทอร์เฟซ ReadableStream ให้วิธีการแบบห่วงโซ่ในการเชื่อมต่อสตรีมปัจจุบัน ผ่านสตรีมการเปลี่ยนรูปแบบหรือคู่อื่นๆ ที่เขียนได้/อ่านได้ โดยทั่วไปการวางท่อสตรีมจะล็อก เพื่อป้องกันไม่ให้ผู้อ่านคนอื่นๆ ล็อกช่อง

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

ตัวอย่างโค้ดถัดไป (กล่าวอีกเล็กน้อย) จะแสดงวิธีใช้ "การกล่าวถึง" เวอร์ชันของ fetch() เป็นตัวพิมพ์ใหญ่จากข้อความทั้งหมดโดยใช้สัญญาการตอบกลับที่ส่งคืน เป็นสตรีม และนำตัวอักษรมาเรียงต่อกันเป็นกลุ่ม ข้อดีของวิธีนี้ คือคุณไม่ต้องรอให้ เอกสารทั้งหมดที่ต้องดาวน์โหลด ซึ่งสร้างความแตกต่างได้อย่างมากเมื่อต้องจัดการกับไฟล์ขนาดใหญ่

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

สาธิต

การสาธิตด้านล่างแสดงให้เห็นการทำงานของสตรีมที่อ่านได้ เขียนได้ และเปลี่ยนรูปแบบ และมีตัวอย่าง ของห่วงโซ่ไปป์ pipeThrough() และ pipeTo() รวมถึงแสดง tee() คุณสามารถเลือกเรียกใช้ การสาธิตในหน้าต่างของตัวผลิตภัณฑ์เองหรือดู ซอร์สโค้ด

สตรีมที่มีประโยชน์ซึ่งมีอยู่ในเบราว์เซอร์

มีสตรีมที่เป็นประโยชน์จำนวนมากสร้างในตัวเบราว์เซอร์โดยตรง คุณสามารถสร้าง ReadableStream จาก Blob Blob เมธอด stream() ของอินเทอร์เฟซกลับ ReadableStream ซึ่งขณะอ่านจะแสดงผลข้อมูลที่มีอยู่ใน BLOB นอกจากนี้ โปรดจำว่า File เป็นประเภทที่เฉพาะเจาะจงของ Blob และใช้ในบริบทใดก็ได้ที่ BLOB ทำได้

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

เราจะเรียกรูปแบบสตรีมมิงของ TextDecoder.decode() และ TextEncoder.encode() ว่า TextDecoderStream และ TextEncoderStream ตามลำดับ

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

การบีบอัดหรือขยายไฟล์สามารถทำได้ง่ายด้วย CompressionStream และ สตรีมการเปลี่ยนรูปแบบ DecompressionStream ตามลำดับ ตัวอย่างโค้ดด้านล่างแสดงวิธีดาวน์โหลดข้อกำหนดของสตรีมและบีบอัด (gzip) ในเบราว์เซอร์ และเขียนไฟล์ที่บีบอัดลงในดิสก์โดยตรง

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

ของ File System Access API FileSystemWritableFileStream และสตรีมคำขอ fetch() ทดลอง ได้แก่ ตัวอย่างสตรีมที่เขียนได้ขึ้นในป่า

Serial API มีการใช้ทั้งสตรีมที่อ่านได้และเขียนได้เป็นจำนวนมาก

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

สุดท้าย WebSocketStream API จะผสานรวมสตรีมกับ WebSocket API

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

แหล่งข้อมูลที่มีประโยชน์

กิตติกรรมประกาศ

บทความนี้ได้รับการตรวจสอบโดย Jake Archibald François Beaufort Sam Dutton Mattias Buelens Surma Joe Medley และ Adam Rice บล็อกโพสต์ของ Jake Archibald ช่วยฉันทำความเข้าใจได้มาก สตรีม ตัวอย่างโค้ดบางส่วนได้รับแรงบันดาลใจมาจากผู้ใช้ GitHub การสํารวจและของ @bellbind ส่วนของร้อยแก้วจะเน้น MDN Web Docs ในสตรีม ของสตรีมแบบมาตรฐาน authors ได้ทำงานอย่างล้นหลาม ในการเขียนข้อกำหนดนี้ รูปภาพหลักโดย Ryan Lara ใน เปิดเสียง