Skip to content

Streams sind eine wichtige Abstraktion für die Arbeit mit Binärdaten, ohne sie alle auf einmal in den Speicher zu laden. Sie werden häufig zum Lesen und Schreiben von Dateien, zum Senden und Empfangen von Netzwerkanfragen und zur Verarbeitung großer Datenmengen verwendet.

Bun implementiert die Web-APIs ReadableStream und WritableStream.

NOTE

Bun implementiert auch das `node:stream`-Modul, einschließlich [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams), [`Writable`](https://nodejs.org/api/stream.html#stream_writable_streams) und [`Duplex`](https://nodejs.org/api/stream.html#stream_duplex_and_transform_streams). Für vollständige Dokumentation siehe die [Node.js-Dokumentation](https://nodejs.org/api/stream.html).

Um einen einfachen ReadableStream zu erstellen:

ts
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

Der Inhalt eines ReadableStream kann mit der for await-Syntax stückweise gelesen werden.

ts
for await (const chunk of stream) {
  console.log(chunk);
}

// hello
// world

Direkter ReadableStream

Bun implementiert eine optimierte Version von ReadableStream, die unnötiges Datenkopieren und Warteschlangen-Verwaltungslogik vermeidet.

Bei einem traditionellen ReadableStream werden Datenchunks in die Warteschlange gestellt. Jeder Chunk wird in eine Warteschlange kopiert, wo er verbleibt, bis der Stream bereit ist, mehr Daten zu senden.

ts
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

Bei einem direkten ReadableStream werden Datenchunks direkt in den Stream geschrieben. Es findet keine Warteschlangenbildung statt, und es ist nicht notwendig, die Chunk-Daten in den Speicher zu klonen. Die controller-API wurde entsprechend aktualisiert; anstelle von .enqueue() rufen Sie .write auf.

ts
const stream = new ReadableStream({
  type: "direct", 
  pull(controller) {
    controller.write("hello");
    controller.write("world");
  },
});

Bei Verwendung eines direkten ReadableStream wird die gesamte Chunk-Warteschlangenbildung vom Ziel übernommen. Der Konsument des Streams erhält genau das, was an controller.write() übergeben wird, ohne Kodierung oder Modifikation.


Async-Generator-Streams

Bun unterstützt auch Async-Generator-Funktionen als Quelle für Response und Request. Dies ist eine einfache Möglichkeit, einen ReadableStream zu erstellen, der Daten aus einer asynchronen Quelle abruft.

ts
const response = new Response(
  (async function* () {
    yield "hello";
    yield "world";
  })(),
);

await response.text(); // "helloworld"

Sie können auch [Symbol.asyncIterator] direkt verwenden.

ts
const response = new Response({
  [Symbol.asyncIterator]: async function* () {
    yield "hello";
    yield "world";
  },
});

await response.text(); // "helloworld"

Wenn Sie eine granularere Kontrolle über den Stream benötigen, gibt yield den direkten ReadableStream-Controller zurück.

ts
const response = new Response({
  [Symbol.asyncIterator]: async function* () {
    const controller = yield "hello";
    await controller.end();
  },
});

await response.text(); // "hello"

Bun.ArrayBufferSink

Die Bun.ArrayBufferSink-Klasse ist ein schneller inkrementeller Schreiber zum Erstellen eines ArrayBuffer unbekannter Größe.

ts
const sink = new Bun.ArrayBufferSink();

sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");

sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]

Um die Daten stattdessen als Uint8Array abzurufen, übergeben Sie die asUint8Array-Option an die start-Methode.

ts
const sink = new Bun.ArrayBufferSink();
sink.start({
  asUint8Array: true, 
});

sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");

sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ]

Die .write()-Methode unterstützt Strings, Typed Arrays, ArrayBuffer und SharedArrayBuffer.

ts
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);

sink.end();

Sobald .end() aufgerufen wird, können keine weiteren Daten in den ArrayBufferSink geschrieben werden. Im Kontext des Pufferns eines Streams ist es jedoch nützlich, kontinuierlich Daten zu schreiben und den Inhalt regelmäßig mit .flush() zu leeren (z.B. in einen WriteableStream). Um dies zu unterstützen, übergeben Sie stream: true an den Konstruktor.

ts
const sink = new Bun.ArrayBufferSink();
sink.start({
  stream: true, 
});

sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]

sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]

Die .flush()-Methode gibt die gepufferten Daten als ArrayBuffer (oder Uint8Array, wenn asUint8Array: true) zurück und leert den internen Puffer.

Um die Größe des internen Puffers in Bytes manuell festzulegen, übergeben Sie einen Wert für highWaterMark:

ts
const sink = new Bun.ArrayBufferSink();
sink.start({
  highWaterMark: 1024 * 1024, // 1 MB
});

Referenz

ts
/**
 * Schneller inkrementeller Schreiber, der bei end() zu einem `ArrayBuffer` wird.
 */
export class ArrayBufferSink {
  constructor();

  start(options?: {
    asUint8Array?: boolean;
    /**
     * Einen internen Puffer dieser Größe vorab zuweisen
     * Dies kann die Performance erheblich verbessern, wenn die Chunk-Größe klein ist
     */
    highWaterMark?: number;
    /**
     * Bei {@link ArrayBufferSink.flush} die geschriebenen Daten als `Uint8Array` zurückgeben.
     * Schreibvorgänge werden vom Anfang des Puffers neu gestartet.
     */
    stream?: boolean;
  }): void;

  write(chunk: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer): number;
  /**
   * Den internen Puffer leeren
   *
   * Wenn {@link ArrayBufferSink.start} eine `stream`-Option übergeben wurde, wird dies ein `ArrayBuffer` zurückgeben
   * Wenn {@link ArrayBufferSink.start} eine `stream`-Option und `asUint8Array` übergeben wurde, wird dies eine `Uint8Array` zurückgeben
   * Andernfalls wird dies die Anzahl der seit dem letzten Flush geschriebenen Bytes zurückgeben
   *
   * Diese API könnte sich später ändern, um Uint8ArraySink und ArrayBufferSink zu trennen
   */
  flush(): number | Uint8Array<ArrayBuffer> | ArrayBuffer;
  end(): ArrayBuffer | Uint8Array<ArrayBuffer>;
}

Bun von www.bunjs.com.cn bearbeitet