Luồng — Hướng dẫn chính thức

Tìm hiểu cách sử dụng API có thể đọc, ghi và chuyển đổi luồng bằng API Luồng.

API Luồng cho phép bạn truy cập theo phương thức lập trình các luồng dữ liệu nhận được qua mạng hoặc được tạo bằng bất kỳ phương thức nào cục bộ và xử lý chúng bằng JavaScript. Quá trình truyền trực tuyến bao gồm việc chia nhỏ một tài nguyên mà bạn muốn nhận, gửi hoặc chuyển đổi thành các phần nhỏ, sau đó xử lý từng phần nhỏ. Mặc dù truyền trực tuyến vẫn là một việc mà trình duyệt vẫn làm khi nhận được các thành phần như HTML hoặc video sẽ hiển thị trên trang web, nhưng tính năng này chưa từng có sẵn cho JavaScript trước khi fetch có luồng được ra mắt vào năm 2015.

Trước đây, nếu muốn xử lý một tài nguyên thuộc loại nào đó (có thể là video hoặc tệp văn bản, v.v.), bạn sẽ phải tải toàn bộ tệp xuống, đợi giải trình tự thành định dạng phù hợp rồi xử lý. Với các luồng có sẵn cho JavaScript, tất cả điều này sẽ thay đổi. Bạn hiện có thể xử lý dần dữ liệu thô bằng JavaScript ngay khi có trên ứng dụng mà không cần tạo vùng đệm, chuỗi hoặc blob. Việc này giúp mở khoá một số trường hợp sử dụng, sau đây là một số trường hợp sử dụng mà tôi liệt kê:

  • VideoEffect (Hiệu ứng video): truyền cho một luồng video dễ đọc thông qua một luồng biến đổi có thể áp dụng các hiệu ứng theo thời gian thực.
  • Nén) dữ liệu: nén luồng tệp thông qua một luồng biến đổi để nén (de) dữ liệu một cách có chọn lọc.
  • Giải mã hình ảnh: truyền một luồng phản hồi HTTP thông qua một luồng biến đổi sẽ giải mã byte thành dữ liệu bitmap, sau đó thông qua một luồng biến đổi khác dịch bitmap thành PNG. Nếu được cài đặt bên trong bộ xử lý fetch của một trình chạy dịch vụ, bạn sẽ có thể chèn các định dạng hình ảnh mới một cách rõ ràng như AVIF.

Hỗ trợ trình duyệt

ReadableStream và WritableStream

Hỗ trợ trình duyệt

  • 43
  • 14
  • 65
  • 10.1

Nguồn

TransformStream

Hỗ trợ trình duyệt

  • 67
  • 79
  • 102
  • 14,1

Nguồn

Khái niệm chính

Trước khi đi sâu vào chi tiết về các loại luồng, tôi sẽ giới thiệu một số khái niệm chính.

Viên nhỏ

Đoạn là một phần dữ liệu duy nhất được ghi vào hoặc đọc từ một luồng. Luồng có thể thuộc bất kỳ loại nào; thậm chí các luồng thậm chí có thể chứa nhiều đoạn thuộc nhiều loại. Trong hầu hết trường hợp, một phân đoạn sẽ không phải là đơn vị dữ liệu nguyên tử nhất cho một luồng nhất định. Ví dụ: một luồng byte có thể chứa các đoạn bao gồm 16 đơn vị Uint8Array KiB, thay vì các byte đơn.

Luồng có thể đọc

Một luồng dễ đọc đại diện cho nguồn dữ liệu mà từ đó bạn có thể đọc. Nói cách khác, dữ liệu xuất ra từ một luồng có thể đọc được. Cụ thể, luồng có thể đọc được là một thực thể của lớp ReadableStream.

Luồng có thể ghi

Luồng có thể ghi đại diện cho đích đến cho dữ liệu mà bạn có thể ghi vào đó. Nói cách khác, dữ liệu chuyển đến một luồng có thể ghi. Cụ thể, luồng có thể ghi là một thực thể của lớp WritableStream.

Biến đổi sự kiện phát trực tiếp

Luồng biến đổi bao gồm một cặp luồng: một luồng có thể ghi, còn gọi là bên có thể ghi và một luồng có thể đọc được, được gọi là bên có thể đọc. Một phép ẩn dụ thực tế cho trường hợp này sẽ là một thông dịch viên đồng thời dịch nhanh từ ngôn ngữ này sang ngôn ngữ khác. Theo cách dành riêng cho luồng biến đổi, việc ghi vào phía có thể ghi sẽ dẫn đến việc dữ liệu mới được cung cấp để đọc từ phía có thể đọc được. Cụ thể, bất kỳ đối tượng nào có thuộc tính writable và thuộc tính readable đều có thể phân phát dưới dạng luồng biến đổi. Tuy nhiên, lớp TransformStream tiêu chuẩn giúp bạn dễ dàng tạo một cặp được ghép đúng cách như vậy.

Chuỗi ống

Các luồng chủ yếu được sử dụng bằng cách liên kết chúng cho nhau. Bạn có thể chuyển luồng có thể đọc được trực tiếp đến luồng có thể ghi bằng phương thức pipeTo() của luồng có thể đọc được, hoặc trước tiên, luồng này có thể được truyền qua một hoặc nhiều luồng biến đổi bằng phương thức pipeThrough() của luồng có thể đọc được. Một tập hợp các luồng được nối với nhau theo cách này được gọi là một chuỗi ống.

Áp lực ngược

Sau khi một chuỗi đường ống được xây dựng, chuỗi đường ống đó sẽ truyền các tín hiệu về tốc độ truyền các phần của chuỗi đó. Nếu có bất kỳ bước nào trong chuỗi chưa chấp nhận các đoạn, thì bước đó sẽ truyền tín hiệu ngược qua chuỗi ống cho đến khi cuối cùng nguồn ban đầu được yêu cầu ngừng tạo các đoạn quá nhanh. Quá trình chuẩn hoá quy trình này được gọi là áp lực ngược (backpressure).

Phát bóng

Một luồng có thể đọc được có thể được định vị (được đặt tên theo hình dạng chữ "T" viết hoa) bằng cách sử dụng phương thức tee(). Thao tác này sẽ khoá luồng tức là khiến luồng không còn sử dụng được trực tiếp nữa; tuy nhiên, thao tác này sẽ tạo ra 2 luồng mới, được gọi là các nhánh, có thể được sử dụng độc lập. Việc phát trực tiếp cũng rất quan trọng vì bạn không thể tua lại hoặc khởi động lại sự kiện. Bạn có thể xem thêm thông tin về vấn đề này ở phần sau.

Sơ đồ chuỗi quy trình bao gồm một luồng có thể đọc được từ lệnh gọi đến API tìm nạp. Sau đó, luồng này được chuyển qua luồng biến đổi có đầu ra được cập nhật, sau đó được gửi đến trình duyệt để tạo luồng có thể đọc được kết quả đầu tiên và đến bộ nhớ đệm của trình chạy dịch vụ cho luồng có thể đọc được kết quả thứ hai.
Một chuỗi ống.

Cơ chế của một luồng có thể đọc được

Luồng có thể đọc được là nguồn dữ liệu được biểu thị bằng JavaScript bằng đối tượng ReadableStream chảy từ nguồn cơ bản. Hàm khởi tạo ReadableStream() tạo và trả về một đối tượng luồng có thể đọc được từ các trình xử lý nhất định. Có 2 loại nguồn cơ bản:

  • Nguồn đẩy liên tục đẩy dữ liệu về phía bạn khi bạn đã truy cập vào chúng và bạn có quyền bắt đầu, tạm dừng hoặc huỷ quyền truy cập vào luồng dữ liệu. Ví dụ: luồng video trực tiếp, sự kiện do máy chủ gửi hoặc WebSocket.
  • Nguồn kéo yêu cầu bạn phải yêu cầu dữ liệu rõ ràng từ các nguồn đó sau khi được kết nối. Ví dụ: các hoạt động HTTP thông qua lệnh gọi fetch() hoặc XMLHttpRequest.

Dữ liệu luồng được đọc tuần tự theo các phần nhỏ gọi là đoạn. Các phân đoạn được đặt trong một luồng được xem là được thêm vào hàng đợi. Điều này có nghĩa là chúng đang chờ trong hàng đợi sẵn sàng để đọc. Hàng đợi nội bộ theo dõi các phân đoạn chưa được đọc.

Chiến lược xếp hàng là một đối tượng xác định cách một luồng sẽ báo hiệu áp lực ngược dựa trên trạng thái của hàng đợi nội bộ. Chiến lược xếp hàng sẽ chỉ định kích thước cho mỗi phần và so sánh tổng kích thước của tất cả phần trong hàng đợi với một con số được chỉ định, được gọi là dấu mực nước cao.

Trình đọc sẽ đọc các đoạn trong luồng dữ liệu. Trình đọc này truy xuất từng phần dữ liệu một, cho phép bạn thực hiện bất kỳ loại thao tác nào trên đó. Trình đọc cùng với mã xử lý khác đi kèm với mã đó được gọi là trình tiêu dùng.

Cấu trúc tiếp theo trong ngữ cảnh này được gọi là trình điều khiển. Mỗi luồng có thể đọc được đều có một bộ điều khiển liên kết, như tên gọi, cho phép bạn kiểm soát luồng đó.

Tại một thời điểm, chỉ một trình đọc có thể đọc một luồng; khi một trình đọc được tạo và bắt đầu đọc một luồng (nghĩa là trở thành trình đọc đang hoạt động), thì luồng đó sẽ bị khoá. Nếu bạn muốn một độc giả khác kiểm soát việc đọc sự kiện phát trực tiếp của mình, thì thông thường, bạn cần phát hành trình đọc đầu tiên rồi mới làm bất cứ việc gì khác (mặc dù bạn có thể phát trực tiếp).

Tạo luồng dễ đọc

Bạn tạo một luồng có thể đọc được bằng cách gọi hàm khởi tạo ReadableStream() của luồng đó. Hàm khởi tạo có một đối số không bắt buộc underlyingSource. Đối số này đại diện cho một đối tượng với các phương thức và thuộc tính giúp xác định cách hoạt động của thực thể luồng đã tạo.

underlyingSource

Bạn có thể sử dụng các phương thức không bắt buộc do nhà phát triển xác định sau đây:

  • start(controller): Được gọi ngay khi đối tượng được tạo. Phương thức này có thể truy cập vào nguồn luồng và thực hiện mọi thao tác cần thiết khác để thiết lập chức năng của luồng. Nếu quá trình này được thực hiện không đồng bộ, phương thức có thể trả về một hứa hẹn cho biết thành công hay không. Tham số controller được truyền đến phương thức này là ReadableStreamDefaultController.
  • pull(controller): Có thể dùng để kiểm soát luồng khi tìm nạp nhiều phần hơn. Phương thức này được gọi nhiều lần miễn là hàng đợi các đoạn nội bộ của luồng chưa đầy, cho đến khi hàng đợi này đạt đến mực nước cao. Nếu kết quả của lệnh gọi pull() là một lời hứa, thì pull() sẽ không được gọi lại cho đến khi lời hứa đó được thực hiện. Nếu lời hứa bị từ chối, sự kiện phát trực tiếp sẽ bị lỗi.
  • cancel(reason): Được gọi khi thực thể tiêu thụ luồng huỷ luồng.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController hỗ trợ các phương thức sau:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Đối số thứ hai, cũng không bắt buộc, của hàm khởi tạo ReadableStream()queuingStrategy. Đây là một đối tượng sẽ tuỳ ý xác định chiến lược xếp hàng cho luồng, chiến lược này sẽ có 2 tham số:

  • highWaterMark: Số không âm cho biết mực nước cao của dòng suối sử dụng chiến lược xếp hàng này.
  • size(chunk): Một hàm tính toán và trả về kích thước hữu hạn không âm của một giá trị phân đoạn đã cho. Kết quả được dùng để xác định backpressure, biểu thị thông qua thuộc tính ReadableStreamDefaultController.desiredSize thích hợp. Trạng thái này cũng chi phối thời điểm phương thức pull() của nguồn cơ bản được gọi.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Phương thức getReader()read()

Để đọc từ một luồng có thể đọc được, bạn cần có một trình đọc, là ReadableStreamDefaultReader. Phương thức getReader() của giao diện ReadableStream sẽ tạo một trình đọc và khoá luồng với trình đọc đó. Trong khi luồng bị khoá, bạn không thể thu thập trình đọc nào khác cho đến khi trình đọc này được phát hành.

Phương thức read() của giao diện ReadableStreamDefaultReader sẽ trả về một lời hứa cung cấp quyền truy cập vào phân đoạn tiếp theo trong hàng đợi nội bộ của luồng. Phương thức này đáp ứng hoặc từ chối kết quả tuỳ thuộc vào trạng thái của luồng. Sau đây là các khả năng có thể xảy ra:

  • Nếu có một đoạn, lời hứa sẽ được thực hiện bằng một đối tượng có dạng
    { value: chunk, done: false }.
  • Nếu luồng bị đóng, lời hứa sẽ được thực hiện bằng một đối tượng có dạng
    { value: undefined, done: true }.
  • Nếu sự kiện phát trực tiếp bị lỗi, thì lời hứa sẽ bị từ chối kèm theo lỗi có liên quan.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Thuộc tính locked

Bạn có thể kiểm tra xem một luồng có thể đọc được có bị khoá hay không bằng cách truy cập vào thuộc tính ReadableStream.locked của luồng đó.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Mã mẫu luồng có thể đọc được

Mã mẫu dưới đây cho thấy tất cả các bước trong thực tế. Trước tiên, bạn cần tạo một ReadableStream trong đối số underlyingSource (tức là lớp TimestampSource) để xác định một phương thức start(). Phương thức này thông báo cho controller của luồng đến enqueue() một dấu thời gian mỗi giây trong mười giây. Cuối cùng, lớp này yêu cầu bộ điều khiển close() truyền luồng. Bạn sử dụng luồng này bằng cách tạo một trình đọc thông qua phương thức getReader() và gọi read() cho đến khi luồng là done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Lặp lại không đồng bộ

Kiểm tra mỗi vòng lặp read() nếu luồng là done có thể không phải là API thuận tiện nhất. Thật may là sẽ sớm có cách tốt hơn để thực hiện việc này: lặp lại không đồng bộ.

for await (const chunk of stream) {
  console.log(chunk);
}

Một giải pháp để sử dụng vòng lặp không đồng bộ hiện nay là triển khai hành vi bằng một polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Phát trực tiếp dễ đọc

Phương thức tee() của giao diện ReadableStream liên kết với luồng có thể đọc được hiện tại, trả về một mảng hai phần tử chứa hai nhánh kết quả dưới dạng thực thể ReadableStream mới. Nhờ vậy, 2 độc giả có thể đọc một luồng cùng lúc. Ví dụ: bạn có thể thực hiện việc này trong một trình chạy dịch vụ nếu muốn tìm nạp một phản hồi từ máy chủ và truyền trực tuyến đến trình duyệt, nhưng đồng thời truyền phản hồi đó đến bộ nhớ đệm của trình chạy dịch vụ. Vì không thể sử dụng nội dung phản hồi nhiều lần, nên bạn cần có hai bản sao để thực hiện việc này. Để huỷ luồng, bạn cần huỷ cả hai nhánh thu được. Việc phát trực tiếp thường sẽ khoá sự kiện đó trong một khoảng thời gian để những trình đọc khác không khoá sự kiện đó.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Luồng byte có thể đọc

Đối với các luồng biểu thị byte, phiên bản mở rộng của luồng có thể đọc được cung cấp để xử lý hiệu quả các byte, đặc biệt là bằng cách giảm thiểu các bản sao. Luồng byte cho phép thu nạp trình đọc bộ đệm của riêng bạn (BYOB). Cách triển khai mặc định có thể cung cấp một loạt các đầu ra khác nhau, chẳng hạn như chuỗi hoặc vùng đệm mảng trong trường hợp WebSockets, trong khi luồng byte đảm bảo đầu ra byte. Ngoài ra, độc giả BYOB có lợi ích về sự ổn định. Điều này là do nếu một vùng đệm tách ra, điều này có thể đảm bảo rằng một vùng đệm không ghi vào cùng một vùng đệm hai lần, do đó, sẽ tránh được các điều kiện tranh đấu. Trình đọc BYOB có thể làm giảm số lần trình duyệt cần chạy thu gom rác vì trình duyệt có thể sử dụng lại vùng đệm.

Tạo luồng byte có thể đọc được

Bạn có thể tạo một luồng byte đọc được bằng cách truyền thêm một tham số type vào hàm khởi tạo ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Nguồn cơ bản của một luồng byte có thể đọc được được cấp một ReadableByteStreamController để thao tác. Phương thức ReadableByteStreamController.enqueue() nhận đối số chunk có giá trị là ArrayBufferView. Thuộc tính ReadableByteStreamController.byobRequest trả về yêu cầu kéo BYOB hiện tại hoặc trả về giá trị rỗng nếu không có. Cuối cùng, thuộc tính ReadableByteStreamController.desiredSize trả về kích thước mong muốn để lấp đầy hàng đợi nội bộ của luồng được kiểm soát.

queuingStrategy

Đối số thứ hai, cũng không bắt buộc, của hàm khởi tạo ReadableStream()queuingStrategy. Đây là một đối tượng tuỳ ý xác định chiến lược xếp hàng cho luồng, chiến lược này sẽ có một thông số:

  • highWaterMark: Số byte không âm cho biết mực nước dâng cao của luồng bằng cách sử dụng chiến lược xếp hàng này. Thuộc tính này dùng để xác định backpressure, biểu thị thông qua thuộc tính ReadableByteStreamController.desiredSize thích hợp. Trạng thái này cũng chi phối thời điểm phương thức pull() của nguồn cơ bản được gọi.

Phương thức getReader()read()

Sau đó, bạn có thể truy cập vào ReadableStreamBYOBReader bằng cách đặt tham số mode tương ứng: ReadableStream.getReader({ mode: "byob" }). Điều này cho phép kiểm soát chính xác hơn việc phân bổ vùng đệm để tránh việc sao chép. Để đọc từ luồng byte, bạn cần gọi ReadableStreamBYOBReader.read(view), trong đó viewArrayBufferView.

Mẫu mã luồng byte có thể đọc được

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Hàm sau trả về các luồng byte có thể đọc được, cho phép đọc hiệu quả từ 0 đến 0 của một mảng được tạo ngẫu nhiên. Thay vì sử dụng kích thước phân đoạn được xác định trước là 1.024, công cụ này sẽ tìm cách lấp đầy vùng đệm do nhà phát triển cung cấp để có toàn quyền kiểm soát.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Cơ chế của một luồng có thể ghi

Luồng có thể ghi là một đích đến nơi bạn có thể ghi dữ liệu, được biểu thị bằng JavaScript bằng đối tượng WritableStream. Đây là một phần trừu tượng ở trên cùng của bể lưu trữ bên dưới – một bồn lưu trữ I/O cấp thấp hơn để ghi dữ liệu thô.

Dữ liệu được ghi vào luồng thông qua một người viết, mỗi đoạn một đoạn. Mỗi đoạn có thể có nhiều dạng, giống như các đoạn dữ liệu trong độc giả. Bạn có thể sử dụng bất kỳ mã nào mình muốn để tạo các đoạn sẵn sàng để viết; trình ghi cộng với mã liên kết được gọi là nhà sản xuất (Producer).

Khi một người viết được tạo và bắt đầu viết thư vào một luồng (người viết đang hoạt động), thì luồng đó được xem là bị khóa. Tại một thời điểm, chỉ một tác giả có thể ghi vào luồng có thể ghi. Nếu bạn muốn một tác giả khác bắt đầu viết vào sự kiện phát trực tiếp, thông thường, bạn cần phát hành phiên bản đó trước khi đính kèm một tác giả khác vào luồng đó.

Hàng đợi nội bộ theo dõi các phân đoạn đã được ghi vào luồng nhưng chưa được bồn lưu trữ dữ liệu cơ bản xử lý.

Chiến lược xếp hàng là một đối tượng xác định cách một luồng sẽ báo hiệu áp lực ngược dựa trên trạng thái của hàng đợi nội bộ. Chiến lược xếp hàng sẽ chỉ định kích thước cho mỗi phần và so sánh tổng kích thước của tất cả phần trong hàng đợi với một con số được chỉ định, được gọi là dấu mực nước cao.

Cấu trúc cuối cùng được gọi là bộ điều khiển. Mỗi luồng có thể ghi có một bộ điều khiển liên kết cho phép bạn kiểm soát luồng đó (ví dụ: huỷ luồng).

Tạo luồng có thể ghi

Giao diện WritableStream của API Luồng cung cấp mô hình trừu tượng tiêu chuẩn để ghi dữ liệu truyền trực tuyến vào một đích đến, còn gọi là bồn lưu trữ dữ liệu. Đối tượng này đi kèm với tính năng áp lực ngược (backpressure) và tính năng thêm vào hàng đợi được tích hợp sẵn. Bạn tạo một luồng có thể ghi bằng cách gọi hàm khởi tạo WritableStream() của luồng đó. Lớp này có một tham số underlyingSink không bắt buộc. Tham số này đại diện cho một đối tượng có các phương thức và thuộc tính giúp xác định cách thực thể luồng đã tạo sẽ hoạt động.

underlyingSink

underlyingSink có thể bao gồm các phương thức không bắt buộc do nhà phát triển xác định sau đây. Tham số controller được truyền đến một số phương thức là WritableStreamDefaultController.

  • start(controller): Phương thức này được gọi ngay khi đối tượng được tạo. Nội dung của phương thức này nên nhằm truy cập vào bồn lưu trữ dữ liệu cơ bản. Nếu quá trình này được thực hiện không đồng bộ, nó có thể trả về một hứa hẹn cho biết thành công hay không.
  • write(chunk, controller): Phương thức này sẽ được gọi khi một phần dữ liệu mới (được chỉ định trong tham số chunk) sẵn sàng được ghi vào bồn lưu trữ dữ liệu cơ bản. Phương thức này có thể trả về thông báo hứa hẹn cho biết thao tác ghi thành công hoặc không thành công. Phương thức này sẽ chỉ được gọi sau khi các lần ghi trước đó thành công và không bao giờ sau khi luồng bị đóng hoặc bị huỷ.
  • close(controller): Phương thức này sẽ được gọi nếu ứng dụng báo hiệu rằng đã ghi xong các phân đoạn vào luồng. Nội dung cần thực hiện mọi việc cần thiết để hoàn tất hoạt động ghi vào bồn lưu trữ cơ sở và giải phóng quyền truy cập vào bồn lưu trữ đó. Nếu quá trình này không đồng bộ, quá trình này có thể trả về một hứa hẹn cho biết thành công hoặc không thành công. Phương thức này sẽ chỉ được gọi sau khi tất cả các lượt ghi trong hàng đợi đã thành công.
  • abort(reason): Phương thức này sẽ được gọi nếu ứng dụng ra tín hiệu rằng muốn đóng đột ngột luồng và đặt phương thức đó ở trạng thái lỗi. Phương thức này có thể xoá mọi tài nguyên được giữ lại, chẳng hạn như close(), nhưng abort() sẽ được gọi ngay cả khi các lượt ghi được đưa vào hàng đợi. Những phân đoạn đó sẽ bị loại bỏ. Nếu quá trình này không đồng bộ, quá trình có thể trả về một hứa hẹn cho biết thành công hay không thành công. Tham số reason chứa DOMString mô tả lý do khiến luồng bị huỷ.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Giao diện WritableStreamDefaultController của API Luồng đại diện cho bộ điều khiển cho phép kiểm soát trạng thái của WritableStream trong quá trình thiết lập, khi nhiều phần hơn được gửi để ghi hoặc ở cuối quá trình ghi. Khi xây dựng WritableStream, bồn lưu trữ dữ liệu cơ bản sẽ được cung cấp một thực thể WritableStreamDefaultController tương ứng để thao tác. WritableStreamDefaultController chỉ có một phương thức: WritableStreamDefaultController.error(), phương thức này sẽ gây lỗi cho mọi lượt tương tác trong tương lai với luồng được liên kết. WritableStreamDefaultController cũng hỗ trợ thuộc tính signal trả về một thực thể của AbortSignal, cho phép dừng thao tác WritableStream nếu cần.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Đối số thứ hai, cũng không bắt buộc, của hàm khởi tạo WritableStream()queuingStrategy. Đây là một đối tượng sẽ tuỳ ý xác định chiến lược xếp hàng cho luồng, chiến lược này sẽ có 2 tham số:

  • highWaterMark: Số không âm cho biết mực nước cao của dòng suối sử dụng chiến lược xếp hàng này.
  • size(chunk): Một hàm tính toán và trả về kích thước hữu hạn không âm của một giá trị phân đoạn đã cho. Kết quả được dùng để xác định backpressure, biểu thị thông qua thuộc tính WritableStreamDefaultWriter.desiredSize thích hợp.

Phương thức getWriter()write()

Để ghi vào một luồng có thể ghi, bạn cần một người viết, đó là WritableStreamDefaultWriter. Phương thức getWriter() của giao diện WritableStream sẽ trả về một thực thể mới của WritableStreamDefaultWriter và khoá luồng với thực thể đó. Trong khi luồng bị khoá, bạn không thể thu nạp người viết nào khác cho đến khi luồng hiện tại được phát hành.

Phương thức write() của giao diện WritableStreamDefaultWriter sẽ ghi một phần dữ liệu đã chuyển vào WritableStream và bồn lưu trữ dữ liệu cơ bản, sau đó trả về một lời hứa sẽ cho biết thao tác ghi có thành công hay không. Xin lưu ý rằng "thành công" phụ thuộc vào bồn lưu trữ dữ liệu cơ bản; nó có thể cho biết phân đoạn đã được chấp nhận và không nhất thiết là phân đoạn đã được lưu an toàn vào đích đến cuối cùng.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Thuộc tính locked

Bạn có thể kiểm tra xem một luồng có thể ghi có bị khoá hay không bằng cách truy cập vào thuộc tính WritableStream.locked của luồng đó.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Mẫu mã luồng có thể ghi

Mã mẫu dưới đây cho thấy tất cả các bước trong thực tế.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Truyền một luồng có thể đọc được vào một luồng có thể ghi

Một luồng có thể đọc được có thể được chuyển đến một luồng có thể ghi thông qua phương thức pipeTo() của luồng có thể đọc được. ReadableStream.pipeTo() chuyển ReadableStream hiện tại đến một WritableStream nhất định và trả về một lời hứa sẽ thực hiện được khi quy trình đường ống hoàn tất thành công hoặc từ chối nếu có lỗi.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Tạo luồng biến đổi

Giao diện TransformStream của API Luồng đại diện cho một tập dữ liệu có thể chuyển đổi. Bạn tạo một luồng biến đổi bằng cách gọi hàm khởi tạo TransformStream(). Hàm này sẽ tạo và trả về một đối tượng luồng biến đổi từ các trình xử lý nhất định. Hàm khởi tạo TransformStream() chấp nhận đối số đầu tiên là một đối tượng JavaScript không bắt buộc đại diện cho transformer. Các đối tượng như vậy có thể chứa bất kỳ phương thức nào sau đây:

transformer

  • start(controller): Phương thức này được gọi ngay khi đối tượng được tạo. Thông thường, thuộc tính này dùng để thêm các phần tiền tố vào hàng đợi bằng cách sử dụng controller.enqueue(). Các phân đoạn đó sẽ được đọc từ phía có thể đọc được nhưng không phụ thuộc vào bất kỳ hoạt động ghi nào ở phía có thể ghi. Nếu quy trình ban đầu này không đồng bộ, chẳng hạn như do mất một chút công sức để có được các phần tiền tố, thì hàm có thể trả về một lời hứa để cho biết thành công hoặc không thành công; một lời hứa bị từ chối sẽ gây ra lỗi cho luồng. Mọi ngoại lệ được gửi sẽ được hàm khởi tạo TransformStream() gửi lại.
  • transform(chunk, controller): Phương thức này được gọi khi một phân đoạn mới ban đầu được ghi vào phía có thể ghi đã sẵn sàng để chuyển đổi. Việc triển khai luồng đảm bảo rằng hàm này sẽ chỉ được gọi sau khi các phép biến đổi trước đó đã thành công và không bao giờ trước khi start() hoàn tất hoặc sau khi flush() được gọi. Hàm này thực hiện công việc biến đổi thực tế của luồng biến đổi. Công cụ này có thể thêm kết quả vào hàng đợi bằng controller.enqueue(). Việc này cho phép một đoạn duy nhất được ghi ở phía có thể ghi để dẫn đến kết quả là không có hoặc có nhiều đoạn ở phía có thể đọc được, tuỳ thuộc vào số lần controller.enqueue() được gọi. Nếu quá trình chuyển đổi không đồng bộ, hàm này có thể trả về một hứa hẹn cho biết rằng quá trình chuyển đổi có thành công hay không. Lời hứa bị từ chối sẽ lỗi cả bên có thể đọc và có thể ghi của luồng biến đổi. Nếu không cung cấp phương thức transform(), thì phép biến đổi danh tính sẽ được dùng để xếp các đoạn không thay đổi so với bên có thể ghi sang bên có thể đọc được.
  • flush(controller): Phương thức này được gọi sau khi tất cả các đoạn được ghi cho bên có thể ghi đã được chuyển đổi bằng cách truyền thành công qua transform() và bên có thể ghi sắp được đóng. Thông thường, thuộc tính này được dùng để thêm các phân đoạn hậu tố vào hàng đợi phía có thể đọc được, trước khi bị đóng. Nếu quá trình xả dữ liệu không đồng bộ, hàm có thể trả về một tín hiệu hứa hẹn thành công hoặc không thành công; kết quả sẽ được thông báo đến phương thức gọi của stream.writable.write(). Ngoài ra, lời hứa bị từ chối sẽ lỗi cả bên có thể đọc và có thể ghi của luồng. Việc khai báo ngoại lệ được xử lý giống như trả về một lời hứa bị từ chối.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Chiến lược thêm vào hàng đợi writableStrategyreadableStrategy

Các tham số không bắt buộc thứ hai và thứ ba của hàm khởi tạo TransformStream() là các chiến lược xếp hàng writableStrategyreadableStrategy không bắt buộc. Chúng được xác định như đã nêu trong các phần luồng có thể đọccó thể ghi tương ứng.

Biến đổi mã mẫu trong luồng

Mã mẫu sau đây cho thấy một luồng biến đổi đơn giản trong thực tế.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Tạo một luồng có thể đọc được qua luồng biến đổi

Phương thức pipeThrough() của giao diện ReadableStream cung cấp một phương thức có thể tạo chuỗi cho luồng hiện tại thông qua luồng biến đổi hoặc bất kỳ cặp nào khác có thể ghi/có thể đọc được. Thông thường, việc tạo luồng sẽ khoá luồng đó trong suốt thời gian hoạt động của đường ống, ngăn các trình đọc khác khoá luồng đó.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Mã mẫu tiếp theo (được thiết kế một chút) cho thấy cách bạn có thể triển khai phiên bản "hét" của fetch(). Phiên bản này viết hoa tất cả văn bản bằng cách sử dụng lời hứa phản hồi được trả về dưới dạng một luồng và viết hoa theo từng phần. Ưu điểm của phương pháp này là bạn không cần phải đợi toàn bộ tài liệu được tải xuống, điều này có thể tạo ra sự khác biệt lớn khi xử lý các tệp lớn.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Bản minh hoạ

Bản minh hoạ dưới đây cho thấy các luồng có thể đọc, có thể ghi và biến đổi trong thực tế. Tài liệu này cũng bao gồm các ví dụ về chuỗi ống pipeThrough()pipeTo(), đồng thời minh hoạ tee(). Bạn có thể tuỳ ý chạy bản minh hoạ trong cửa sổ riêng hoặc xem mã nguồn.

Các luồng hữu ích có trong trình duyệt

Có một số luồng hữu ích được tích hợp ngay trong trình duyệt. Bạn có thể dễ dàng tạo ReadableStream từ một blob. Phương thức stream() của giao diện Blob sẽ trả về một ReadableStream. Khi đọc, phương thức này sẽ trả về dữ liệu có trong blob. Ngoài ra, hãy nhớ rằng đối tượng File là một loại cụ thể của Blob và có thể được dùng trong bất kỳ ngữ cảnh nào mà blob có thể dùng.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Các biến thể truyền trực tuyến của TextDecoder.decode()TextEncoder.encode() được gọi lần lượt là TextDecoderStreamTextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Bạn có thể dễ dàng nén hoặc giải nén một tệp bằng cách biến đổi luồng CompressionStreamDecompressionStream tương ứng. Mã mẫu bên dưới cho thấy cách bạn có thể tải thông số kỹ thuật của Luồng xuống, nén (gzip) ngay trong trình duyệt và ghi tệp nén trực tiếp vào ổ đĩa.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream của API Truy cập hệ thống tệpluồng yêu cầu fetch() thử nghiệm là ví dụ về các luồng có thể ghi trong tự nhiên.

API nối tiếp sử dụng nhiều cả luồng có thể đọc và có thể ghi.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Cuối cùng, API WebSocketStream tích hợp các luồng với API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Tài nguyên hữu ích

Xác nhận

Bài viết này được Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe MedleyAdam gạo đánh giá. Các bài đăng trên blog của Jake Archibald đã giúp tôi hiểu được rất nhiều về luồng. Một số mã mẫu được lấy cảm hứng từ các hoạt động khám phá của người dùng GitHub @bellbind và các phần trong văn bản được xây dựng chủ yếu trên Tài liệu web MMD trên Luồng. Tác giả của Streams Standard đã rất thành công trong việc viết quy cách này. Hình ảnh chính của Ryan Lara trên Unsplash.