Akışlar: Ayrıntılı kılavuz

Streams API ile okunabilir, yazılabilir ve dönüştürülebilir akışları nasıl kullanacağınızı öğrenin.

Streams API, ağ üzerinden alınan veya yerel olarak herhangi bir şekilde oluşturulan veri akışlarına programatik olarak erişmenize ve bunları JavaScript ile işlemenize olanak tanır. Akış, almak, göndermek veya dönüştürmek istediğiniz bir kaynağı küçük parçalara ayırmayı ve ardından bu parçaları bit bit işlemeyi içerir. Akış, web sayfalarında gösterilecek HTML veya video gibi öğeleri alan tarayıcıların zaten yaptığı bir işlem olsa da 2015'te akışlarla fetch kullanıma sunulmadan önce bu özellik JavaScript'te kullanılamamıştı.

Daha önce, bir tür kaynağı (ör. video veya metin dosyası) işlemek isterseniz dosyanın tamamını indirmeniz, uygun bir biçime serileştirilmesini beklemeniz ve ardından dosyayı işlemeniz gerekiyordu. JavaScript'in akışları kullanabilmesiyle tüm bunlar değişiyor. Artık istemcide kullanılabilir hale gelir gelmez ham verileri JavaScript ile kademeli olarak işleyebilir, arabelleğe, dizeye veya blob'a gerek kalmaz. Bu, bazılarını aşağıda listelediğim çeşitli kullanım alanlarına olanak tanır:

  • Video efektleri: Okunabilir bir video akışını, efektleri gerçek zamanlı olarak uygulayan bir dönüştürme akışı üzerinden aktarma.
  • Veri sıkıştırma/sıkıştırma açma: Bir dosya akışının, seçici bir şekilde sıkıştırma/sıkıştırma açma işlemi yapan bir dönüştürme akışı üzerinden aktarılması.
  • Resim kodunun çözülmesi: Bir HTTP yanıtı akışının, baytların kodunun bitmap verilerine dönüştürüldüğü bir dönüştürme akışı ve ardından bitmap'lerin PNG'lere dönüştürüldüğü başka bir dönüştürme akışı üzerinden aktarılması. Bir hizmet çalışanının fetch işleyicisine yüklenirse AVIF gibi yeni resim biçimlerini şeffaf bir şekilde doldurabilirsiniz.

Tarayıcı desteği

ReadableStream ve WritableStream

Tarayıcı desteği

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Kaynak

TransformStream

Tarayıcı desteği

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Kaynak

Temel kavramlar

Çeşitli yayın türleriyle ilgili ayrıntılara geçmeden önce bazı temel kavramları açıklamak isterim.

Büyük Parça

Bir parça, bir akışa yazılan veya bir akıştan okunan tek bir veri parçasıdır. Herhangi bir türde olabilir; hatta farklı türde parçalar içerebilir. Çoğu zaman, bir veri parçası belirli bir akış için en atomik veri birimi olmaz. Örneğin, bir bayt akışı tek baytlar yerine 16 KiB Uint8Array biriminden oluşan parçalar içerebilir.

Okunabilir akışlar

Okunabilir akış, okuyabileceğiniz bir veri kaynağını temsil eder. Diğer bir deyişle, veriler okunabilir bir akıştan çıkar. Daha açık belirtmek gerekirse, okunabilir akış, ReadableStream sınıfının bir örneğidir.

Yazılabilir akışlar

Yazılabilir akış, veri yazabileceğimiz bir hedefi temsil eder. Diğer bir deyişle, veriler yazılabilir bir akışa girer. Yazılabilir akış, WritableStream sınıfının bir örneğidir.

Akışları dönüştürme

Dönüşüm akışı, bir çift akıştan oluşur: Yazılabilir tarafı olarak bilinen yazılabilir bir akış ve okunabilir tarafı olarak bilinen okunabilir bir akış. Bu durumu gerçek dünyada bir metaforla açıklamak gerekirse, bir dilden diğerine anında çeviri yapan simultane çevirmen olarak örnek verilebilir. Dönüşüm akışına özgü bir şekilde, yazılabilir tarafa yazma işlemi, okunabilir taraftan okunmaya hazır yeni verilerin sunulmasına neden olur. Daha açık belirtmek gerekirse, writable ve readable özelliğine sahip tüm nesneler dönüşüm akışı olarak kullanılabilir. Ancak standart TransformStream sınıfı, düzgün bir şekilde dolaşık olan böyle bir çift oluşturmayı kolaylaştırır.

Boru zincirleri

Akışlar, öncelikle birbirlerine bağlantı verilerek kullanılır. Okunabilir bir akış, okunabilir akışın pipeTo() yöntemi kullanılarak doğrudan yazılabilir bir akışa aktarılabilir veya okunabilir akışın pipeThrough() yöntemi kullanılarak önce bir veya daha fazla dönüştürme akışı üzerinden aktarılabilir. Bu şekilde birleştirilmiş bir akış grubuna boru zinciri denir.

Geri basınç

Bir boru zinciri oluşturulduktan sonra, parçaların içinden ne kadar hızlı akması gerektiğine dair sinyaller yayılır. Zincirdeki herhangi bir adım henüz parçaları kabul edemiyorsa boru zinciri boyunca geriye doğru bir sinyal yayılır. Bu sinyal, sonunda orijinal kaynağa bu kadar hızlı parça üretmeyi bırakması söylenene kadar devam eder. Bu akış normalleştirme işlemine geri basınç denir.

Teeing

Okunabilir bir akış, tee() yöntemi kullanılarak yan dallara ayrılabilir (büyük harfli "T" şeklinden dolayı bu şekilde adlandırılır). Bu işlem, yayını kilitler, yani artık doğrudan kullanılamaz hale getirir. Ancak bağımsız olarak kullanılabilen iki yeni yayın (dal olarak adlandırılır) oluşturur. Akışlar geri sarılamadığı veya yeniden başlatılamadığı için başlangıç noktası da önemlidir. Bu konu hakkında daha fazla bilgiyi aşağıda bulabilirsiniz.

Fetch API'ye yapılan bir çağrıdan gelen ve daha sonra çıkışı ayrılan bir dönüştürme akışı üzerinden aktarılan ve ardından ilk okunabilir akış için tarayıcıya ve ikinci okunabilir akış için hizmet çalışanı önbelleğiye gönderilen okunabilir bir akıştan oluşan bir boru zinciri şeması.
Bir boru zinciri.

Okunabilir bir akış mekanizması

Okunabilir akış, JavaScript'te temel bir kaynaktan akan bir ReadableStream nesnesi tarafından temsil edilen bir veri kaynağıdır. ReadableStream() sınıfının kurucusu, belirtilen işleyicilerden okunabilir bir akış nesnesi oluşturur ve döndürür. İki tür temel kaynak vardır:

  • Aktarıcı kaynaklar, eriştiğinizde size sürekli olarak veri aktarır. Akışa erişimi başlatmak, duraklatmak veya iptal etmek size bağlıdır. Canlı video yayınları, sunucu tarafından gönderilen etkinlikler veya WebSocket'ler buna örnek gösterilebilir.
  • Alma kaynakları, bağlandıktan sonra bu kaynaklardan açıkça veri istemenizi gerektirir. Örnekler arasında fetch() veya XMLHttpRequest çağrıları aracılığıyla yapılan HTTP işlemleri yer alır.

Akış verileri, parça adı verilen küçük parçalar halinde sırayla okunur. Bir akışa yerleştirilen parçaların sıraya eklenmiş olduğu söylenir. Bu, okunmaya hazır olarak bir sırada bekledikleri anlamına gelir. Henüz okunmamış parçalar dahili bir kuyrukta tutulur.

Sıralama stratejisi, bir aktarımın dahili kuyruğunun durumuna göre geri basıncı nasıl bildirmesi gerektiğini belirleyen bir nesnedir. Sıralama stratejisi her bir parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu maksimum değer olarak bilinen belirli bir sayıyla karşılaştırır.

Akıştaki parçalar bir okuyucu tarafından okunur. Bu okuyucu, verileri birer parça halinde alır ve üzerinde istediğiniz türde işlem yapmanıza olanak tanır. Okuyucu ve onunla birlikte gelen diğer işleme koduna tüketici denir.

Bu bağlamda bir sonraki yapıya denetleyici denir. Okunabilir her akış, adından da anlaşılacağı gibi akışı kontrol etmenize olanak tanıyan ilişkili bir kontrolöre sahiptir.

Bir akış aynı anda yalnızca bir okuyucu tarafından okunabilir. Bir okuyucu oluşturulduğunda ve bir akışı okumaya başladığında (yani etkin okuyucu olduğunda) akışa kilitlenir. Aktardığınız metni başka bir okuyucunun okumasını istiyorsanız genellikle başka bir işlem yapmadan önce ilk okuyucuyu bırakmanız gerekir (ancak aktarmaları bölebilirsiniz).

Okunabilir bir akış oluşturma

Oluşturucusunu çağırarak okunabilir bir akış oluşturursunuzReadableStream(). Oluşturucu, oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntem ve özelliklere sahip bir nesneyi temsil eden isteğe bağlı bir bağımsız değişkene underlyingSource sahiptir.

underlyingSource

Bu işlem için geliştirici tarafından tanımlanan aşağıdaki isteğe bağlı yöntemler kullanılabilir:

  • start(controller): Nesne oluşturulduğunda hemen çağrılır. Yöntem, akış kaynağına erişebilir ve akış işlevini ayarlamak için gereken her şeyi yapabilir. Bu işlem eşzamansız olarak yapılacaksa yöntem, başarı veya başarısızlığı bildirmek için bir promise döndürebilir. Bu yönteme iletilen controller parametresi bir ReadableStreamDefaultController bağımsız değişkenidir.
  • pull(controller): Daha fazla parça getirilirken yayını kontrol etmek için kullanılabilir. Akıştaki dahili parça kuyruğu dolu olmadığı sürece ve kuyruk en yüksek işaretine ulaşana kadar tekrar tekrar çağrılır. pull() çağrılmasının sonucu bir sözse söz yerine getirilene kadar pull() tekrar çağrılmaz. Sözleşme reddedilirse akışta hata oluşur.
  • cancel(reason): Akış tüketicisi akışı iptal ettiğinde çağrılır.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController aşağıdaki yöntemleri destekler:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() kurucusunun ikinci, yine isteğe bağlı bağımsız değişkeni queuingStrategy'dur. İsteğe bağlı olarak akış için bir sıra stratejisi tanımlayan bir nesnedir ve iki parametre alır:

  • highWaterMark: Bu sıraya ekleme stratejisinin kullanıldığı yayının en yüksek noktasını gösteren sıfırdan büyük bir sayı.
  • size(chunk): Belirtilen parça değerinin sonlu ve negatif olmayan boyutunu hesaplayıp döndüren bir işlev. Sonuç, geri basıncı belirlemek için kullanılır ve uygun ReadableStreamDefaultController.desiredSize mülkü aracılığıyla gösterilir. Ayrıca, temel kaynağın pull() yönteminin ne zaman çağrılacağını da belirler.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader() ve read() yöntemleri

Okunabilir bir akıştan okumak için bir okuyucuya ihtiyacınız vardır. Bu okuyucu ReadableStreamDefaultReader olacaktır. ReadableStream arayüzünün getReader() yöntemi bir okuyucu oluşturur ve akışı bu okuyucuya kilitler. Akış kilitliyken bu okuyucu serbest bırakılana kadar başka okuyucu edinilemez.

ReadableStreamDefaultReader arayüzünün read() yöntemi, yayının dahili kuyruğundaki sonraki parçaya erişim sağlayan bir promise döndürür. Akış durumuna bağlı olarak bir sonuçla isteği yerine getirir veya reddeder. Olasılıklar şunlardır:

  • Bir parça mevcutsa söz,
    { value: chunk, done: false } biçiminde bir nesneyle yerine getirilir.
  • Akış kapatılırsa söz,
    { value: undefined, done: true } biçiminde bir nesneyle yerine getirilir.
  • Akışta hata oluşursa söz konusu hata ile birlikte söz reddedilir.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked mülkü

Okunabilir bir aktarımın kilitli olup olmadığını, ReadableStream.locked mülküne erişerek kontrol edebilirsiniz.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Okunabilir akış kod örnekleri

Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir. Öncelikle, underlyingSource bağımsız değişkeninde (yani TimestampSource sınıfında) bir start() yöntemi tanımlayan bir ReadableStream oluşturursunuz. Bu yöntem, yayının controller özelliğine on saniye boyunca her saniye bir zaman damgası enqueue() gönderir. Son olarak da akışa close() vermesini söyler. Bu akışı, getReader() yöntemi aracılığıyla bir okuyucu oluşturarak ve akış done olana kadar read()'u çağırarak tüketirsiniz.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Eşzamansız iterasyon

Her read() döngü iterasyonunda akış done olup olmadığını kontrol etmek en uygun API olmayabilir. Neyse ki yakında bunu yapmanın daha iyi bir yolu olacak: eşzamansız iterasyon.

for await (const chunk of stream) {
  console.log(chunk);
}

Eş zamansız iterasyonu bugün kullanmak için bir geçici çözüm, davranışı bir polyfill ile uygulamaktır.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Okunabilir bir akış oluşturma

ReadableStream arayüzünün tee() yöntemi, mevcut okunabilir akışı ayırarak yeni ReadableStream örnekleri olarak iki dal içeren iki öğeli bir dizi döndürür. Bu sayede iki okuyucu aynı anda bir yayını okuyabilir. Örneğin, sunucudan bir yanıt almak ve bunu tarayıcıya aktarmak, aynı zamanda hizmet çalışanı önbelleğine aktarmak istiyorsanız bunu bir hizmet çalışanında yapabilirsiniz. Yanıt gövdesi birden fazla kez kullanılamayacağından bunu yapmak için iki kopyaya ihtiyacınız vardır. Ardından, akışı iptal etmek için ortaya çıkan her iki dalı da iptal etmeniz gerekir. Bir yayını başlattığınızda yayın genellikle kilitlenir ve diğer okuyucular tarafından kilitlenmesi engellenir.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Okunabilir bayt akışları

Baytları temsil eden akışlar için, baytları verimli bir şekilde işlemek amacıyla (özellikle kopyaları en aza indirerek) okunabilir akışın genişletilmiş bir sürümü sağlanır. Bayt akışları, kendi arabelleğinizi getirme (BYOB) okuyucularının edinilmesine olanak tanır. Varsayılan uygulama, WebSocket'ler söz konusu olduğunda dize veya dizi arabellekleri gibi çeşitli farklı çıkışlar verebilir. Byte akışları ise bayt çıkışını garanti eder. Ayrıca BYOB okuyucuları kararlılık avantajlarından da yararlanabilir. Bunun nedeni, bir arabellek ayrılırsa aynı arabelleğe iki kez yazılmayacağının garanti edilebilmesi ve böylece yarış koşullarının önlenebilmesidir. BYOB okuyucular, arabellekleri yeniden kullanabildiği için tarayıcının çöp toplama işlemini çalıştırması gereken süreyi azaltabilir.

Okunabilir bir bayt akışı oluşturma

ReadableStream() oluşturucusuna ek bir type parametresi ileterek okunabilir bir bayt akışı oluşturabilirsiniz.

new ReadableStream({ type: 'bytes' });

underlyingSource

Okunabilir bir bayt akışının temel kaynağına, üzerinde işlem yapmak için bir ReadableByteStreamController verilir. ReadableByteStreamController.enqueue() yöntemi, değeri ArrayBufferView olan bir chunk bağımsız değişkeni alır. ReadableByteStreamController.byobRequest mülkü, mevcut BYOB çekme isteğini döndürür veya istekte bulunulmamışsa null değerini döndürür. Son olarak ReadableByteStreamController.desiredSize mülkü, kontrol edilen akıştaki dahili kuyruğu doldurmak için istenen boyutu döndürür.

queuingStrategy

ReadableStream() kurucusunun ikinci, yine isteğe bağlı bağımsız değişkeni queuingStrategy'dur. İsteğe bağlı olarak akış için bir sıra stratejisi tanımlayan ve bir parametre alan bir nesnedir:

  • highWaterMark: Bu sıraya ekleme stratejisinin kullanıldığı yayının en yüksek noktasını gösteren sıfırdan büyük bir bayt sayısı. Bu, geri basıncı belirlemek için kullanılır ve uygun ReadableByteStreamController.desiredSize mülkü aracılığıyla gösterilir. Ayrıca, temel kaynağın pull() yönteminin ne zaman çağrılacağını da belirler.

getReader() ve read() yöntemleri

Ardından, mode parametresini uygun şekilde ayarlayarak ReadableStreamBYOBReader'e erişebilirsiniz: ReadableStream.getReader({ mode: "byob" }). Bu sayede, kopyalardan kaçınmak için arabellek ayırma üzerinde daha hassas kontrol sağlanır. Bayt akışından okumak için ReadableStreamBYOBReader.read(view) işlevini çağırmanız gerekir. Burada view bir ArrayBufferView bağımsız değişkenidir.

Okunabilir bayt akışı kod örneği

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Aşağıdaki işlev, rastgele oluşturulmuş bir dizinin sıfır kopyalamayla verimli bir şekilde okunmasına olanak tanıyan okunabilir bayt akışları döndürür. Önceden belirlenmiş 1.024 boyutunda bir parça yerine geliştirici tarafından sağlanan arabelleği doldurmaya çalışır ve böylece tam kontrol sağlar.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Yazılabilir akışların işleyiş şekli

Yazılabilir akış, JavaScript'te WritableStream nesnesi ile temsil edilen, veri yazabileceğiniz bir hedeftir. Bu, ham verilerin yazıldığı alt düzey bir G/Ç havuzu olan temel havuzun üst kısmında bir soyutlama görevi görür.

Veriler, bir yazar aracılığıyla akışı tek tek parçalara yazar. Bir parça, tıpkı bir okuyucudaki parçalar gibi birçok şekilde olabilir. Yazmaya hazır parçalar oluşturmak için istediğiniz kodu kullanabilirsiniz. Yazıcı ve ilişkili koda üretici denir.

Bir yazar oluşturulduğunda ve bir akışa yazmaya başladığında (etkin yazar), akışa kilitlendiği söylenir. Yazılabilir bir akışa aynı anda yalnızca bir yazar yazabilir. Başka bir yazarın akışınıza yazmaya başlamasını istiyorsanız genellikle yayını yayınlamanız ve ardından başka bir yazar eklemeniz gerekir.

Dahili bir sıra, akışa yazılmış ancak henüz temel alıcı tarafından işlenmemiş parçaları izler.

Sıralama stratejisi, bir aktarımın dahili kuyruğunun durumuna göre geri basıncı nasıl bildirmesi gerektiğini belirleyen bir nesnedir. Sıralama stratejisi her bir parçaya bir boyut atar ve sıradaki tüm parçaların toplam boyutunu maksimum değer olarak bilinen belirli bir sayıyla karşılaştırır.

Nihai yapıya denetleyici adı verilir. Yazılabilir her akış, akışı kontrol etmenize (örneğin, iptal etmenize) olanak tanıyan ilişkili bir denetleyiciye sahiptir.

Yazılabilir akış oluşturma

Streams API'nin WritableStream arayüzü, aktarma noktası olarak bilinen bir hedefe aktarılan verileri yazmak için standart bir soyutlama sağlar. Bu nesne, yerleşik geri basınç ve sıraya alma özelliğine sahiptir. Oluşturucusunu WritableStream() çağırarak yazılabilir bir akış oluşturursunuz. Oluşturulan akış örneğinin nasıl davranacağını tanımlayan yöntem ve özelliklere sahip bir nesneyi temsil eden isteğe bağlı bir underlyingSink parametresi vardır.

underlyingSink

underlyingSink, geliştirici tarafından tanımlanan aşağıdaki isteğe bağlı yöntemleri içerebilir. Bazı yöntemlere iletilen controller parametresi bir WritableStreamDefaultController öğesidir.

  • start(controller): Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Bu yöntemin içeriği, temel lavaboya erişmeyi amaçlamalıdır. Bu işlem eşzamansız olarak yapılacaksa başarı veya başarısızlık durumunu bildiren bir promise döndürülebilir.
  • write(chunk, controller): Bu yöntem, yeni bir veri parçası (chunk parametresinde belirtilir) temel alıcıya yazılmaya hazır olduğunda çağrılır. Yazma işleminin başarılı veya başarısız olduğunu belirtmek için bir promise döndürebilir. Bu yöntem yalnızca önceki yazma işlemleri başarılı olduktan sonra çağrılır ve hiçbir zaman akış kapatıldıktan veya iptal edildikten sonra çağrılmaz.
  • close(controller): Uygulama, akışa parça yazma işlemini tamamladığını bildirirse bu yöntem çağrılır. İçerik, temel havuza yapılan yazma işlemlerini tamamlamak ve havuza erişimi serbest bırakmak için gereken her şeyi yapmalıdır. Bu işlem eşzamanlı değilse başarı veya başarısızlık sinyali vermek için bir promise döndürebilir. Bu yöntem yalnızca sıraya alınmış tüm yazma işlemleri başarılı olduktan sonra çağrılır.
  • abort(reason): Uygulama, akışı aniden kapatmak ve hatalı duruma geçirmek istediğini belirtirse bu yöntem çağrılır. close() gibi tutulan tüm kaynakları temizleyebilir ancak yazma işlemleri sıraya alınmış olsa bile abort() çağrılır. Bu parçalar atılır. Bu işlem eşzamanlı değilse başarı veya başarısızlık sinyali vermek için bir promise döndürebilir. reason parametresi, aktarımın neden iptal edildiğini açıklayan bir DOMString içerir.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API'nin WritableStreamDefaultController arabiriminde, yazma işlemi için daha fazla parça gönderilirken veya yazma işleminin sonunda WritableStream durumunun ayarlama sırasında kontrol edilmesine olanak tanıyan bir kontrolör bulunur. Bir WritableStream oluşturulurken, temel havuza, üzerinde işlem yapılacak karşılık gelen bir WritableStreamDefaultController örneği verilir. WritableStreamDefaultController yalnızca bir yönteme sahiptir: WritableStreamDefaultController.error(). Bu yöntem, ilişkili akışla gelecekteki etkileşimlerin hatayla sonuçlanmasına neden olur. WritableStreamDefaultController, AbortSignal örneği döndüren bir signal özelliğini de destekler. Bu özellik, gerektiğinde WritableStream işleminin durdurulmasına olanak tanır.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() kurucusunun ikinci, yine isteğe bağlı bağımsız değişkeni queuingStrategy'dur. İsteğe bağlı olarak akış için bir sıra stratejisi tanımlayan bir nesnedir ve iki parametre alır:

  • highWaterMark: Bu sıraya ekleme stratejisinin kullanıldığı yayının en yüksek noktasını gösteren sıfırdan büyük bir sayı.
  • size(chunk): Belirtilen parça değerinin sonlu ve negatif olmayan boyutunu hesaplayıp döndüren bir işlev. Sonuç, geri basıncı belirlemek için kullanılır ve uygun WritableStreamDefaultWriter.desiredSize mülkü aracılığıyla gösterilir.

getWriter() ve write() yöntemleri

Yazılabilir bir akışa yazmak için bir yazara (WritableStreamDefaultWriter) ihtiyacınız vardır. WritableStream arayüzünün getWriter() yöntemi, yeni bir WritableStreamDefaultWriter örneği döndürür ve akışı bu örneğe kilitler. Akış kilitliyken mevcut yazar serbest bırakılana kadar başka bir yazar edinilemez.

WritableStreamDefaultWriter arabiriminin write() yöntemi, iletilen bir veri kümesini bir WritableStream'e ve temelindeki havuza yazar, ardından yazma işleminin başarılı veya başarısız olduğunu belirten bir söz döndürür. "Başarılı" ifadesinin ne anlama geldiğinin temeldeki havuza bağlı olduğunu unutmayın. Bu ifade, parçanın kabul edildiğini belirtebilir ancak nihai hedefine güvenli bir şekilde kaydedildiğini göstermeyebilir.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked mülkü

Yazılabilir bir aktarımın kilitli olup olmadığını kontrol etmek için WritableStream.locked mülküne erişebilirsiniz.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Yazılabilir akış kod örneği

Aşağıdaki kod örneğinde tüm adımlar gösterilmektedir.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Okunabilir bir akışı, yazılabilir bir akışa aktarma

Okunabilir bir akış, okunabilir akışın pipeTo() yöntemi aracılığıyla yazılabilir bir akışa aktarılabilir. ReadableStream.pipeTo(), mevcut ReadableStream öğesini belirli bir WritableStream öğesine aktarır ve aktarma işlemi başarıyla tamamlandığında yerine getirilen veya herhangi bir hatayla karşılaşıldığında reddedilen bir promise döndürür.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Dönüşüm akışı oluşturma

Streams API'nin TransformStream arayüzü, dönüştürülebilir bir veri grubunu temsil eder. Oluşturucu TransformStream()'yi çağırarak bir dönüştürme akışı oluşturursunuz. Bu oluşturucu, belirtilen işleyicilerden bir dönüştürme akışı nesnesi oluşturup döndürür. TransformStream() kurucusu, ilk bağımsız değişkeni olarak transformer'yi temsil eden isteğe bağlı bir JavaScript nesnesi kabul eder. Bu tür nesneler aşağıdaki yöntemlerden herhangi birini içerebilir:

transformer

  • start(controller): Bu yöntem, nesne oluşturulduğunda hemen çağrılır. Genellikle controller.enqueue() kullanılarak ön ek parçalarını sıraya eklemek için kullanılır. Bu parçalar okunabilir taraftan okunur ancak yazılabilir tarafa yapılan yazma işlemlerine bağlı değildir. Bu ilk işlem, örneğin ön ek parçalarını elde etmek biraz çaba gerektirdiği için eşzamanlı değilse işlev, başarı veya başarısızlığı bildirmek için bir söz döndürebilir. Reddedilen bir söz, aktarımda hata oluşturur. Atılan tüm istisnalar, TransformStream() kurucusu tarafından yeniden atılır.
  • transform(chunk, controller): Bu yöntem, orijinal olarak yazılabilir tarafa yazılan yeni bir parça dönüştürülmeye hazır olduğunda çağrılır. Akış uygulaması, bu işlevin yalnızca önceki dönüştürme işlemleri başarılı olduktan sonra çağrılacağını ve hiçbir zaman start() tamamlanmadan önce veya flush() çağrıldıktan sonra çağrılmayacağını garanti eder. Bu işlev, dönüştürme akışının gerçek dönüşüm işlemini gerçekleştirir. Sonuçları controller.enqueue() kullanarak sıraya ekleyebilir. Bu, yazılabilir tarafa yazılan tek bir parçanın, controller.enqueue()'ün kaç kez çağrıldığına bağlı olarak okunabilir tarafta sıfır veya birden fazla parçayla sonuçlanmasını sağlar. Dönüşüm işlemi eşzamanlı değilse bu işlev, dönüşümün başarılı veya başarısız olduğunu bildiren bir promise döndürebilir. Reddedilen bir söz, dönüştürme akışının hem okunabilir hem de yazılabilir tarafında hata verir. Hiçbir transform() yöntemi sağlanmazsa kimlik dönüştürme kullanılır. Bu yöntem, yazılabilir taraftan okunabilir tarafa değişmeden parçaları ekler.
  • flush(controller): Bu yöntem, yazılabilir tarafa yazılan tüm parçalar transform() üzerinden başarıyla geçirilerek dönüştürüldükten ve yazılabilir taraf kapatılmak üzereyken çağrılır. Bu genellikle, son ek parçalarını okunabilir tarafa eklemek için kullanılır. Boşaltma işlemi eşzamanlı değilse işlev, başarı veya başarısızlığı belirtmek için bir promise döndürebilir. Sonuç, stream.writable.write() çağrısını yapan kullanıcıya iletilir. Ayrıca, reddedilen bir promise, aktarımın hem okunabilir hem de yazılabilir tarafında hata oluşturur. İstisna atma işlemi, reddedilen bir promise döndürmekle aynı şekilde değerlendirilir.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategy ve readableStrategy sıra stratejileri

TransformStream() kurucusunun ikinci ve üçüncü isteğe bağlı parametreleri, isteğe bağlı writableStrategy ve readableStrategy sıra stratejileridir. Bunlar sırasıyla okunur ve yazılabilir akış bölümlerinde belirtildiği şekilde tanımlanır.

Dönüşüm akışı kod örneği

Aşağıdaki kod örneğinde, basit bir dönüştürme akışının işleyişi gösterilmektedir.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Okunabilir bir akışı dönüştürme akışı üzerinden aktarma

ReadableStream arayüzünün pipeThrough() yöntemi, mevcut akışı bir dönüştürme akışı veya başka bir yazılabilir/okunabilir çift üzerinden aktarmanın zincirlenebilir bir yolunu sağlar. Bir akışı boruya aktarmak genellikle akışı boru boyunca kilitler ve diğer okuyucuların kilitlemesini önler.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Aşağıdaki kod örneğinde (biraz yapay), döndürülen yanıt vaadini bir akış olarak tüketerek ve parça parça büyük harf kullanarak tüm metni büyük harf yapan fetch() işlevinin "büyük harflerle yazma" sürümünü nasıl uygulayabileceğiniz gösterilmektedir. Bu yaklaşımın avantajı, dokümanın tamamının indirilmesini beklemeniz gerekmemesidir. Bu, büyük dosyalarla çalışırken büyük bir fark yaratabilir.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

Aşağıdaki demoda, okunabilir, yazılabilir ve dönüştürme akışları gösterilmektedir. Ayrıca pipeThrough() ve pipeTo() boru zinciri örnekleri ve tee() gösterilmektedir. İsterseniz demoyu kendi penceresinde çalıştırabilir veya kaynak kodunu görüntüleyebilirsiniz.

Tarayıcıda kullanılabilen faydalı akışlar

Tarayıcıya yerleşik olarak birçok yararlı akış bulunur. Bir blob'dan kolayca ReadableStream oluşturabilirsiniz. Blob arayüzünün stream() yöntemi, okunması sonrasında blob içindeki verileri döndüren bir ReadableStream döndürür. Ayrıca, File nesnesinin belirli bir Blob türü olduğunu ve bir blob'un kullanılabildiği her bağlamda kullanılabileceğini unutmayın.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode() ve TextEncoder.encode()'un yayın varyantları sırasıyla TextDecoderStream ve TextEncoderStream olarak adlandırılır.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Sırasıyla CompressionStream ve DecompressionStream dönüştürme akışlarıyla dosyaları sıkıştırmak veya sıkıştırılmış dosyaları açmak kolaydır. Aşağıdaki kod örneğinde, Streams spesifikasyonunu nasıl indireceğiniz, doğrudan tarayıcıda nasıl sıkıştıracağınız (gzip) ve sıkıştırılmış dosyayı doğrudan diske nasıl yazacağınız gösterilmektedir.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API'nin FileSystemWritableFileStream ve deneysel fetch() istek akışları, kullanımdaki yazılabilir akışlara örnek gösterilebilir.

Serial API, hem okunabilir hem de yazılabilir akışları yoğun bir şekilde kullanır.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Son olarak WebSocketStream API, akışları WebSocket API ile entegre eder.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Faydalı kaynaklar

Teşekkür ederiz

Bu makale, Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley ve Adam Rice tarafından incelendi. Jake Archibald'ın blog yayınları, yayınları anlamama çok yardımcı oldu. Kod örneklerinin bazıları GitHub kullanıcısı @bellbind'in keşiflerinden esinlenmiştir ve metnin bazı bölümleri Akışlar hakkında MDN Web Dokümanları'na dayanır. Akışlar Standardı'nın yazarları bu spesifikasyonu yazarken çok iyi bir iş çıkarmıştır. Unsplash'tan Ryan Lara tarafından oluşturulan lokomotif resim.