Skip to content

Gli stream sono un'astrazione importante per lavorare con dati binari senza caricarli tutti in memoria contemporaneamente. Sono comunemente usati per leggere e scrivere file, inviare e ricevere richieste di rete ed elaborare grandi quantità di dati.

Bun implementa le API Web ReadableStream e WritableStream.

NOTE

Bun implementa anche il modulo `node:stream`, inclusi [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams), [`Writable`](https://nodejs.org/api/stream.html#stream_writable_streams), e [`Duplex`](https://nodejs.org/api/stream.html#stream_duplex_and_transform_streams). Per la documentazione completa, consulta la [documentazione Node.js](https://nodejs.org/api/stream.html).

Per creare un semplice ReadableStream:

ts
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

Il contenuto di un ReadableStream può essere letto chunk per chunk con la sintassi for await.

ts
for await (const chunk of stream) {
  console.log(chunk);
}

// hello
// world

ReadableStream Diretto

Bun implementa una versione ottimizzata di ReadableStream che evita copie di dati non necessarie e la logica di gestione della coda.

Con un ReadableStream tradizionale, i chunk di dati vengono accodati. Ogni chunk viene copiato in una coda, dove rimane fino a quando lo stream è pronto per inviare più dati.

ts
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

Con un ReadableStream diretto, i chunk di dati vengono scritti direttamente sullo stream. Non avviene accodamento e non c'è bisogno di clonare i dati del chunk in memoria. L'API del controller è aggiornata per riflettere questo; invece di .enqueue() chiami .write.

ts
const stream = new ReadableStream({
  type: "direct", 
  pull(controller) {
    controller.write("hello");
    controller.write("world");
  },
});

Quando usi un ReadableStream diretto, tutto l'accodamento dei chunk è gestito dalla destinazione. Il consumatore dello stream riceve esattamente ciò che viene passato a controller.write(), senza alcuna codifica o modifica.


Stream con generatori asincroni

Bun supporta anche le funzioni generatore asincrone come fonte per Response e Request. Questo è un modo semplice per creare un ReadableStream che recupera dati da una fonte asincrona.

ts
const response = new Response(
  (async function* () {
    yield "hello";
    yield "world";
  })(),
);

await response.text(); // "helloworld"

Puoi anche usare [Symbol.asyncIterator] direttamente.

ts
const response = new Response({
  [Symbol.asyncIterator]: async function* () {
    yield "hello";
    yield "world";
  },
});

await response.text(); // "helloworld"

Se hai bisogno di un controllo più granulare sullo stream, yield restituirà il controller del ReadableStream diretto.

ts
const response = new Response({
  [Symbol.asyncIterator]: async function* () {
    const controller = yield "hello";
    await controller.end();
  },
});

await response.text(); // "hello"

Bun.ArrayBufferSink

La classe Bun.ArrayBufferSink è un writer incrementale veloce per costruire un ArrayBuffer di dimensione sconosciuta.

ts
const sink = new Bun.ArrayBufferSink();

sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");

sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]

Per recuperare i dati come Uint8Array, passa l'opzione asUint8Array al metodo start.

ts
const sink = new Bun.ArrayBufferSink();
sink.start({
  asUint8Array: true, 
});

sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");

sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ]

Il metodo .write() supporta stringhe, typed array, ArrayBuffer e SharedArrayBuffer.

ts
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);

sink.end();

Una volta chiamato .end(), non è possibile scrivere altri dati su ArrayBufferSink. Tuttavia, nel contesto del buffering di uno stream, è utile scrivere dati continuamente e periodicamente .flush() i contenuti (diciamo, in un WriteableStream). Per supportare questo, passa stream: true al costruttore.

ts
const sink = new Bun.ArrayBufferSink();
sink.start({
  stream: true, 
});

sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]

sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]

Il metodo .flush() restituisce i dati bufferizzati come ArrayBuffer (o Uint8Array se asUint8Array: true) e pulisce il buffer interno.

Per impostare manualmente la dimensione del buffer interno in byte, passa un valore per highWaterMark:

ts
const sink = new Bun.ArrayBufferSink();
sink.start({
  highWaterMark: 1024 * 1024, // 1 MB
});

Riferimento

ts
/**
 * Fast incremental writer that becomes an `ArrayBuffer` on end().
 */
export class ArrayBufferSink {
  constructor();

  start(options?: {
    asUint8Array?: boolean;
    /**
     * Preallocate an internal buffer of this size
     * This can significantly improve performance when the chunk size is small
     */
    highWaterMark?: number;
    /**
     * On {@link ArrayBufferSink.flush}, return the written data as a `Uint8Array`.
     * Writes will restart from the beginning of the buffer.
     */
    stream?: boolean;
  }): void;

  write(chunk: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer): number;
  /**
   * Flush the internal buffer
   *
   * If {@link ArrayBufferSink.start} was passed a `stream` option, this will return a `ArrayBuffer`
   * If {@link ArrayBufferSink.start} was passed a `stream` option and `asUint8Array`, this will return a `Uint8Array`
   * Otherwise, this will return the number of bytes written since the last flush
   *
   * This API might change later to separate Uint8ArraySink and ArrayBufferSink
   */
  flush(): number | Uint8Array<ArrayBuffer> | ArrayBuffer;
  end(): ArrayBuffer | Uint8Array<ArrayBuffer>;
}

Bun a cura di www.bunjs.com.cn