أحداث البث: الدليل النهائي

تعرَّف على كيفية استخدام مصادر البيانات القابلة للقراءة والكتابة والتحويل باستخدام واجهة برمجة التطبيقات Streams API.

تتيح لك واجهة برمجة التطبيقات Streams API الوصول آليًا إلى مصادر البيانات التي يتم تلقّيها عبر الشبكة أو التي يتم إنشاؤها بأي وسيلة على الجهاز، ثم معالجتها باستخدام JavaScript. يتضمن البث تقسيم مورد تريد تلقّيه أو إرساله أو تحويله إلى أجزاء صغيرة، ثم معالجة هذه الأجزاء تدريجيًا. على الرغم من أنّ البث المباشر هو ميزة توفّرها المتصفّحات بشكلٍ تلقائي عند تلقّي مواد عرض مثل HTML أو الفيديوهات لعرضها على صفحات الويب، لم تكن هذه الميزة متاحة مطلقًا في JavaScript قبل طرح fetch مع أحداث البث المباشر في عام 2015.

في السابق، إذا أردت معالجة مورد من نوع ما (سواء كان فيديو أو ملفًا نصيًا أو غير ذلك)، كان عليك تنزيل الملف بالكامل والانتظار إلى أن يتم تحويله إلى تنسيق مناسب، ثم معالجته. مع توفّر أحداث البث لسمة JavaScript، سيتغيّر كل ذلك. يمكنك الآن معالجة البيانات الأولية باستخدام JavaScript بشكل تدريجي بمجرد توفّرها على العميل، بدون الحاجة إلى إنشاء ذاكرة تخزين مؤقت أو سلسلة أو ملف نصي. يتيح ذلك عددًا من حالات الاستخدام، وسنذكر بعضًا منها أدناه:

  • تأثيرات الفيديو: يتم توجيه بث فيديو قابل للقراءة من خلال بث تحويل يطبّق تأثيرات في الوقت الفعلي.
  • (فك) ضغط البيانات: يتم توجيه مصدر ملفات من خلال مصدر تحويل (فك) يضغطه بشكل انتقائي.
  • فك ترميز الصور: يتم توجيه مصدر استجابة HTTP من خلال مصدر تحويل يفك ترميز البايتات ويحوّلها إلى بيانات مخطّط بياني، ثم من خلال مصدر تحويل آخر يحوّل المخطّطات البيانية إلى ملفات بتنسيق PNG. في حال التثبيت داخل معالِج fetch لأحد موظّفي الخدمة، يتيح لك ذلك استخدام polyfill بشفافية لتنسيقات الصور الجديدة، مثل AVIF.

دعم المتصفح

ReadableStream وWritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

المفاهيم الأساسية

قبل الخوض في تفاصيل الأنواع المختلفة من أحداث البث، سأقدّم لك بعض المفاهيم الأساسية.

الأجزاء

الجزء هو قطعة واحدة من البيانات يتم كتابتها في مصدر بيانات أو قراءتها منه. يمكن أن يكون من أي نوع، ويمكن أن تحتوي أحداث البث على أجزاء من أنواع مختلفة. في معظم الأحيان، لن تكون المجموعة هي وحدة البيانات الأكثر أساسية لمصدر بيانات معيّن. على سبيل المثال، قد يحتوي بث البايتات على أجزاء تتألف من 16 وحدة Uint8Array كيلوبايت، بدلاً من البايتات الفردية.

أحداث البث القابلة للقراءة

يمثّل مصدر البيانات القابل للقراءة مصدر بيانات يمكنك القراءة منه. بعبارة أخرى، تأتي البيانات من مصدر بيانات مقروء. على وجه التحديد، البث القابل للقراءة هو مثيل لفئة ReadableStream.

أحداث البث القابلة للكتابة

يمثّل مصدر البيانات القابل للكتابة وجهة للبيانات التي يمكنك الكتابة فيها. بعبارة أخرى، يتم إدخال البيانات في مصدر قابل للكتابة. على وجه التحديد، فإنّ البث القابل للكتابة هو مثيل لفئة WritableStream.

تحويل أحداث البث

يتكوّن مسار التحويل من زوج من المسارات: مسار قابل للكتابة يُعرف باسم "الجانب القابل للكتابة"، ومسار قابل للقراءة يُعرف باسم "الجانب القابل للقراءة". يمكن تشبيه هذه الميزة بأحد المترجمين الفوريين الذين يترجمون من لغة إلى أخرى أثناء التحدث. بطريقة خاصة بمصدر التحويل، تؤدّي الكتابة إلى الجانب القابل للكتابة إلى إتاحة بيانات جديدة للقراءة من الجانب القابل للقراءة. على وجه التحديد، يمكن لأي عنصر يتضمّن سمة writable وسمة readable أن يُستخدَم كمجموعة بث تحويل. ومع ذلك، تسهّل فئة TransformStream العادية إنشاء مثل هذا الزوج المتشابك بشكل صحيح.

سلاسل الأنابيب

يتم استخدام أحداث البث بشكل أساسي من خلال توجيهها إلى بعضها. يمكن توجيه بث قابل للقراءة مباشرةً إلى بث قابل للكتابة، وذلك باستخدام طريقة pipeTo() للبث القابل للقراءة، أو يمكن توجيهه من خلال بث واحد أو أكثر من عمليات التحويل أولاً، وذلك باستخدام طريقة pipeThrough() للبث القابل للقراءة. يُشار إلى مجموعة من البثّات التي تم ربطها معًا بهذه الطريقة باسم سلسلة البث.

الضغط الخلفي

بعد إنشاء سلسلة قنوات، ستنشر إشارات بشأن سرعة تدفق الأجزاء من خلالها. إذا لم تتمكّن أي خطوة في السلسلة من قبول أجزاء بعد، يتم نشر إشارة للخلف من خلال سلسلة الأنابيب، إلى أن يتم في النهاية إبلاغ المصدر الأصلي بإيقاف إنتاج الأجزاء بهذه السرعة. تُعرف عملية تطبيع التدفق باسم الضغط الخلفي.

Teeing

يمكن بدء بث قابل للقراءة (يُعرف باسم "T" الكبير) باستخدام طريقة tee(). سيؤدي ذلك إلى قفل البث، أي لن يعود قابلاً للاستخدام مباشرةً، ولكن سيتم إنشاء سلسلتَي بث جديدتَين، تُعرفان باسم الفروع، ويمكن استخدامهما بشكل مستقل. من المهم أيضًا ضبط البث مسبقًا لأنّه لا يمكن ترجيع البث أو إعادة تشغيله، وسنوضّح لك المزيد من المعلومات حول هذا الموضوع لاحقًا.

مخطّط بياني لسلسلة قنوات يتألف من بث قابل للقراءة ناتج عن طلب إلى واجهة برمجة التطبيقات fetch API، ويتم توجيهه بعد ذلك من خلال بث تحويل يتم تقسيم مخرجاته ثم إرسالها إلى المتصفّح للبث القابل للقراءة الأول الناتج وإلى ذاكرة التخزين المؤقت لخدمة worker للبث القابل للقراءة الثاني الناتج.
سلسلة أنابيب.

آلية البث القابل للقراءة

مصدر البيانات القابل للقراءة هو مصدر بيانات يتم تمثيله في JavaScript من خلال عنصر ReadableStream الذي يتدفق من مصدر أساسي. تنشئ الدالة الإنشائية ReadableStream() كائن بث قابل للقراءة من معالِجات معيّنة وتُرجعه. هناك نوعان من المصادر الأساسية:

  • تُرسِل مصادر الإرسال البيانات إليك باستمرار عند الوصول إليها، ويكون بإمكانك بدء الوصول إلى البث أو إيقافه مؤقتًا أو إلغاؤه. وتشمل الأمثلة أحداث البث المباشر للفيديو أو الأحداث المُرسَلة من الخادم أو WebSockets.
  • تتطلّب مصادر السحب منك طلب البيانات منها صراحةً بعد الربط بها. وتشمل الأمثلة عمليات HTTP من خلال طلبات fetch() أو XMLHttpRequest.

تتم قراءة بيانات البث تسلسليًا في أجزاء صغيرة تُعرَف باسم المقاطع. يُقال إنّ الأجزاء التي يتم وضعها في بثّ مُدرَجة في قائمة الانتظار. وهذا يعني أنّه في انتظار المراجعة. تتتبّع القائمة الداخلية الأجزاء التي لم تتم قراءتها بعد.

استراتيجية وضع المحتوى في قائمة الانتظار هي عنصر يحدّد كيفية إرسال بث إشارة الضغط الخلفي استنادًا إلى حالة قائمة الانتظار الداخلية. تحدِّد استراتيجية وضع المحتوى في "قائمة المحتوى التالي" حجمًا لكل جزء، وتقارن بين الحجم الإجمالي لجميع الأجزاء في "قائمة المحتوى التالي" وعدد محدّد يُعرف باسم الحد الأقصى المسموح به.

يقرأ قارئ الأجزاء داخل البث. يسترجع هذا القارئ البيانات في قطعة واحدة في كل مرة، ما يتيح لك إجراء أي نوع من العمليات التي تريد إجراؤها عليها. يُطلق على القارئ بالإضافة إلى رمز المعالجة الآخر المصاحب له اسم مستهلك.

يُطلق على البنية التالية في هذا السياق اسم عنصر تحكّم. يحتوي كل مصدر بيانات قابل للقراءة على عنصر تحكّم مرتبط به، كما يوحي الاسم، يتيح لك التحكّم في مصدر البيانات.

يمكن لقارئ واحد فقط قراءة مصدر بيانات في كل مرة. عند إنشاء قارئ وبدء قراءة مصدر بيانات، (أي أنّه يصبح قارئًا نشطًا)، يتم قفله عليه. إذا أردت أن يقرأ قارئ آخر البث، عليك عادةً إلغاء قراءة القارئ الأول قبل تنفيذ أي إجراء آخر (على الرغم من أنّه يمكنك تقسيم أحداث البث).

إنشاء بث قابل للقراءة

يمكنك إنشاء مصدر بيانات قابل للقراءة من خلال استدعاء عنصر الإنشاء ReadableStream(). يحتوي المُنشئ على وسيطة اختيارية underlyingSource، والتي تمثّل كائنًا يتضمّن طُرقًا وسمات تحدّد سلوك مثيل البث الذي تم إنشاؤه.

فريق underlyingSource

ويمكن أن يستخدِم هذا الإجراء الطرق الاختيارية التالية التي يحدّدها المطوّر:

  • start(controller): يتمّ استدعاؤه على الفور عند إنشاء العنصر. يمكن للطريقة الوصول إلى مصدر البث وتنفيذ أي إجراء آخر مطلوب لإعداد وظيفة البث. إذا كان من المفترض تنفيذ هذه العملية بشكل غير متزامن، يمكن للطريقة عرض وعد للإشارة إلى النجاح أو الفشل. المَعلمة controller التي تم تمريرها إلى هذه الطريقة هي a ReadableStreamDefaultController.
  • pull(controller): يمكن استخدامها للتحكّم في البث أثناء جلب المزيد من الأجزاء. ويتم استدعاؤه مراراً وتكراراً ما دامت قائمة الانتظار الداخلية للبث غير ممتلئة، إلى أن تصل قائمة الانتظار إلى الحد الأقصى. إذا كانت نتيجة استدعاء pull() هي وعد، لن يتم استدعاء pull() مرة أخرى إلى أن يتم الوفاء بهذا الوعد. إذا تم رفض الوعد، سيظهر خطأ في البث.
  • cancel(reason): يتمّ استدعاؤه عندما يلغي مستخدِم البث البث.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

تتيح ReadableStreamDefaultController الأساليب التالية:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لصانع ReadableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة المحتوى التالي" للبث، ويأخذ مَعلمتَين:

  • highWaterMark: رقم غير سالب يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه.
  • size(chunk): دالة تحسب الحجم المحدّد غير السالب لقيمة الجزء المحدّدة وتعرضه. تُستخدَم النتيجة لتحديد الضغط الخلفي الذي يظهر من خلال السمة ReadableStreamDefaultController.desiredSize المناسبة. ويحدِّد أيضًا الحالات التي يتم فيها استدعاء طريقة pull() للمصدر الأساسي.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

الطريقتان getReader() وread()

للقراءة من مصدر قابل للقراءة، تحتاج إلى قارئ، وهو ReadableStreamDefaultReader. تُنشئ طريقة getReader() لواجهة ReadableStream قارئًا وتُقفل البث عليه. عندما يكون البث مقفلًا، لا يمكن الحصول على قارئ آخر إلى أن يتم تحرير هذا القارئ.

تُرجِع طريقة read() لواجهة ReadableStreamDefaultReader وعدًا يسمح بالوصول إلى القطعة التالية في "قائمة المحتوى التالي" الداخلية للبث. يتم استيفاء هذه الشروط أو رفضها بنتيجة استنادًا إلى حالة البث. في ما يلي الاحتمالات المختلفة:

  • إذا كان هناك قطعة متاحة، سيتم تنفيذ الوعد بعنصر من الشكل
    { value: chunk, done: false }.
  • إذا أصبح البث مغلقًا، سيتم تنفيذ الوعد باستخدام عنصر من النوع
    { value: undefined, done: true }.
  • إذا حدث خطأ في البث، سيتم رفض الوعد مع تضمين الخطأ ذي الصلة.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

سمة locked

يمكنك التحقّق مما إذا كان قد تم قفل مصدر بيانات قابل للقراءة من خلال الوصول إلى ReadableStream.locked موقعه.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

عيّنات رمز بث قابل للقراءة

يعرض نموذج الرمز البرمجي أدناه جميع الخطوات أثناء تنفيذها. عليك أولاً إنشاء ReadableStream يحدِّد في وسيطته underlyingSource (أي فئة TimestampSource) طريقة start(). تطلب هذه الطريقة من controller في البث enqueue() طابعًا زمنيًا كل ثانية لمدة عشر ثوانٍ. أخيرًا، يطلب من وحدة التحكّم close() البث. يمكنك استخدام هذا البث من خلال إنشاء قارئ باستخدام الطريقة getReader() واستدعاء read() إلى أن يتم done البث.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

التكرار غير المتزامن

قد لا تكون واجهة برمجة التطبيقات الأكثر ملاءمةً هي التحقّق من كل دورة read() للحلقة إذا كان البث done. لحسن الحظ، ستتوفّر قريبًا طريقة أفضل لإجراء ذلك: التكرار غير المتزامن.

for await (const chunk of stream) {
  console.log(chunk);
}

إنّ أحد الحلول البديلة لاستخدام التكرار غير المتزامن اليوم هو تنفيذ السلوك باستخدام polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

بدء بث قابل للقراءة

تُنشئ طريقة tee() لواجهة ReadableStream بثًا قابلاً للقراءة حاليًا، وتُعرِض صفيفًا من عنصرَين يحتوي على الفرعين الناتجَين كمثيلَين جديدَين من ReadableStream. يتيح ذلك لقراءين قراءة بث في الوقت نفسه. يمكنك إجراء ذلك، على سبيل المثال، في مشغّل خدمات إذا أردت جلب استجابة من الخادم وبثّها إلى المتصفّح، ولكن أيضًا بثّها إلى التخزين المؤقت لمشغّل الخدمات. وبما أنّه لا يمكن استخدام نص الردّ أكثر من مرة، ستحتاج إلى نسختَين للقيام بذلك. لإلغاء عملية البث، عليك إلغاء كلا الفرعين الناتجَين. سيؤدي بدء بث محتوى إلى قفله بشكل عام طوال المدة المحددة، ما يمنع القرّاء الآخرين من قفله.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

تدفّقات وحدات البايت القابلة للقراءة

بالنسبة إلى مصادر البيانات التي تمثّل وحدات البايت، يتم توفير إصدار موسّع من مصدر البيانات القابل للقراءة لمعالجة وحدات البايت بكفاءة، لا سيما من خلال تقليل عدد النُسخ. تتيح أحداث "بث الوحدات" اكتساب قراء "أحضر ذاكرتك المؤقتة بنفسك" (BYOB). يمكن أن يقدّم التنفيذ التلقائي مجموعة من النتائج المختلفة، مثل سلاسل أو مخازن مصفوفات في حال استخدام WebSockets، في حين تضمن تدفّقات البايتات إخراج البايتات. بالإضافة إلى ذلك، يحصل القرّاء الذين يستخدمون BYOB على مزايا الاستقرار. ويعود سبب ذلك إلى أنّه في حال فصل المخزن المؤقت، يمكن ضمان عدم الكتابة في المخزن المؤقت نفسه مرّتين، وبالتالي تجنُّب حالات تعارض البيانات. يمكن لقراء BYOB تقليل عدد المرات التي يحتاج فيها المتصفّح إلى تنفيذ عملية جمع المهملات، لأنّه يمكنه إعادة استخدام ذاكرة التخزين المؤقت.

إنشاء بث وحدات بت قابلة للقراءة

يمكنك إنشاء بث بايتات قابل للقراءة عن طريق تمرير مَعلمة type إضافية إلى الدالة الإنشائية ReadableStream().

new ReadableStream({ type: 'bytes' });

فريق underlyingSource

يتم منح المصدر الأساسي لبث وحدات البايت القابلة للقراءة ReadableByteStreamController للتلاعب به. تأخذ طريقة ReadableByteStreamController.enqueue() وسيطة chunk تكون قيمتها ArrayBufferView. تعرِض السمة ReadableByteStreamController.byobRequest طلب سحب BYOB الحالي، أو قيمة فارغة في حال عدم توفّر طلب. أخيرًا، يعرض الحقل ReadableByteStreamController.desiredSize الحجم المطلوب لملء "القائمة الداخلية" للبث الخاضع للرقابة.

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لصانع ReadableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة المحتوى التالي" للبث، ويأخذ مَعلمة واحدة:

  • highWaterMark: عدد غير سالب من وحدات البايت يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه. يُستخدَم ذلك لتحديد الضغط الخلفي الذي يظهر من خلال السمة ReadableByteStreamController.desiredSize المناسبة. ويحدِّد أيضًا حالات استدعاء طريقة pull() في المصدر الأساسي.

الطريقتان getReader() وread()

يمكنك بعد ذلك الوصول إلى ReadableStreamBYOBReader من خلال ضبط المَعلمة mode وفقًا لذلك: ReadableStream.getReader({ mode: "byob" }). ويتيح ذلك التحكّم بشكل أدق في تخصيص ملف التخزين المؤقت لتجنُّب النُسخ. للقراءة من بث البايتات، عليك استدعاء ReadableStreamBYOBReader.read(view)، حيث يكون view هو ArrayBufferView.

نموذج رمز قابل للقراءة لبث البايتات

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

تعرض الدالة التالية مصادر بيانات قابلة للقراءة تتيح قراءة فعالة بدون نسخ لجدول مرتب بشكل عشوائي. بدلاً من استخدام حجم قطعة محدّد مسبقًا يبلغ 1,024، يحاول ملء المخازن المؤقتة التي يقدّمها المطوّر، ما يتيح التحكّم الكامل.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

آلية تيار قابل للكتابة

مصدر البيانات القابل للكتابة هو وجهة يمكنك كتابة البيانات فيها، ويتم تمثيله في JavaScript بكائن WritableStream. ويعمل هذا الإجراء كعملية تجريد على مستوى أعلى من وحدة معالجة أساسية، وهي وحدة معالجة I/O من المستوى الأدنى يتم فيها كتابة البيانات الأولية.

يتم كتابة البيانات في مصدر البيانات من خلال كاتب، كل جزء في المرة الواحدة. يمكن أن تتخذ المقتطفات أشكالًا متعددة، تمامًا مثل المقتطفات في قارئ. يمكنك استخدام أي رمز تريده لإنشاء المقاطع الجاهزة للكتابة. ويُطلق على الكاتب والرمز المرتبط به اسم منتج.

عند إنشاء كاتب وبدء الكتابة في مصدر بيانات (كاتب نشط)، يُقال أنّه مُقفَل عليه. يمكن لكاتب واحد فقط الكتابة في بث قابل للكتابة في المرة الواحدة. إذا أردت أن يبدأ كاتب آخر في كتابة محتوى في البث، عليك عادةً تحريره قبل ربط كاتب آخر به.

تتتبّع القائمة الداخلية الأجزاء التي تمّت كتابتها إلى البثّ ولكن لم تتم معالجتها بعد من خلال وحدة الاستقبال الأساسية.

استراتيجية وضع المحتوى في قائمة الانتظار هي عنصر يحدّد كيفية إرسال بث إشارة الضغط الخلفي استنادًا إلى حالة قائمة الانتظار الداخلية. تحدِّد استراتيجية وضع المحتوى في "قائمة المحتوى التالي" حجمًا لكل جزء، وتقارن بين الحجم الإجمالي لجميع الأجزاء في "قائمة المحتوى التالي" وعدد محدّد يُعرف باسم الحد الأقصى المسموح به.

يُطلق على البنية النهائية اسم عنصر تحكّم. يحتوي كل مصدر بيانات قابل للكتابة على وحدة تحكّم مرتبطة تسمح لك بالتحكم في مصدر البيانات (على سبيل المثال، لإيقافه).

إنشاء بث قابل للكتابة

توفّر واجهة WritableStream في واجهة برمجة التطبيقات Streams API نموذجًا معياريًا لتدوين بيانات البث إلى وجهة، تُعرف باسم "الوجهة". يتضمّن هذا العنصر ميزة الضغط الخلفي وميزة وضع المحتوى في قائمة الانتظار. يمكنك إنشاء بث قابل للكتابة من خلال استدعاء عنصر الإنشاء WritableStream(). يحتوي على مَعلمة underlyingSink اختيارية، والتي تمثّل عنصرًا يتضمّن طُرقًا وسمات تحدّد سلوك مثيل البث الذي تم إنشاؤه.

فريق underlyingSink

يمكن أن يتضمّن underlyingSink الطرق الاختيارية التالية التي يحدّدها المطوّر. المَعلمة controller المُرسَلة إلى بعض الطرق هي WritableStreamDefaultController.

  • start(controller): يتمّ استدعاء هذه الطريقة على الفور عند إنشاء الكائن. يجب أن تهدف محتويات هذه الطريقة إلى الوصول إلى الحوض الأساسي. إذا كان من المفترض أن تتم هذه العملية بشكل غير متزامن، يمكن أن تُرجع وعدًا للإشارة إلى النجاح أو الفشل.
  • write(chunk, controller): سيتمّ استدعاء هذه الطريقة عندما تكون مجموعة جديدة من البيانات (المحدّدة في المَعلمة chunk) جاهزة للكتابة في وحدة الاستقبال الأساسية. ويمكن أن يعرض وعدًا لتحديد نجاح عملية الكتابة أو تعذّرها. لن يتمّ استدعاء هذه الطريقة إلا بعد نجاح عمليات الكتابة السابقة، ولن يتمّ استدعاؤها مطلقًا بعد إغلاق البث أو إيقافه.
  • close(controller): سيتمّ استدعاء هذه الطريقة إذا أشار التطبيق إلى أنّه قد انتهى من كتابة المقاطع إلى البث. يجب أن تُجري العناصر ما يلزم لإنهاء عمليات الكتابة إلى الوحدة الأساسية لمعالجة البيانات، وإلغاء إمكانية الوصول إليها. إذا كانت هذه العملية غير متزامنة، يمكنها عرض وعد لإشارة إلى النجاح أو الفشل. لن يتمّ استدعاء هذه الطريقة إلّا بعد نجاح جميع عمليات الكتابة التي تمّ وضعها في "قائمة الانتظار".
  • abort(reason): سيتمّ استدعاء هذه الطريقة إذا أراد التطبيق إغلاق البث فجأة ووضعه في حالة خطأ. ويمكنه تنظيف أي موارد محجوزة، تمامًا مثل close()، ولكن سيتمّ استدعاء abort() حتى إذا تمّ وضع عمليات الكتابة في قائمة الانتظار. سيتم تجاهل هذه الأجزاء. إذا كانت هذه العملية غير متزامنة، يمكنها عرض وعد للإشارة إلى النجاح أو الفشل. تحتوي المَعلمة reason على DOMString تصف سبب إيقاف البث.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

تمثّل واجهة WritableStreamDefaultController Streams API وحدة تحكّم تتيح التحكّم في حالة WritableStream أثناء الإعداد، أو عند إرسال المزيد من الأجزاء للكتابة، أو في نهاية الكتابة. عند إنشاء WritableStream، يتم منح الحوض الأساسي مثيلًا WritableStreamDefaultController مطابقًا للتعامل معه. يحتوي WritableStreamDefaultController على طريقة واحدة فقط: WritableStreamDefaultController.error()، مما يؤدي إلى حدوث خطأ في أي تفاعلات مستقبلية مع البث المرتبط. تتيح WritableStreamDefaultController أيضًا استخدام السمة signal التي تعرض مثيلًا لسمة AbortSignal، ما يسمح بإيقاف عملية WritableStream إذا لزم الأمر.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

فريق queuingStrategy

الوسيطة الثانية، وهي اختيارية أيضًا، لصانع WritableStream() هي queuingStrategy. وهو عنصر يحدّد بشكل اختياري استراتيجية وضع في "قائمة المحتوى التالي" للبث، ويأخذ مَعلمتَين:

  • highWaterMark: رقم غير سالب يشير إلى الحد الأقصى للبث باستخدام استراتيجية الانتظار هذه.
  • size(chunk): دالة تحسب الحجم المحدّد غير السالب لقيمة الجزء المحدّدة وتعرضه. تُستخدَم النتيجة لتحديد الضغط الخلفي الذي يظهر من خلال السمة WritableStreamDefaultWriter.desiredSize المناسبة.

الطريقتان getWriter() وwrite()

للكتابة في بث قابل للكتابة، تحتاج إلى كاتب، وسيكون WritableStreamDefaultWriter. تُعرِض طريقة getWriter() لواجهة WritableStream مثيلًا جديدًا من WritableStreamDefaultWriter وتُقفِل البث على هذا المثيل. عندما يكون مجرى البث مقفلًا، لا يمكن الحصول على كاتب آخر إلى أن يتم تحرير الكاتب الحالي.

تُسجِّل طريقة write() واجهة WritableStreamDefaultWriter مجموعة بيانات تم تمريرها في WritableStream ووحدة المعالجة الأساسية، ثم تُعرِض وعدًا يتم حلّه للإشارة إلى نجاح عملية الكتابة أو تعذّرها. يُرجى العِلم أنّ معنى "النجاح" يعتمد على وحدة النقل الأساسية، فقد يشير ذلك إلى قبول المقطع، وليس بالضرورة إلى حفظه بأمان في وجهته النهائية.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

سمة locked

يمكنك التحقّق مما إذا كان مصدر بيانات قابل للكتابة قد تم قفله من خلال الوصول إلى WritableStream.locked موقعه.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نموذج رمز Writable stream

يعرض نموذج الرمز البرمجي أدناه جميع الخطوات أثناء تنفيذها.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

توجيه بث قابل للقراءة إلى بث قابل للكتابة

يمكن توجيه بث قابل للقراءة إلى بث قابل للكتابة من خلال طريقة pipeTo() للبث القابل للقراءة. تُوجِّه ReadableStream.pipeTo() القيمة الحالية ReadableStreamإلى WritableStream معيّنة وتُعرِض وعدًا يتم تنفيذه عند اكتمال عملية توجيه البيانات بنجاح، أو يتم رفضه في حال حدثت أي أخطاء.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

إنشاء بث تحويل

تمثّل واجهة TransformStream في Streams API مجموعة من البيانات القابلة للتحويل. يمكنك إنشاء بث تحويل من خلال استدعاء الدالة الإنشائية TransformStream() التي تنشئ كائن بث تحويل وتُعرِضه من المعالجات المحدّدة. يقبل باني TransformStream() كأحد وسيطاته الأولى عنصر JavaScript اختياريًا يمثّل transformer. يمكن أن تحتوي هذه العناصر على أيٍّ من الطرق التالية:

فريق transformer

  • start(controller): يتمّ استدعاء هذه الطريقة على الفور عند إنشاء الكائن. ويُستخدَم عادةً هذا الإجراء لإضافة أجزاء البادئة إلى "قائمة الانتظار" باستخدام controller.enqueue(). سيتم قراءة هذه الأجزاء من الجانب القابل للقراءة، ولكنّها لا تعتمد على أي عمليات كتابة في الجانب القابل للكتابة. إذا كانت هذه العملية التمهيدية غير متزامنة، على سبيل المثال لأنّ الحصول على أجزاء البادئة يتطلّب بعض الجهد، يمكن للدالة عرض عملية غير مكتملة للإشارة إلى النجاح أو الفشل. وفي حال رفض العملية غير المكتملة، سيؤدي ذلك إلى حدوث خطأ في البث. ستعيد أداة إنشاء TransformStream() طرح أي استثناءات يتم طرحها.
  • transform(chunk, controller): يتمّ استدعاء هذه الطريقة عندما تكون مجموعة جديدة مكتوبة في الأساس على الجانب القابل للكتابة جاهزة للتحويل. يضمن تنفيذ البث عدم استدعاء هذه الدالة إلا بعد نجاح عمليات التحويل السابقة، ولن يتم استدعاؤها أبدًا قبل اكتمال start() أو بعد استدعاء flush(). تُنفِّذ هذه الدالة عملية التحويل الفعلية لبث التحويل. ويمكنه إضافة النتائج إلى "قائمة الانتظار" باستخدام controller.enqueue(). يسمح ذلك بكتابة قطعة واحدة في الجانب القابل للكتابة، ما يؤدي إلى عدم ظهور أيّ قطع أو ظهور عدة قطع في الجانب القابل للقراءة، وذلك استنادًا إلى عدد مرّات استدعاء controller.enqueue(). إذا كانت عملية التحويل غير متزامنة، يمكن أن تعرِض هذه الدالة وعدًا للإشارة إلى نجاح عملية التحويل أو فشلها. سيؤدي الوعد المرفوض إلى حدوث خطأ في كل من الجانبَين القابلَين للقراءة والكتابة من مجرى التحويل. في حال عدم تقديم طريقة transform()، يتم استخدام عملية تحويل الهوية التي تضيف إلى "قائمة الانتظار" أجزاء لم يتم تغييرها من الجانب القابل للكتابة إلى الجانب القابل للقراءة.
  • flush(controller): يتمّ استدعاء هذه الطريقة بعد أن تتم تحويل كلّ الأجزاء التي تمّت كتابتها على الجانب القابل للكتابة من خلال اجتياز transform() بنجاح، ويكون الجانب القابل للكتابة على وشك الإغلاق. ويُستخدَم هذا الإجراء عادةً لإضافة أجزاء اللاحقة إلى "الجانب القابل للقراءة"، قبل أن يصبح هذا الجانب أيضًا مغلقًا. إذا كانت عملية الفلاش غير متزامنة، يمكن للدالة عرض وعد لتحديد ما إذا كان الفلاش ناجحًا أو تعذّر إكماله، وسيتم إبلاغ المُتصل بالدالة stream.writable.write() بالنتيجة. بالإضافة إلى ذلك، سيؤدي الوعد المرفوض إلى حدوث خطأ في كلّ من الجانبَين القابلَين للقراءة والكتابة في البث. يتم التعامل مع طرح استثناء بالطريقة نفسها التي يتم بها عرض وعد مُرفوض.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

استراتيجيات وضع المحتوى في "قائمة المحتوى التالي" في writableStrategy وreadableStrategy

المَعلمتان الثانيتان والثالثتان الاختياريتان لصانع TransformStream() هما writableStrategy وreadableStrategy، وهما استراتيجيتَا وضع في الطابور. ويتم تحديدها على النحو الموضّح في قسمَي القراءة والكتابة للبث على التوالي.

نموذج رمز بث التحويل

يعرض نموذج الرمز البرمجي التالي بثًا بسيطًا للتحويل.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

توجيه بث قابل للقراءة من خلال بث تحويل

توفّر طريقة pipeThrough() لواجهة ReadableStream طريقة قابلة للربط لتوجيه البث الحالي من خلال بث تحويل أو أي زوج آخر قابل للكتابة/القراءة. سيؤدي توجيه بث إلى قفله بشكل عام طوال مدة التوجيه، ما يمنع القرّاء الآخرين من قفله.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

يوضّح نموذج الرمز التالي (المُعدّ بشكل مصطنع بعض الشيء) كيفية تنفيذ إصدار "صاخب" من fetch() يحوّل النص إلى نص كبير باستخدام وعد الاستجابة المعروض كبث وتحويل كل جزء إلى نص كبير. وتتمثل ميزة هذا النهج في أنّك لست بحاجة إلى الانتظار إلى أن يتم تنزيل المستند بأكمله، ما يمكن أن يحدث فرقًا كبيرًا عند التعامل مع الملفات الكبيرة.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

عرض توضيحي

يعرض المقطع التجريبي أدناه أحداث البث القابلة للقراءة والكتابة والتحويل أثناء تنفيذها. ويتضمن أيضًا أمثلة على سلاسل الأنابيب pipeThrough() وpipeTo()، ويوضّح أيضًا tee(). يمكنك اختياريًا تشغيل الإصدار التجريبي في نافذته الخاصة أو عرض رمز المصدر.

أحداث بث مفيدة متاحة في المتصفّح

هناك عدد من أحداث البث المفيدة المضمّنة في المتصفّح مباشرةً. يمكنك بسهولة إنشاء ReadableStream من قطعة بيانات. تُعرِض طريقة stream() في واجهة Blob ReadableStream الذي يعرض عند القراءة البيانات المضمّنة في العنصر المصغّر. تذكَّر أيضًا أنّ عنصر File هو نوع محدّد من ملف Blob، ويمكن استخدامه في أي سياق يمكن استخدام ملف blob فيه.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

يُطلق على الصيغ المخصّصة للبث من TextDecoder.decode() وTextEncoder.encode() اسم TextDecoderStream و TextEncoderStream على التوالي.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

يمكنك بسهولة ضغط ملف أو فك ضغطه باستخدام بثَي التحويل CompressionStream و DecompressionStream على التوالي. يوضّح نموذج الرمز البرمجي أدناه كيفية تنزيل مواصفات Streams وضغطها (باستخدام gzip) مباشرةً في المتصفّح وكتابة الملف المضغوط على القرص مباشرةً.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

إنّ FileSystemWritableFileStream وfetch() تدفّقات الطلبات التجريبية في File System Access API هي مثالان على تدفّقات البيانات القابلة للكتابة في الاستخدام الفعلي.

تستخدِم Serial API بشكل كبير كلّ من مصادر البيانات القابلة للقراءة والكتابة.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

أخيرًا، تدمج واجهة برمجة تطبيقات WebSocketStream أحداث البث مع واجهة برمجة تطبيقات WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

مراجع مفيدة

الشكر والتقدير

تمت مراجعة هذه المقالة من قِبل Jake Archibald، François Beaufort، Sam Dutton، Mattias Buelens، Surma، Joe Medley، Adam Rice. ساعدتني مشاركات المدونة التي نشرها Jake Archibald كثيرًا في فهم البث المباشر. استُوحيت بعض نماذج الرموز البرمجية من استكشافات مستخدم GitHub @bellbind، واقتبست أجزاء من النثر بشكل كبير من مستندات الويب على MDN في Streams. لقد بذل مؤلفو معيار أحداث البث جهدًا رائعًا في كتابة هذه المواصفات. الصورة الرئيسية من إنشاء ريان لارا على Unsplash.