Skip to content

Потоки являются важной абстракцией для работы с бинарными данными без загрузки их всех в память одновременно. Они обычно используются для чтения и записи файлов, отправки и получения сетевых запросов и обработки больших объемов данных.

Bun реализует Web API ReadableStream и WritableStream.

NOTE

Bun также реализует модуль `node:stream`, включая [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams), [`Writable`](https://nodejs.org/api/stream.html#stream_writable_streams) и [`Duplex`](https://nodejs.org/api/stream.html#stream_duplex_and_transform_streams). Для полной документации обратитесь к [документации Node.js](https://nodejs.org/api/stream.html).

Для создания простого ReadableStream:

ts
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

Содержимое ReadableStream можно читать по частям с помощью синтаксиса for await.

ts
for await (const chunk of stream) {
  console.log(chunk);
}

// hello
// world

Прямой ReadableStream

Bun реализует оптимизированную версию ReadableStream, которая избегает ненужного копирования данных и логики управления очередью.

В традиционном ReadableStream части данных поступают в очередь. Каждая часть копируется в очередь, где она находится, пока поток не будет готов отправить больше данных.

ts
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

В прямом ReadableStream части данных записываются напрямую в поток. Очереди не происходит, и нет необходимости клонировать данные части в память. API controller обновлен, чтобы отразить это; вместо .enqueue() вы вызываете .write.

ts
const stream = new ReadableStream({
  type: "direct", 
  pull(controller) {
    controller.write("hello");
    controller.write("world");
  },
});

При использовании прямого ReadableStream вся очередь частей обрабатывается назначением. Потребитель потока получает именно то, что передано в controller.write(), без кодирования или модификации.


Потоки асинхронных генераторов

Bun также поддерживает функции асинхронных генераторов как источник для Response и Request. Это простой способ создать ReadableStream, который получает данные из асинхронного источника.

ts
const response = new Response(
  (async function* () {
    yield "hello";
    yield "world";
  })(),
);

await response.text(); // "helloworld"

Вы также можете использовать [Symbol.asyncIterator] напрямую.

ts
const response = new Response({
  [Symbol.asyncIterator]: async function* () {
    yield "hello";
    yield "world";
  },
});

await response.text(); // "helloworld"

Если вам нужен более детальный контроль над потоком, yield вернет контроллер прямого ReadableStream.

ts
const response = new Response({
  [Symbol.asyncIterator]: async function* () {
    const controller = yield "hello";
    await controller.end();
  },
});

await response.text(); // "hello"

Bun.ArrayBufferSink

Класс Bun.ArrayBufferSink — это быстрая инкрементальная запись для создания ArrayBuffer неизвестного размера.

ts
const sink = new Bun.ArrayBufferSink();

sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");

sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]

Чтобы вместо этого получить данные как Uint8Array, передайте опцию asUint8Array в метод start.

ts
const sink = new Bun.ArrayBufferSink();
sink.start({
  asUint8Array: true, 
});

sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");

sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ]

Метод .write() поддерживает строки, типизированные массивы, ArrayBuffer и SharedArrayBuffer.

ts
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);

sink.end();

После вызова .end() больше никаких данных нельзя записать в ArrayBufferSink. Однако в контексте буферизации потока полезно непрерывно записывать данные и периодически вызывать .flush() содержимого (например, в WriteableStream). Для поддержки этого передайте stream: true в конструктор.

ts
const sink = new Bun.ArrayBufferSink();
sink.start({
  stream: true, 
});

sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]

sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]

Метод .flush() возвращает буферизованные данные как ArrayBuffer (или Uint8Array, если asUint8Array: true) и очищает внутренний буфер.

Чтобы вручную установить размер внутреннего буфера в байтах, передайте значение для highWaterMark:

ts
const sink = new Bun.ArrayBufferSink();
sink.start({
  highWaterMark: 1024 * 1024, // 1 MB
});

Справочник

ts
/**
 * Быстрая инкрементальная запись, которая становится `ArrayBuffer` при end().
 */
export class ArrayBufferSink {
  constructor();

  start(options?: {
    asUint8Array?: boolean;
    /**
     * Предварительно выделить внутренний буфер такого размера
     * Это может значительно улучшить производительность, когда размер части мал
     */
    highWaterMark?: number;
    /**
     * При {@link ArrayBufferSink.flush} возвращает записанные данные как `Uint8Array`.
     * Записи начнутся с начала буфера.
     */
    stream?: boolean;
  }): void;

  write(chunk: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer): number;
  /**
   * Сбросить внутренний буфер
   *
   * Если {@link ArrayBufferSink.start} была передана опция `stream`, это вернет `ArrayBuffer`
   * Если {@link ArrayBufferSink.start} была передана опция `stream` и `asUint8Array`, это вернет `Uint8Array`
   * В противном случае это вернет количество байт, записанных с последнего сброса
   *
   * Этот API может измениться позже для разделения Uint8ArraySink и ArrayBufferSink
   */
  flush(): number | Uint8Array<ArrayBuffer> | ArrayBuffer;
  end(): ArrayBuffer | Uint8Array<ArrayBuffer>;
}

Bun от www.bunjs.com.cn