מקורות נתונים – המדריך המלא

איך משתמשים ב-Streams API כדי לקרוא, לכתוב ולבצע טרנספורמציות של מקורות נתונים (streams)?

Streams API מאפשר לכם לגשת באופן פרוגרמטי למקורות נתונים שהתקבלו ברשת או נוצרו בכל אמצעי מקומי, ולעבד אותם באמצעות JavaScript. סטרימינג כולל פירוק של משאב שרוצים לקבל, לשלוח או לשנות למקטעים קטנים, ולאחר מכן עיבוד של המקטעים האלה ביט אחרי ביט. דפדפנים מבצעים סטרימינג בכל מקרה כשהם מקבלים נכסים כמו HTML או סרטונים שמוצגים בדפי אינטרנט, אבל היכולת הזו לא הייתה זמינה ל-JavaScript לפני שהתכונה fetch עם סטרימינג הושקה ב-2015.

בעבר, אם רצית לעבד משאב כלשהו (סרטון, קובץ טקסט וכו'), היה עליך להוריד את הקובץ כולו, להמתין עד שהוא יתבצע דה-סריאליזציה לפורמט מתאים ואז לעבד אותו. כשהסטרים זמינים ל-JavaScript, כל זה משתנה. עכשיו אפשר לעבד נתונים גולמיים באמצעות JavaScript באופן הדרגתי ברגע שהם זמינים אצל הלקוח, בלי צורך ליצור מאגר, מחרוזת או blob. כך תוכלו להשתמש במספר תרחישים לדוגמה, חלק מהם מפורטים בהמשך:

  • אפקטים בסרטון: העברה של זרם וידאו קריא דרך זרם טרנספורמציה שמחיל אפקטים בזמן אמת.
  • דחיסת (ופירוק) נתונים: העברת קובץ באמצעות צינור (pipe) דרך מקור נתונים שמבצע דחיסת (ופירוק) נתונים באופן סלקטיבי.
  • פענוח תמונות: העברת מקור נתונים של תגובת HTTP דרך מקור נתונים של טרנספורמציה שמפענח בייטים לנתוני ביומטריק, ולאחר מכן דרך מקור נתונים נוסף של טרנספורמציה שמתרגם ביומטריק ל-PNG. אם מתקינים את הקוד בתוך הטיפולן fetch של עובד שירות, אפשר להשתמש בו כדי להוסיף polyfill לשקיפות לפורמטים חדשים של תמונות, כמו AVIF.

תמיכה בדפדפנים

ReadableStream ו-WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

מושגי ליבה

לפני שנכנס לפרטים על הסוגים השונים של שידורים, אסביר על כמה מושגים מרכזיים.

קטעים

מקטע הוא יחידה אחת של נתונים שנכתבת בזרם או נקראת ממנו. הוא יכול להיות מכל סוג, והזרמים יכולים לכלול גם קטעים מסוגים שונים. ברוב המקרים, מקטע לא יהיה יחידת הנתונים האטומית ביותר במקור נתונים נתון. לדוגמה, מקור נתונים של בייטים עשוי להכיל קטעים שמכילים 16 יחידות Uint8Array בגודל KiB, במקום בייטים בודדים.

מקורות נתונים שאפשר לקרוא

מקור נתונים שאפשר לקרוא אותו מייצג מקור נתונים שאפשר לקרוא ממנו. במילים אחרות, הנתונים יוצאים ממקור נתונים שאפשר לקרוא. באופן ספציפי, מקור נתונים לקריאה הוא מופע של הכיתה ReadableStream.

מקורות נתונים שאפשר לכתוב בהם

מקור נתונים לכתיבה מייצג יעד לנתונים שאפשר לכתוב אליו. במילים אחרות, הנתונים נכנסים למקור נתונים שאפשר לכתוב בו. באופן ספציפי, מקור נתונים לכתיבה הוא מופע של הכיתה WritableStream.

טרנספורמציה של מקורות נתונים

זרם טרנספורמציה מורכב מזוג של זרמים: זרם לכתיבה, שנקרא הצד לכתיבה, וזרם לקריאה, שנקרא הצד לקריאה. מטאפורה לכך בעולם האמיתי היא מתרגם סימולטני שמתרגם משפה אחת לשפה אחרת בזמן אמת. באופן ספציפי למקור הנתונים של הטרנספורמציה, כשמעתיקים לצד שאפשר לכתוב בו, הנתונים החדשים הופכים לזמינים לקריאה מהצד שאפשר לקרוא בו. באופן ספציפי, כל אובייקט עם נכס writable ונכס readable יכול לשמש כזרם טרנספורמציה. עם זאת, בעזרת הכיתה הרגילה TransformStream קל יותר ליצור זוג כזה שמקושר בצורה נכונה.

שרשראות צינורות

השימוש העיקרי בזרמים הוא העברה (piping) שלהם זה לזה. אפשר להעביר זרם לקריאה ישירות לזרם לכתיבה באמצעות השיטה pipeTo() של הזרם לקריאה, או להעביר אותו דרך זרם טרנספורמציה אחד או יותר באמצעות השיטה pipeTo() של הזרם לקריאה.pipeThrough() קבוצה של מקורות נתונים שמחוברים באמצעות צינור בדרך הזו נקראת שרשרת צינורות.

לחץ חזרה

אחרי שיוצרים שרשרת צינורות, היא מפיצה אותות לגבי המהירות שבה קטעי הקוד צריכים לעבור בה. אם שלב כלשהו בשרשרת עדיין לא יכול לקבל קטעי קוד, הוא מעביר אות לאחור דרך שרשרת הצינור, עד שבסופו של דבר המקור המקורי מקבל הודעה להפסיק לייצר קטעי קוד במהירות כזו. התהליך הזה של נורמליזציה של התנועה נקרא לחץ חזרה.

הנחיתה על המגרש

אפשר להשתמש בשיטה tee() כדי ליצור יציאה מרכזית (נקראת כך בגלל הצורה שלה, 'T' גדול) לשידור קריא. הפעולה הזו תנעיל את הסטרימינג, כלומר לא תהיה יותר אפשרות להשתמש בו ישירות. עם זאת, היא תיצור שני סטרימינגים חדשים, שנקראים ענפים, שאפשר לצרוך אותם בנפרד. חשוב גם להתחיל את השידור בזמן הנכון כי אי אפשר להריץ אותו לאחור או להתחיל אותו מחדש. נרחיב על כך בהמשך.

תרשים של שרשרת צינור שמכילה שידור קריא שמגיע מבקשת קריאה ל-API, ולאחר מכן מועבר דרך שידור טרנספורמציה שהפלט שלו מתפצל לשני זרמים קריאים: הראשון נשלח לדפדפן והשני נשלח למטמון של ה-service worker.
שרשרת צינורות.

המנגנון של מקור נתונים קריא

מקור נתונים שאפשר לקרוא אותו הוא מקור נתונים שמיוצג ב-JavaScript באמצעות אובייקט ReadableStream שזורם ממקור בסיסי. המאגר ReadableStream() יוצר אובייקט של מקור נתונים שניתן לקריאה מהמפעילים הנתונים, ומחזיר אותו. יש שני סוגים של מקורות בסיסיים:

  • מקורות דחיפה שולחים נתונים כל הזמן אחרי שמקבלים אליהם גישה, ועליך להתחיל, להשהות או לבטל את הגישה לשידור. דוגמאות לכך הן שידורי וידאו חיים, אירועים שנשלחים מהשרת או WebSockets.
  • במקורות משיכה צריך לבקש מהם נתונים באופן מפורש אחרי שמתחברים אליהם. דוגמאות לכך הן פעולות HTTP באמצעות קריאות fetch() או XMLHttpRequest.

נתוני מקור הנתונים נקרא ברצף בחלקים קטנים שנקראים קטעים. הקטעים שמתווספים לסטרימינג נקראים קטעים בתור. כלומר, הם נמצאים בתור ומוכנים לקריאה. תור פנימי עוקב אחרי הקטעים שעדיין לא נקראו.

שיטת תורים היא אובייקט שמגדיר איך הסטרימינג צריך לסמן לחץ חוזר על סמך המצב של התור הפנימי שלו. אסטרטגיית ההמתנה בתור מקצה גודל לכל מקטע, ומשווים את הגודל הכולל של כל המקטעים בתור למספר מסוים שנקרא נקודת הפסגה.

קורא קורא את הקטעים בתוך הסטרימינג. הקוראים האלה מאחזרים את הנתונים בחלקים, ומאפשרים לבצע כל פעולה שרוצים עליהם. הקורא יחד עם קוד העיבוד האחר שמצורף אליו נקרא צרכן.

המבנה הבא בהקשר הזה נקרא בקר. לכל מקור נתונים שאפשר לקרוא ממנו יש רכיב בקרה שמאפשר, כפי ששמו מרמז, לשלוט במקור הנתונים.

רק קורא אחד יכול לקרוא מקור נתונים בו-זמנית. כשקורא נוצר ומתחיל לקרוא מקור נתונים (כלומר הופך לקורא פעיל), הוא מונע ממנו. אם רוצים שקורא אחר ימשיך לקרוא את השידור, בדרך כלל צריך לשחרר את הקורא הראשון לפני שמבצעים פעולה אחרת (אבל אפשר גם לחלק את השידור).

יצירת מקור נתונים שאפשר לקרוא אותו

כדי ליצור מקור נתונים לקריאה, צריך להפעיל את ה-constructor שלו, ReadableStream(). למבנה ה-constructor יש ארגומנט אופציונלי underlyingSource, שמייצג אובייקט עם שיטות ומאפיינים שמגדירים את האופן שבו מופע הסטרימינג שנוצר יתנהג.

underlyingSource

אפשר להשתמש בשיטות האופציונליות הבאות שהוגדרו על ידי המפתחים:

  • start(controller): הקריאה מתבצעת מיד כשהאובייקט נוצר. השיטה יכולה לגשת למקור הסטרימינג ולבצע כל פעולה אחרת שנדרשת להגדרת הפונקציונליות של הסטרימינג. אם התהליך הזה צריך להתבצע באופן אסינכרוני, השיטה יכולה להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון. הפרמטר controller שמוענק לשיטה הזו הוא ReadableStreamDefaultController.
  • pull(controller): אפשר להשתמש בה כדי לשלוט בשידור בזמן אחזור קטעים נוספים. הוא נקרא שוב ושוב כל עוד הקטעים שבתור הפנימי של הסטרימינג לא מלאים, עד שהתור מגיע לנקודת השיא שלו. אם התוצאה של קריאה ל-pull() היא הבטחה, לא תתבצע קריאה חוזרת ל-pull() עד שההבטחה תתבצע. אם הבטחה תידחה, ההעברה תהיה עם שגיאה.
  • cancel(reason): הקריאה הזו מתבצעת כשצרכן הסטרימינג מבטל את הסטרימינג.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ה-ReadableStreamDefaultController תומך בשיטות הבאות:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

הארגומנט השני של המבנה ReadableStream(), שגם הוא אופציונלי, הוא queuingStrategy. זהו אובייקט שמגדיר אופציונלית אסטרטגיית תורים לסטרימינג, עם שני פרמטרים:

  • highWaterMark: מספר לא שלילי שמציין את נקודת השיא של הסטרימינג באמצעות שיטת התור הזו.
  • size(chunk): פונקציה שמחשבת ומחזירה את הגודל המוגבל והחיובי של ערך הקטע הנתון. התוצאה משמשת לקביעת לחץ החזרה, שמופיע דרך המאפיין המתאים ReadableStreamDefaultController.desiredSize. הוא קובע גם מתי מתבצעת הקריאה ל-method‏ pull() של המקור הבסיסי.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

השיטות getReader() ו-read()

כדי לקרוא מזרם שאפשר לקרוא, צריך קורא, שהוא ReadableStreamDefaultReader. השיטה getReader() של הממשק ReadableStream יוצרת קורא ונועלת את הסטרימינג אליו. בזמן שהשידור נעול, לא ניתן לקבל קורא אחר עד שהקורא הזה ישוחרר.

השיטה read() של הממשק ReadableStreamDefaultReader מחזירה הבטחה (promise) שמספקת גישה לקטע הבא בתור הפנימי של הסטרימינג. הוא ממלא או דוחה את הבקשה עם תוצאה, בהתאם למצב של המקור. האפשרויות השונות הן:

  • אם יש מקטע זמין, ההתחייבות תתמלא באובייקט מהצורה
    { value: chunk, done: false }.
  • אם הסטרימינג יסתיים, ההתחייבות תתמלא באובייקט מהפורמט
    { value: undefined, done: true }.
  • אם הזרם יהיה שגוי, ההבטחה תידחה עם השגיאה הרלוונטית.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

הנכס locked

כדי לבדוק אם מקור נתונים שאפשר לקרוא אותו נעול, אפשר לגשת למאפיין ReadableStream.locked שלו.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

דוגמאות לקוד של שידורים שאפשר לקרוא

בדוגמת הקוד שבהמשך מוצגים כל השלבים בפעולה. קודם יוצרים ReadableStream שמגדיר את השיטה start() בארגומנט underlyingSource שלו (כלומר, בכיתה TimestampSource). השיטה הזו מורה ל-controller של הסטרימינג להוסיף enqueue() לחותמת זמן בכל שנייה במשך עשר שניות. לבסוף, הוא מצווה על הבקר close() את הסטרימינג. כדי לצרוך את המקור הזה, יוצרים קורא באמצעות ה-method‏ getReader() ומפעילים את read() עד שהמקור הוא done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

איטרציה אסינכרונית

בדיקה בכל מחזור של הלולאה read() אם הסטרימינג הוא done עשויה להיות לא ה-API הנוח ביותר. למרבה המזל, בקרוב תהיה דרך טובה יותר לעשות זאת: איטרציה אסינכרונית.

for await (const chunk of stream) {
  console.log(chunk);
}

פתרון עקיף לשימוש בחזרה אסינכררונית הוא הטמעת ההתנהגות באמצעות polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

יצירת זרם לקריאה

השיטה tee() של הממשק ReadableStream מחלקת את הסטרימינג הנוכחי שאפשר לקרוא, ומחזירה מערך של שני רכיבים שמכיל את שני ההסתעפויות שנוצרו כמכונות ReadableStream חדשות. כך שני קוראים יכולים לקרוא את אותו מקור בו-זמנית. אפשר לעשות זאת, לדוגמה, בקובץ שירות (service worker) אם רוצים לאחזר תשובה מהשרת ולהעביר אותה בסטרימינג לדפדפן, אבל גם להעביר אותה בסטרימינג למטמון של קובץ השירות. מכיוון שאי אפשר להשתמש בגוף התגובה יותר מפעם אחת, צריך שתי עותקים כדי לעשות זאת. כדי לבטל את המקור, צריך לבטל את שני ההסתעפויות שנוצרו. בדרך כלל, כשמתחילים לקרוא מקור נתונים, הוא נעול למשך כל זמן הקריאה, כך שקוראים אחרים לא יכולים לנעול אותו.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

מקורות נתונים של בייטים שניתנים לקריאה

לגבי מקורות נתונים שמייצגים בייטים, קיימת גרסה מורחבת של מקור הנתונים שאפשר לקרוא כדי לטפל בבייטים ביעילות, במיוחד על ידי צמצום מספר העותקים. באמצעות שידורי בייטים אפשר לקבל קוראים מסוג 'מביאים את המאגר שלכם' (BYOB). הטמעת ברירת המחדל יכולה לספק מגוון פלטים שונים, כמו מחרוזות או מאגרי מערכי ב-WebSockets, בעוד שזרמי בייטים מבטיחים פלט של בייטים. בנוסף, לקוראים של BYOB יש יתרונות יציבות. הסיבה לכך היא שאם מאגר מנותק, אפשר להבטיח שלא כותבים לאותו מאגר פעמיים, וכך להימנע ממצבים של מרוץ תהליכים. קוראים של BYOB יכולים לצמצם את מספר הפעמים שהדפדפן צריך להריץ את האיסוף של שטחי האחסון שאינם בשימוש, כי הוא יכול לעשות שימוש חוזר במאגרים.

יצירת מקור נתונים של בייטים לקריאה

כדי ליצור מקור נתונים של בייטים שאפשר לקרוא, מעבירים פרמטר type נוסף למבנה ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

המקור הבסיסי של מקור בייט קריא מקבל ReadableByteStreamController כדי לבצע בו מניפולציה. השיטה ReadableByteStreamController.enqueue() מקבלת ארגומנט chunk שהערך שלו הוא ArrayBufferView. הנכס ReadableByteStreamController.byobRequest מחזיר את בקשת ה-pull הנוכחית של BYOB, או null אם אין כזו. לבסוף, המאפיין ReadableByteStreamController.desiredSize מחזיר את הגודל הרצוי כדי למלא את התור הפנימי של הסטרימינג המבוקר.

queuingStrategy

הארגומנט השני של המבנה ReadableStream(), שגם הוא אופציונלי, הוא queuingStrategy. זהו אובייקט שמגדיר אופציונלית אסטרטגיית תורים לשידור, עם פרמטר אחד:

  • highWaterMark: מספר בייטים לא שלילי שמציין את נקודת השיא של הסטרימינג באמצעות שיטת ההמתנה בתור הזו. המאפיין הזה משמש לקביעת לחץ החזרה, שמופיע דרך המאפיין המתאים ReadableByteStreamController.desiredSize. הוא קובע גם מתי מתבצעת הקריאה ל-method‏ pull() של המקור הבסיסי.

השיטות getReader() ו-read()

לאחר מכן תוכלו לקבל גישה ל-ReadableStreamBYOBReader על ידי הגדרת הפרמטר mode בהתאם: ReadableStream.getReader({ mode: "byob" }). כך אפשר לשלוט בצורה מדויקת יותר בהקצאת המאגר כדי למנוע עותקים. כדי לקרוא מזרם הבייטים, צריך לבצע קריאה ל-ReadableStreamBYOBReader.read(view), כאשר view הוא ArrayBufferView.

דוגמת קוד של מחרוזת בייט לקריאה

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

הפונקציה הבאה מחזירה זרמי בייטים לקריאה שמאפשרים קריאה יעילה ללא העתקה של מערך שנוצר באופן אקראי. במקום להשתמש בגודל מקטע מוגדר מראש של 1,024, הוא מנסה למלא את מאגר הנתונים שסופק על ידי המפתח, ומאפשר שליטה מלאה.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

המנגנונים של מקור נתונים לכתיבה

מקור נתונים לכתיבה הוא יעד שבו אפשר לכתוב נתונים, שמיוצגים ב-JavaScript באמצעות אובייקט WritableStream. הוא משמש כחפץ מופשט מעל sink בסיסי – sink של קלט/פלט ברמה נמוכה יותר שאליו נכתבים נתונים גולמיים.

הנתונים נכתבים בסטרימינג באמצעות סופר, מקטע אחד בכל פעם. מקטע יכול להופיע במגוון צורות, בדיוק כמו המקטעים בקורא. אתם יכולים להשתמש בכל קוד שתרצו כדי ליצור את הקטעים שיהיו מוכנים לכתיבה. הקוד וגם הסופר נקראים מפיק.

כשיוצרים גורם כתיבה ומתחילים לכתוב בזרם (גורם כתיבה פעיל), הוא מונע ממנו. רק גורם אחד יכול לכתוב בסטרימינג שאפשר לכתוב בו בכל פעם. אם רוצים שגורם אחר יתחיל לכתוב בסטרימינג, בדרך כלל צריך לשחרר אותו ואז לצרף אליו גורם כתיבה אחר.

תור פנימי עוקב אחרי קטעי הנתונים שנכתבו בסטרימינג אבל עדיין לא עברו עיבוד על ידי ה-sink הבסיסי.

שיטת תורים היא אובייקט שמגדיר איך הסטרימינג צריך לסמן לחץ חוזר על סמך המצב של התור הפנימי שלו. אסטרטגיית ההמתנה בתור מקצה גודל לכל מקטע, ומשווים את הגודל הכולל של כל המקטעים בתור למספר מסוים שנקרא נקודת השיא.

המבנה הסופי נקרא בקר. לכל מקור נתונים שאפשר לכתוב בו יש אמצעי בקרה משויך שמאפשר לשלוט במקור הנתונים (לדוגמה, לבטל אותו).

יצירת מקור לכתיבה

הממשק WritableStream של Streams API מספק הפשטה סטנדרטית לכתיבה של נתוני סטרימינג ליעד, שנקרא sink. האובייקט הזה כולל לחץ חוזר (backpressure) ותור מובנים. כדי ליצור מקור נתונים שאפשר לכתוב בו, צריך להפעיל את ה-constructor שלו, WritableStream(). יש לו פרמטר underlyingSink אופציונלי, שמייצג אובייקט עם שיטות ומאפיינים שמגדירים את האופן שבו מופע הסטרימינג שנוצר יתנהג.

underlyingSink

ה-underlyingSink יכול לכלול את השיטות האופציונליות הבאות שהוגדרו על ידי המפתחים. הפרמטר controller שמוענק לחלק מהשיטות הוא WritableStreamDefaultController.

  • start(controller): המערכת קוראת לשיטה הזו מיד כשהאובייקט נוצר. התוכן של השיטה הזו צריך לכלול קוד שמטרתו לקבל גישה ל-sink הבסיסי. אם התהליך הזה צריך להתבצע באופן אסינכרוני, אפשר להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון.
  • write(chunk, controller): ה-method הזה ייקרא כשמקטע נתונים חדש (שצוין בפרמטר chunk) יהיה מוכן לכתיבה ב-sink הבסיסי. הוא יכול להחזיר הבטחה כדי לסמן את ההצלחה או הכישלון של פעולת הכתיבה. השיטה הזו תופעל רק אחרי שפעולות הכתיבה הקודמות הצליחו, אף פעם אחרי שהסטרימינג נסגר או בוטל.
  • close(controller): ה-method הזה ייקרא אם האפליקציה תאותת שהיא סיימה לכתוב קטעי קוד לסטרימינג. התוכן צריך לבצע את כל הפעולות הנדרשות כדי לסיים את הכתיבה לבור הנתונים הבסיסי ולשחרר את הגישה אליו. אם התהליך הזה הוא אסינכרוני, הוא יכול להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון. השיטה הזו תיקרא רק אחרי שכל פעולות הכתיבה שנמצאות בתור יסתיימו בהצלחה.
  • abort(reason): ה-method הזה ייקרא אם האפליקציה תאותת שהיא רוצה לסגור את המקור באופן פתאומי ולהעביר אותו למצב שגיאה. הוא יכול לנקות משאבים מוחזקים, בדומה ל-close(), אבל abort() ייכלל גם אם יש ברשימה כתיבה. הקטעים האלה יימחקו. אם התהליך הזה הוא אסינכרוני, הוא יכול להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון. הפרמטר reason מכיל את הערך DOMString שמתאר את הסיבה לביטול השידור.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

הממשק WritableStreamDefaultController של Streams API מייצג בקר שמאפשר לשלוט במצב של WritableStream במהלך ההגדרה, כשמעבירים עוד קטעים לכתיבה או בסיום הכתיבה. כשיוצרים WritableStream, למכשיר ההטמעה (sink) הבסיסי מוקצה מכונה תואמת של WritableStreamDefaultController לצורך מניפולציה. ל-WritableStreamDefaultController יש רק method אחד: WritableStreamDefaultController.error(), שגורם לשגיאות בכל אינטראקציה עתידית עם הסטרימינג המשויך. WritableStreamDefaultController תומך גם במאפיין signal שמחזיר מופע של AbortSignal, שמאפשר לעצור פעולת WritableStream במקרה הצורך.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

הארגומנט השני של המבנה WritableStream(), שגם הוא אופציונלי, הוא queuingStrategy. זהו אובייקט שמגדיר אופציונלית אסטרטגיית תורים לסטרימינג, עם שני פרמטרים:

  • highWaterMark: מספר לא שלילי שמציין את נקודת השיא של הסטרימינג באמצעות שיטת התור הזו.
  • size(chunk): פונקציה שמחשבת ומחזירה את הגודל המוגבל והחיובי של ערך הקטע הנתון. התוצאה משמשת לקביעת לחץ החזרה, שמופיע דרך המאפיין המתאים WritableStreamDefaultWriter.desiredSize.

השיטות getWriter() ו-write()

כדי לכתוב בסטרימינג שאפשר לכתוב בו, צריך גורם כתיבה, שהוא WritableStreamDefaultWriter. השיטה getWriter() של הממשק WritableStream מחזירה מופע חדש של WritableStreamDefaultWriter ונועלת את הסטרימינג למופע הזה. בזמן שהסטרימינג נעול, לא ניתן לצרף סופר אחר עד שהסופר הנוכחי ישוחרר.

השיטה write() של הממשק WritableStreamDefaultWriter כותבת מקטע נתונים שהועברו ל-WritableStream ולצינור הניקוז הבסיסי שלו, ואז מחזירה הבטחה שמתקבלת כדי לציין את ההצלחה או הכישלון של פעולת הכתיבה. חשוב לזכור שהמשמעות של 'הצלחה' תלויה ב-sink הבסיסי. ייתכן שהמשמעות היא שהקטע אושר, ולא בהכרח שהוא נשמר בבטחה ביעד הסופי שלו.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

הנכס locked

כדי לבדוק אם מקור נתונים שאפשר לכתוב בו נעול, צריך לגשת למאפיין WritableStream.locked שלו.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

דוגמת קוד של מקור נתונים לכתיבה

בדוגמת הקוד שבהמשך מוצגים כל השלבים בפעולה.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

העברה של מקור נתונים לקריאה למקור נתונים לכתיבה

אפשר להעביר באמצעות צינור (pipe) מקור נתונים לקריאה למקור נתונים לכתיבה באמצעות השיטה pipeTo() של מקור הנתונים לקריאה. הפונקציה ReadableStream.pipeTo() מעבירה את ReadableStream הנוכחי באמצעות צינור ל-WritableStream נתון ומחזירה הבטחה שתתבצע כשתהליך ההעברה יושלם בהצלחה, או תידחה אם נתקלנו בשגיאות.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

יצירת מקור נתונים לטרנספורמציה

הממשק TransformStream של Streams API מייצג קבוצה של נתונים שניתנים לטרנספורמציה. כדי ליצור transform stream, קוראים למבנה ה-constructor שלו, TransformStream(), שיוצר אובייקט transform stream מהמפעילים שצוינו ומחזיר אותו. ה-constructor של TransformStream() מקבל כארגומנט הראשון אובייקט JavaScript אופציונלי שמייצג את ה-transformer. אובייקטים כאלה יכולים להכיל כל אחת מהשיטות הבאות:

transformer

  • start(controller): המערכת קוראת לשיטה הזו מיד כשהאובייקט נוצר. בדרך כלל משתמשים באפשרות הזו כדי להוסיף לתור קטעי קידומת באמצעות controller.enqueue(). הקטעים האלה יקראו מהצד הקריא, אבל הם לא תלויים בכתיבה בצד הניתן לכתיבה. אם התהליך הראשוני הוא אסינכרוני, למשל כי נדרשת קצת מאמץ כדי לקבל את קטעי הקידומת, הפונקציה יכולה להחזיר הבטחה (promise) כדי לסמן הצלחה או כישלון. הבטחה שנדחתה תגרום לשגיאה בסטרימינג. כל חריגות שיושגו יושלחו מחדש על ידי ה-constructor של TransformStream().
  • transform(chunk, controller): ה-method הזה נקרא כשמקטע חדש שנכתב במקור בצד הכתיבה מוכן לטרנספורמציה. הטמעת הסטרימינג מבטיחה שהפונקציה הזו תופעל רק אחרי שהטרנספורמציות הקודמות הצליחו, אף פעם לא לפני שהטרנספורמציה start() הושלמה או אחרי שהפונקציה flush() הוזמנה. הפונקציה הזו מבצעת את פעולת הטרנספורמציה בפועל של מקור הנתונים המומר. הוא יכול להוסיף את התוצאות לתור באמצעות controller.enqueue(). כך, מקטע אחד שנכתב בצד הכתיבה יכול להוביל לאפס או למספר מקטעים בצד הקריאה, בהתאם למספר הפעמים שבהן controller.enqueue() נקרא. אם התהליך של הטרנספורמציה הוא אסינכרוני, הפונקציה הזו יכולה להחזיר הבטחה (promise) כדי לסמן את ההצלחה או הכישלון של הטרנספורמציה. אם הבטחה תידחה, תופיע שגיאה גם בצד הקריאה וגם בצד הכתיבה של מקור הנתונים לטרנספורמציה. אם לא מציינים שיטת transform(), נעשה שימוש בטרנספורמציית הזהות, שמוסיפה לתור קטעי נתונים ללא שינוי מהצד שאפשר לכתוב בו לצד שאפשר לקרוא בו.
  • flush(controller): ה-method הזה נקרא אחרי שכל הקטעים שנכתבו בצד הכתיבה עברו טרנספורמציה בהצלחה דרך transform(), והצד הכתיבה עומד להיסגר. בדרך כלל משתמשים באפשרות הזו כדי להוסיף לתור קטעי סיומת בצד הקריאה, לפני שהוא נסגר גם כן. אם תהליך השטיפה הוא אסינכרוני, הפונקציה יכולה להחזיר הבטחה כדי לסמן הצלחה או כישלון. התוצאה תימסר למבצע הקריאה של stream.writable.write(). בנוסף, אם הבטחה תידחה, תופיע שגיאה גם בצד הקריאה וגם בצד הכתיבה של הסטרימינג. השלכת חריגה נחשבת לאותה פעולה כמו החזרת הבטחה שנדחתה.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

שיטות ההמתנה בתור writableStrategy ו-readableStrategy

הפרמטרים האופציונליים השני והשלישי של ה-constructor של TransformStream() הם אסטרטגיות אופציונליות של writableStrategy ו-readableStrategy לתור. הם מוגדרים כפי שמתואר בקטעים של זרם הקריאה (readable) ושל זרם הכתיבה (writable) בהתאמה.

דוגמת קוד לטרנספורמציה של מקור נתונים

דוגמת הקוד הבאה מראה זרם פשוט של טרנספורמציה בפעולה.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

העברה של מקור נתונים לקריאה דרך מקור נתונים לטרנספורמציה

השיטה pipeThrough() של ממשק ReadableStream מספקת דרך לשרשור צינור (pipe) של המקור הנוכחי דרך מקור טרנספורמציה או כל צמד אחר שאפשר לכתוב/לקרוא בו. בדרך כלל, העברת מקור נתונים בצינור נעילה אותו למשך כל תקופת ההעברה, ומונעת מקוראים אחרים לנעול אותו.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

בקטע הקוד הבא (שהוא קצת מלאכותי) מוצג איך אפשר להטמיע גרסה 'רועשת' של fetch() שממירה את כל הטקסט לאותיות רישיות על ידי שימוש בהבטחה של התגובה שהוחזרה כזרם והמרת כל מקטע לאותיות רישיות. היתרון של הגישה הזו הוא שאין צורך להמתין להורדה של המסמך כולו, וזה יכול להוות הבדל משמעותי כשעובדים עם קבצים גדולים.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

הדגמה (דמו)

הדגמה הבאה מראה את הפעולות של קריאה, כתיבה והמרה של זרמים. הוא כולל גם דוגמאות למחרוזות צינורות של pipeThrough() ו-pipeTo(), וגם הדגמה של tee(). אפשר להריץ את הדמו בחלון נפרד או להציג את קוד המקור.

שידורים מועילים שזמינים בדפדפן

יש מספר מקורות מידע שימושיים שמובנים ישירות בדפדפן. אפשר ליצור בקלות ReadableStream מ-blob. שיטת stream()‎ של הממשק Blob מחזירה ReadableStream, שמחזיר את הנתונים שמכיל ה-blob בזמן הקריאה. חשוב לזכור גם שאובייקט File הוא סוג ספציפי של Blob, ואפשר להשתמש בו בכל הקשר שבו אפשר להשתמש ב-blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

הווריאנטים של TextDecoder.decode() ו-TextEncoder.encode() בסטרימינג נקראים TextDecoderStream ו-TextEncoderStream, בהתאמה.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

קל לדחוס או לבטל דחיסה של קובץ באמצעות המרות של CompressionStream ו-DecompressionStream, בהתאמה. בדוגמת הקוד שבהמשך מוסבר איך מורידים את המפרט של Streams, דוחסים אותו (gzip) ישירות בדפדפן וכותבים את הקובץ הדחוס ישירות בדיסק.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream של File System Access API וfetch() request streams הניסיוניים הם דוגמאות לזרמים שאפשר לכתוב בהם בעולם האמיתי.

ב-Serial API נעשה שימוש נרחב בזרמים שאפשר לקרוא מהם וגם לכתוב אליהם.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

לבסוף, ה-API של WebSocketStream משלב בין סטרימינג ל-WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

משאבים שימושיים

תודות

הבדיקה של המאמר בוצעה על ידי Jake Archibald,‏ François Beaufort,‏ Sam Dutton,‏ Mattias Buelens,‏ Surma,‏ Joe Medley ו-Adam Rice. פוסטים בבלוג של Jake Archibald עזרו לי מאוד להבין את הנושא של שידורים. חלק מדגימות הקוד מבוססות על החקירות של משתמש GitHub‏ ‎@bellbind, וחלק מהטקסט מבוסס במידה רבה על מסמכי התיעוד של MDN Web ב-Streams. המחברים של Streams Standard עשו עבודה נהדרת בכתיבת המפרט הזה. התמונה הראשית היא של Ryan Lara ב-Unsplash.