Akışlar: Ayrıntılı kılavuz

Streams API ile akışları okunabilir ve yazılabilir hale getirmeyi ve dönüştürmeyi öğrenin.

Streams API, ağ üzerinden alınan veri akışlarına programatik olarak erişmenize olanak tanır yerel ve harici kaynaklarla bunları JavaScript ile işleyeceksiniz. Akış almak, göndermek veya dönüştürmek istediğiniz bir kaynağın dökümünü içerir önce küçük parçalara ayırıyor, sonra da bu parçaları aniden işliyorsunuz. Akış hizmetleri, web sayfalarında gösterilecek HTML veya videolar gibi öğeleri alırken de Bu özellik, 2015'te akışlarla fetch tarihinden önce JavaScript'te mevcut değildi.

Önceden, bir tür kaynağı (ör. video, metin dosyası vb.) işlemek istediğinizde tüm dosyayı indirmeniz ve uygun bir biçimdeki seri hâle getirilmesini beklemeniz, ve sonra onu işleyebiliriz. Canlı yayınlarda JavaScript, tüm bunlar değişir. Artık ham verileri JavaScript ile aşamalı olarak dize veya blob oluşturmaya gerek kalmadan istemcide kullanılabilir hâle gelir. Bu sayede, aşağıda ele aldığımız bir dizi kullanım alanından yararlanabilirsiniz:

  • Video efektleri: Efektlerin uygulandığı bir dönüşüm akışı aracılığıyla okunabilir bir video akışı oluşturma gerçekleşmesidir.
  • Veri (sıkıştırma) açma: Dosya akışını, seçici bir şekilde bir dönüşüm akışı üzerinden bağlama (de) sıkıştırmasını sağlar.
  • Resim kodu çözme: baytların kodunu çözen bir dönüşüm akışı üzerinden bir HTTP yanıt akışı bağlama sonra bit eşlemler içeren başka bir dönüşüm akışı üzerinden yapılır. Eğer Service Worker'ın fetch işleyicisinin içinde yüklü olduğundan, çoklu dolgu işlemini şeffaf bir şekilde yapmanıza olanak tanır. yeni resim formatları kullanıyor.

Tarayıcı desteği

ReadableStream ve WritableStream

Tarayıcı Desteği

  • Chrome: 43..
  • Kenar: 14..
  • Firefox: 65..
  • Safari: 10.1.

Kaynak

TransformStream

Tarayıcı Desteği

  • Chrome: 67..
  • Kenar: 79..
  • Firefox: 102..
  • Safari: 14.1.

Kaynak

Temel kavramlar

Farklı canlı yayın türleriyle ilgili ayrıntılara geçmeden önce bazı temel kavramlardan bahsedeceğim.

Topak

Parça, bir akışa yazılan veya akıştan okunan tek bir veri parçasıdır. Herhangi bir tür; akışlar farklı türlerde parçalar da içerebilir. Çoğu zaman bir parça, en önemli atom ya da veri birimidir. Örneğin, bir bayt akışı 16 parçadan oluşan Tek bayt yerine Uint8Array birim.

Okunabilir akışlar

Okunabilir bir akış, okuyabileceğiniz bir veri kaynağını temsil eder. Başka bir deyişle, veriler okunaklı bir akışın dışına çıkabilir. Okunabilir bir akış, ReadableStream öğesinin bir örneğidir. sınıfını kullanır.

Yazılabilir akışlar

Yazılabilir akış, verilerin yazabileceğiniz bir hedefi temsil eder. Başka bir deyişle veriler girdiğini belirtir. Somut olarak, yazılabilir akış, WritableStream sınıf.

Akışları dönüştürme

Dönüşüm akışı, bir akış çiftinden oluşur: Yazılabilir tarafı olarak bilinen yazılabilir bir akış. ve okunabilir bir akış olmasını sağlar. Bunun için gerçek dünyadan bir benzetme, simultane çevirmen anında çeviri yapıyor. Dönüşüm akışına özgü bir şekilde, kod yazma, yeni verilerin emin olmanız gerekir. Somut olarak, writable ve readable özelliklerine sahip tüm nesneler sunum yapabilir. akış olarak ele alacağız. Ancak standart TransformStream sınıfı, oluşturma işlemini kolaylaştırır. şekilde birbirine bağlandığından emin olun.

Zincir borular

Akışlar, öncelikli olarak birbirlerine özetlenerek kullanılır. Okunabilir bir akış doğrudan bağlanabilir okunabilir akışın pipeTo() yöntemini kullanarak yazılabilir bir akışa gönderebilir veya veya daha fazla dönüştürme akışı gerçekleştirin.pipeThrough() akışlar boru zinciri olarak adlandırılır.

Sızıntı

Bir boru zinciri oluşturulduktan sonra parçaların ne kadar hızlı akması gerektiğine dair sinyaller yayar adım adım açıklayacağım. Zincirdeki herhangi bir adım henüz parçaları kabul edemiyorsa sinyali geriye yayar boru zincirinden geçerek, nihayetinde orijinal kaynağın parçalar üretmeyi bırakması söylenene kadar, hızlıdır. Bu akışın normalleştirilmesi sürecine karşı basınç adı verilir.

Tişört

Okunabilir bir akış, tee() yöntemi kullanılarak ("T" harfi şeklinde adlandırılır) girilebilir. Bu işlem akışı kilitler, yani artık doğrudan kullanılamaz. ancak her biri iki yeni akışlar, ("dallar" olarak adlandırılır) ve bağımsız olarak tüketilebilir. Akışlar geri sarılamayacağı veya yeniden başlatılamayacağı için teeing de önemlidir. Bu konuyu ileride ayrıntılı olarak açıklayacağız.

Getirme API'sine yapılan bir çağrıdan gelen okunabilir bir akıştan oluşan, ardından çıkışı aktarılan ilk okunabilir akış için tarayıcıya ve sonuç olarak elde edilen ikinci okunabilir akış için Service Worker önbelleğine gönderilen bir dönüşüm akışı aracılığıyla gelen okunabilir bir akıştan oluşan ardışık düzen zinciri.
Bir boru zinciri.

Okunabilir bir akışın mekanikleri

Okunabilir akış, JavaScript’te bir ReadableStream nesne temel kaynaktaki diğer akışlar. İlgili içeriği oluşturmak için kullanılan ReadableStream() oluşturucu, belirtilen işleyicilerden okunabilir bir akış nesnesi oluşturur ve döndürür. İki tür Temel kaynak türleri:

  • İtme kaynakları, siz bunlara eriştiğinizde sürekli olarak size veri gönderir. Bunu yapmak size kalmıştır akışı başlatabilir, duraklatabilir veya iptal edebilirsiniz. Canlı video akışları, sunucu tarafından gönderilen etkinlikler, veya WebSockets'i kullanabilirsiniz.
  • Çekme kaynakları, bağlandıktan sonra bu kaynaklardan açıkça veri istemenizi gerektirir. Örnekler fetch() veya XMLHttpRequest çağrıları aracılığıyla yapılan HTTP işlemlerini dahil etme

Akış verileri, parçalar adı verilen küçük parçalar halinde sırayla okunur. Bir akışa yerleştirilen parçaların sıralandığı söylenir. Yani sırada bekliyor okunmaya hazır. Dahili sıra, henüz okunmamış parçaları takip eder.

Sıraya ekleme stratejisi, bir yayının akış değerine göre nasıl karşı basıncı işaret edeceğini belirleyen bir nesnedir. dahili sırasının durumunu gösterir. Sıraya ekleme stratejisi her parçaya bir boyut atar ve kuyruktaki tüm parçaların toplam boyutunu yüksek su işareti olarak bilinen belirli bir sayıya dönüştürün.

Akış içindeki parçalar, bir okuyucu tarafından okunur. Bu okuyucu, verileri her saniyede bir yığın Böylece, cihaz üzerinde istediğiniz her tür işlemi yapabilirsiniz. Okuyucu ve diğer tüketici adı verilir.

Bu bağlamdaki bir sonraki yapı denetleyici olarak adlandırılır. Her okunabilir akışın ilişkilendirilmiş bir denetleyici, adından da anlaşılacağı üzere akışı kontrol etmenize olanak tanır.

Bir akışı aynı anda yalnızca bir okuyucu okuyabilir. Kullanıcı oluşturulduğunda ve bir akışı okumaya başladığında etkin bir okuyucu haline gelirse) kendisine bağlıdır. Başka bir okuyucunun Canlı yayınınızı okuyorsanız herhangi bir işlem yapmadan önce ilk okuyucuyu serbest bırakmanız gerekir. (yine de canlı yayınlara tee vurabilirsiniz).

Okunabilir bir akış oluşturma

Kurucusunu çağırarak okunabilir bir akış oluşturursunuz ReadableStream(). Oluşturucu, bir nesneyi temsil eden isteğe bağlı underlyingSource bağımsız değişkenine sahip yöntemleri ve özellikleriyle birlikte çalışır.

underlyingSource

Bu işlem, geliştirici tarafından tanımlanan isteğe bağlı aşağıdaki yöntemleri kullanabilir:

  • start(controller): Nesne oluşturulduğunda hemen çağrılır. İlgili içeriği oluşturmak için kullanılan yöntemini kullanarak akış kaynağına erişebilir ve başka herhangi bir şey yapabilir. akış işlevini ayarlamak için gereklidir. Bu işlem eşzamansız olarak yapılacaksa yöntem, başarı ya da başarısızlık anlamına gelen bir vaatte bulunmanızı sağlar. Bu yönteme iletilen controller parametresi CANNOT TRANSLATE ReadableStreamDefaultController
  • pull(controller): Daha fazla parça getirildikçe akışı kontrol etmek için kullanılabilir. Google akışın dahili parça sırası dolmadığı sürece, sıraya alınana kadar arka arkaya çağrılır yüksek su seviyesine ulaşır. pull() aramasının sonucu bir vaat ise pull(), söz konusu taahhüt yerine getirilene kadar tekrar aranmayacak. Vaat reddedilirse akış hata verir.
  • cancel(reason): Yayın tüketicisi, yayını iptal ettiğinde çağrılır.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController, aşağıdaki yöntemleri destekler:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Benzer şekilde, ReadableStream() kurucusunun ikinci bağımsız değişkeni queuingStrategy şeklindedir. İsteğe bağlı olarak akış için sıraya ekleme stratejisini tanımlayan bir nesnedir. parametre:

  • highWaterMark: Bu sıraya ekleme stratejisi kullanılarak akışın yüksek su işaretini gösteren negatif olmayan bir sayıdır.
  • size(chunk): Belirli bir parça değerinin sonlu, negatif olmayan, sonlu boyutunu hesaplayıp döndüren bir işlev. Sonuç, uygun ReadableStreamDefaultController.desiredSize özelliği aracılığıyla kendini göstererek karşı basıncı belirlemek için kullanılır. Ayrıca temel kaynağın pull() yönteminin ne zaman çağrılacağını da belirler.
ziyaret edin.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader() ve read() yöntemleri

Okunabilir bir akıştan okumak için bir okuyucuya ihtiyacınız vardır. ReadableStreamDefaultReader. ReadableStream arayüzünün getReader() yöntemi bir okuyucu oluşturur ve akışı somut olarak ortaya koyar. Akış kilitliyken bu okuyucu yayınlanana kadar başka bir okuyucu edinilemez.

read() ReadableStreamDefaultReader arayüzünün yöntemi, sonraki parçasına ekleyebilirsiniz. Reklamın durumuna bağlı olarak bir sonuçla yerine getirir veya reddeder akışla değiştirebilirsiniz. Farklı olasılıklar aşağıda verilmiştir:

  • Parça varsa formdaki bir nesneyle taahhüt yerine getirilecektir
    { value: chunk, done: false }.
  • Akış kapatılırsa vadettiğiniz şey şu biçimde bir nesneyle yerine getirilecektir
    { value: undefined, done: true }.
  • Akışta hata oluşursa söz, ilgili hatayla birlikte reddedilir.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked mülkü

Okunabilir bir yayının kilitli olup olmadığını kontrol etmek için ReadableStream.locked

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Okunabilir akış kodu örnekleri

Aşağıdaki kod örneğinde tüm adımların işleyiş şekli gösterilmektedir. İlk olarak ReadableStream underlyingSource bağımsız değişkeni (yani TimestampSource sınıfı) bir start() yöntemini tanımlar. Bu yöntem, akışın controller öğesine şunları söyler: enqueue() on saniye boyunca saniyede bir zaman damgası. Son olarak, denetleyiciye akışı close() yapmasını söyler. Bunu tüketiyorsunuz getReader() yöntemiyle bir okuyucu oluşturup yayın tamamlanana kadar read() işlevini çağırarak akışı gerçekleştirin done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Eşzamansız yineleme

Akışın done olup olmadığını her read() döngü yinelemesini kontrol etmek en uygun API olmayabilir. Neyse ki yakında bunu yapmanın daha iyi bir yolu olacak: eşzamansız iterasyon.

for await (const chunk of stream) {
  console.log(chunk);
}
.

Şu anda eşzamansız yinelemeyi kullanmanın geçici bir yolu, bu davranışı bir çoklu dolguyla uygulamaktır.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Okunabilir bir canlı yayını başlatma

tee() yöntemi, ReadableStream arayüzü, mevcut okunabilir akışı başlatarak iki öğeli bir dizi döndürür. , oluşturulan iki dalı yeni ReadableStream örnekleri olarak içerir. Bu da akışı aynı anda okumasını sağlayan iki okuyucu bulunur. Örneğin, sunucudan bir yanıt alıp tarayıcıya aktarmak, ancak aynı zamanda bunu hizmet çalışanı önbelleğine alınır. Yanıt gövdesi birden fazla kez kullanılamayacağından iki kopyaya ihtiyacınız var yardımcı oluyorum. Akışı iptal etmek için sonuçta ortaya çıkan her iki dalı da iptal etmeniz gerekir. Canlı yayın başlatma kodu genellikle süre boyunca kilitleyerek diğer okuyucuların kilitlenmesini önler.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Okunabilir bayt akışları

Baytları temsil eden akışlarda okunabilir akışın genişletilmiş bir sürümü sağlanır. veri biriktirmemizi sağlıyor ve özellikle de kopyaları en aza indirerek yapabilirsiniz. Bayt akışları kendi arabelleğinizi getirmenizi sağlar (BYOB) okuyucuları edinmenize olanak tanır. Varsayılan uygulama, bir dizi farklı çıkış sağlayabilir. Örneğin, WebSocket'lerde dize veya dizi arabellekleri olarak kullanılır. Bayt akışları ise bayt çıkışını garanti eder. Ayrıca, KOBİ okuyucularının kararlılıkla ilgili avantajları vardır. Bu Çünkü bir tampon ayrılırsa aynı tamponu iki kez yazmamayı garanti edebilir. yarış koşullarından kaçınmış olur. BYOB okuyucuları, tarayıcının çalıştırılması gereken sayıyı azaltabilir tamponları yeniden kullanabildiği için atık toplama.

Okunabilir bayt akışı oluşturma

Dosya adına ek type parametresi ileterek okunabilir bir bayt akışı oluşturabilirsiniz. ReadableStream() oluşturucu.

new ReadableStream({ type: 'bytes' });

underlyingSource

Okunabilir bir bayt akışının temel kaynağına bir ReadableByteStreamController verilir. manipüle edin. Bu ReadableByteStreamController.enqueue() yöntemi, değeri olan bir chunk bağımsız değişkeni alır bir ArrayBufferView. ReadableByteStreamController.byobRequest özelliği, BYOB pull isteği veya yoksa boş değer. Son olarak, ReadableByteStreamController.desiredSize özelliği, kontrol edilen akışın dahili sırasını doldurmak için istenen boyutu döndürür.

queuingStrategy

Benzer şekilde, ReadableStream() kurucusunun ikinci bağımsız değişkeni queuingStrategy şeklindedir. İsteğe bağlı olarak akış için sıraya ekleme stratejisini tanımlayan bir nesnedir. parametresi:

  • highWaterMark: Bu sıraya ekleme stratejisi kullanılarak akışın yüksek su işaretini gösteren, negatif olmayan bayt sayısı. Bu, uygun ReadableByteStreamController.desiredSize özelliği aracılığıyla kendini göstererek karşı basıncı belirlemek için kullanılır. Ayrıca temel kaynağın pull() yönteminin ne zaman çağrılacağını da belirler.
ziyaret edin. ziyaret edin.

getReader() ve read() yöntemleri

Daha sonra, mode parametresini uygun şekilde ayarlayarak bir ReadableStreamBYOBReader erişimi elde edebilirsiniz: ReadableStream.getReader({ mode: "byob" }). Bu, arabellek üzerinde daha hassas kontrol sağlar . Bayt akışından okumak için şunu çağırmanız gerekir: ReadableStreamBYOBReader.read(view), burada view bir ArrayBufferView.

Okunabilir bayt akışı kodu örneği

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Aşağıdaki işlev, bir veri biriminin verimli sıfır kopya okumasına olanak tanıyan okunabilir bayt rastgele oluşturulmuş dizi. Önceden belirlenmiş 1024'lük parça boyutunu kullanmak yerine, tam kontrole olanak tanıyan tampon görevi görür.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Yazılabilir akışın mekanikleri

Yazılabilir akış, veri yazabileceğiniz bir hedeftir ve JavaScript'te bir WritableStream nesnesini tanımlayın. Bu alttaki lavabonun (alt seviyedeki bir G/Ç havuzunun) üst kısmı üzerinde bir soyutlama görevi görür. ve ham verilerin yazıya döküldüğünden emin olun.

Veriler, tek seferde bir parça olacak şekilde bir yazar aracılığıyla akışa yazılır. Bir yığın, tıpkı bir okuyucudaki parçalar gibi çok çeşitli Oluşturmak istediğiniz kodu kullanabilirsiniz parçalar yazmaya hazırdır. yazar ve ilişkili koda üretici adı verilir.

Bir yazar, oluşturulup bir akışa yazmaya başladığında (etkin yazar) buna Kilitli olmalıdır. Yazılabilir bir akışa aynı anda yalnızca bir yazar yazabilir. Bir tane daha canlı yayınınıza yazmak için yazmanız gerekir. Ardından, yazarsınız.

Dahili sıra, akışa yazılmış ancak henüz yazılmamış parçaları takip eder temel havuzda işlenir.

Sıraya ekleme stratejisi, bir yayının akış değerine göre nasıl karşı basıncı işaret edeceğini belirleyen bir nesnedir. dahili sırasının durumunu gösterir. Sıraya ekleme stratejisi her parçaya bir boyut atar ve kuyruktaki tüm parçaların toplam boyutunu yüksek su işareti olarak bilinen belirli bir sayıya dönüştürün.

Son yapıya denetleyici adı verilir. Her yazılabilir akışın, aşağıdakileri karşılayan ilişkili bir denetleyicisi vardır: akışı kontrol etmenize (örneğin, iptal etmenize) olanak tanır.

Yazılabilir akış oluşturma

WritableStream arayüzü Streams API, akış verilerini bilinen bir hedefe yazmak için standart bir soyutlama sunar. bir lavabo olarak kullanabilirsiniz. Bu nesne, yerleşik karşı basınç ve sıraya ekleme özelliğine sahiptir. Yazılabilir bir akış oluşturmak için: kurucusunu çağırmak WritableStream(). Bir nesneyi temsil eden isteğe bağlı underlyingSink parametresi içerir yöntemleri ve özellikleriyle birlikte çalışır.

underlyingSink

underlyingSink, aşağıdaki isteğe bağlı, geliştirici tarafından tanımlanan yöntemleri içerebilir. controller parametre, yöntemlerin bazılarına iletilen bir parametredir. WritableStreamDefaultController

  • start(controller): Bu yöntem, nesne oluşturulduğunda hemen çağrılır. İlgili içeriği oluşturmak için kullanılan temel havuza erişimi hedeflemelidir. Bu işlem, eş zamansız olarak yapıldığında, başarılı mı yoksa başarısız mı olacağına dair bir vaat verebilir.
  • write(chunk, controller): Bu yöntem, yeni bir veri parçası ( chunk parametresi) temel havuza yazılmaya hazırdır. Paydaşlara, daha önce işleminin başarılı veya başarısız olduğunu gösterir. Bu yöntem yalnızca öncekinden sonra çağrılacak yazması başarılı olur ve akış kapatıldıktan ya da iptal edildikten sonra hiçbir zaman iptal edilmez.
  • close(controller): Uygulama, yazmayı tamamladığını bildiriyorsa bu yöntem çağrılır parçalara böler. İçerikler, havuza erişimi serbest bırakır. Bu süreç eş zamansızsa bir vaat edilmiş olabilir. Bu yöntem yalnızca sıraya alınan tüm yazma işlemlerinden sonra çağrılır yardımcı olur.
  • abort(reason): Uygulama aniden kapatmak istediği yönünde sinyal verirse bu yöntem çağrılır aktarmalı ve hata durumuna getirmelidir. Tıpkı önceki adımlarda olduğu gibi, close(), ancak yazma işlemleri sıraya alınmış olsa bile abort() çağrılır. Bu parçalar atılacak kaçırdınız. Bu süreç eş zamansızsa başarılı veya başarısız olma vaadinde bulunabilir. İlgili içeriği oluşturmak için kullanılan reason parametresi, akışın neden iptal edildiğini açıklayan bir DOMString içeriyor.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

İlgili içeriği oluşturmak için kullanılan WritableStreamDefaultController Streams API'nin arayüzü, WritableStream durumunun kontrolüne olanak tanıyan bir denetleyiciyi temsil eder daha fazla parça gönderildiğinden veya yazının sonunda gönderilir. İnşa ederken WritableStream varsa temel havuza karşılık gelen bir WritableStreamDefaultController verilir. örnek olarak kullanabilirsiniz. WritableStreamDefaultController yalnızca bir yönteme sahip: WritableStreamDefaultController.error(), Bu durum, ileride ilişkili akışla kurulan etkileşimlerde hata oluşmasına neden olur. WritableStreamDefaultController, aynı zamandasignal AbortSignal, ve gerekirse WritableStream işleminin durdurulmasına izin verir.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Benzer şekilde, WritableStream() kurucusunun ikinci bağımsız değişkeni queuingStrategy şeklindedir. İsteğe bağlı olarak akış için sıraya ekleme stratejisini tanımlayan bir nesnedir. parametre:

  • highWaterMark: Bu sıraya ekleme stratejisi kullanılarak akışın yüksek su işaretini gösteren negatif olmayan bir sayıdır.
  • size(chunk): Belirli bir parça değerinin sonlu, negatif olmayan, sonlu boyutunu hesaplayıp döndüren bir işlev. Sonuç, uygun WritableStreamDefaultWriter.desiredSize özelliği aracılığıyla kendini göstererek karşı basıncı belirlemek için kullanılır.
ziyaret edin.

getWriter() ve write() yöntemleri

Yazılabilir bir akışa yazmak için bir yazara ihtiyacınız vardır. WritableStreamDefaultWriter WritableStream arayüzünün getWriter() yöntemi, yeni WritableStreamDefaultWriter örneği oluşturur ve akışı bu örneğe kilitler. Araç akış kilitli olduğundan, mevcut yazar yayınlanıncaya kadar başka yazar kazanılamaz.

write() yöntemindeki WritableStreamDefaultWriter arayüz, geçirilen bir veri parçasını WritableStream ve altındaki havuza yazar, ardından Böylece, yazma işleminin başarılı veya başarısız olduğunu gösterebilir. Önemli not: "başarı" ya da projenin parçanın kabul edildiğini gösterebilir. ve nihai hedefine güvenli bir şekilde kaydedilmesi her zaman mümkün değildir.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked mülkü

Yazılabilir bir akışın kilitli olup olmadığını kontrol etmek için WritableStream.locked

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Yazılabilir akış kodu örneği

Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Okunabilir bir akışı yazılabilir bir akışa bağlama

Okunabilir bir akış, okunabilir akışın pipeTo() yöntemini kullanabilirsiniz. ReadableStream.pipeTo(), mevcut ReadableStream değerini belirli bir WritableStream öğesine bağlar ve bir bağlama işlemi başarıyla tamamlandığında yerine getirileceğini veya hata varsa reddedileceğini karşılaşılır.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Dönüşüm akışı oluşturma

Streams API'nin TransformStream arayüzü, dönüştürülebilir bir veri kümesini temsil eder. Siz oluşturucuyu TransformStream() çağırarak bir dönüşüm akışı oluşturun. Bu işlev, şunu oluşturur ve döndürür: belirtilen işleyicilerden bir dönüşüm akışı nesnesi. TransformStream() oluşturucusu şu şekilde kabul eder: ilk bağımsız değişkeni, transformer değerini temsil eden isteğe bağlı bir JavaScript nesnesidir. Bu tür nesneler aşağıdaki yöntemlerden herhangi birini içeren:

transformer

  • start(controller): Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Normal şartlarda bu, controller.enqueue() kullanılarak ön ek parçalarını sıraya almak için kullanılır. Bu parçalar okunacak yapabilirsiniz. Bu baştaki olması gerekir. Örneğin, ön ek parçalarını elde etmek biraz çaba gerektirdiğinden, işlev, başarılı veya başarısız olduğuna işaret eden bir vaat döndürebilir; bir vaatin reddedilmesi, akış şeklinde gösterilir. Atılan tüm istisnalar TransformStream() oluşturucusu tarafından yeniden atanır.
  • transform(chunk, controller): Bu yöntem, yeni bir yığın orijinal olarak yazılabilir taraf dönüştürülmeye hazır. Akış uygulaması, bu işlevin değeri, yalnızca önceki dönüştürmeler başarılı olduktan sonra çağrılır ve start() çağrılmadan önce veya flush() çağrıldıktan sonra kullanılabilir. Bu işlev, gerçek dönüşüm işlemini çalışmasını sağlar. controller.enqueue() kullanarak sonuçları sıraya alabilir. Bu yazılabilir tarafa tek bir parçanın sıfır veya birden çok parçayla sonuçlanmasına izin verir controller.enqueue() için kaç kez çağrıldığına bağlı olarak okunaklı tarafları değiştirin. Projenin aslında eş zamansızdır. Bu işlev, projenin başarılı veya başarısız olduğuna işaret eden bir vaat verebilir. dönüşüme geri dönüyor. Reddedilen bir vaat, dokümanın hem okunabilir hem de yazılabilir taraflarında hata akış dönüştürme. transform() yöntemi sağlanmazsa kimlik dönüşümü kullanılır. parçaları yazılabilir taraftan okunabilir tarafa doğru sıraya alır.
  • flush(controller): Bu yöntem, yazılabilir tarafa yazılan tüm parçalar yazıldıktan sonra çağrılır transform() üzerinden başarılı bir şekilde geçilerek dönüştürüldü ve yazılabilir taraf kapalı. Genellikle bu, sonek parçalarını okunabilir tarafta, ondan önce sıraya koymak için kullanılır. haline gelir. Temizlik işlemi eşzamansızsa fonksiyon, başarılı ya da başarısız olduğuna işaret eden sonuç, arayan kişiye iletilecektir stream.writable.write() Ayrıca, reddedilen bir taahhüt hem okunabilir hem de metinlerin her ikisinde de görünür. Bir istisnanın gönderilmesi, reddedilen ödevin döndürülmesiyle aynı şekilde değerlendirilir sözü.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategy ve readableStrategy sıraya alma stratejileri

TransformStream() oluşturucunun isteğe bağlı ikinci ve üçüncü parametreleri isteğe bağlıdır. writableStrategy ve readableStrategy sıraya alma stratejileri. Bu adımlar, okunabilir ve yazılabilir akış bölümlerine bakın.

Akış kodu örneğini dönüştürme

Aşağıdaki kod örneğinde basit bir dönüşüm akışı gösterilmektedir.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Okunabilir bir akışı dönüşüm akışı üzerinden ardışık olarak oluşturma

pipeThrough() ReadableStream arayüzünün yöntemi, mevcut akışı birbirine bağlamak için zincirlenebilir bir yol sağlar bir dönüştürme akışı veya başka bir yazılabilir/okunabilir çift aracılığıyla eşlenir. Akış düzeneği genellikle kilitlenir devre dışı bırakmaya çalışın. Böylece diğer okuyucular onu kilitlemez.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Sonraki kod örneğinde (biraz çelişkili) bir "bağırma"yı nasıl uygulayabileceğiniz gösterilmektedir fetch() sürümü döndüren yanıt vaadini kullanarak tüm metni büyük harf yapar akış olarak büyük/küçük harflerden oluşur. Bu yaklaşımın avantajı, teslimatın tamamlanması için tüm dokümanı indirecektir. Bu, büyük dosyalarla çalışırken büyük bir fark yaratabilir.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

Aşağıdaki demoda okunabilir, yazılabilir ve dönüştürülen akışlar gösterilmektedir. Ayrıca bu eğitimde pipeThrough() ve pipeTo() boru zincirinin örneğidir ve tee() gösterilmektedir. İsterseniz demoyu kendi penceresinde açın ya da kaynak kodundan yararlanabilirsiniz.

Tarayıcıda kullanılabilen faydalı yayınlar

Doğrudan tarayıcıda yerleşik olarak bulunan çok sayıda kullanışlı akış vardır. Burada kolayca Bir blob'dan ReadableStream. Blob arayüzün stream() yöntemi okuma sonrasında blob'da bulunan verileri döndüren bir ReadableStream. Ayrıca, File nesnesi, Blob gibidir ve bir blob'un yapabileceği herhangi bir bağlamda kullanılabilir.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode() ve TextEncoder.encode() akış varyantlarının adı TextDecoderStream ve Sırasıyla TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Sıkıştırılmış dosyaları sıkıştırmak veya açmak için CompressionStream ve DecompressionStream akışları dönüştürme tıklayın. Aşağıdaki kod örneğinde Akışlar özelliğini nasıl indirebileceğinizi, nasıl sıkıştıracağınızı (gzip) doğrudan tarayıcı üzerinde yazar ve sıkıştırılmış dosyayı doğrudan diske yazarsınız.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API'nin FileSystemWritableFileStream ve deneysel fetch() istek akışları örnek olarak verilebilir.

Serial API, hem okunabilir hem de yazılabilir akışları yoğun bir şekilde kullanır.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Son olarak, WebSocketStream API, akışları WebSocket API ile entegre eder.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Faydalı kaynaklar

Teşekkür

Bu makale tarafından incelendi Jake Archibald, Faruk Beaufort, Sami Yılmaz, Mattias Buelens, Surma, Joe Podley ve Adam Rıza. Jake Archibald'ın blog yayınları, programın hakkında akışlar. Bazı kod örnekleri GitHub kullanıcısından esinlenmiştir @bellbind adlı kullanıcının keşifleri ve bu düzyazının bazı kısımlarında Akışlarda MDN Web Dokümanları. İlgili içeriği oluşturmak için kullanılan Streams Standard'ın yazarlar şu konuda çok önemli bir yere sahip: yazmalısınız. Ryan Lara tarafından oluşturulan hero resim Lansmanı kaldırın.