Потоки являются важной абстракцией для работы с бинарными данными без загрузки их всех в память одновременно. Они обычно используются для чтения и записи файлов, отправки и получения сетевых запросов и обработки больших объемов данных.
Bun реализует Web API ReadableStream и WritableStream.
NOTE
Bun также реализует модуль `node:stream`, включая [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams), [`Writable`](https://nodejs.org/api/stream.html#stream_writable_streams) и [`Duplex`](https://nodejs.org/api/stream.html#stream_duplex_and_transform_streams). Для полной документации обратитесь к [документации Node.js](https://nodejs.org/api/stream.html).Для создания простого ReadableStream:
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});Содержимое ReadableStream можно читать по частям с помощью синтаксиса for await.
for await (const chunk of stream) {
console.log(chunk);
}
// hello
// worldПрямой ReadableStream
Bun реализует оптимизированную версию ReadableStream, которая избегает ненужного копирования данных и логики управления очередью.
В традиционном ReadableStream части данных поступают в очередь. Каждая часть копируется в очередь, где она находится, пока поток не будет готов отправить больше данных.
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});В прямом ReadableStream части данных записываются напрямую в поток. Очереди не происходит, и нет необходимости клонировать данные части в память. API controller обновлен, чтобы отразить это; вместо .enqueue() вы вызываете .write.
const stream = new ReadableStream({
type: "direct",
pull(controller) {
controller.write("hello");
controller.write("world");
},
});При использовании прямого ReadableStream вся очередь частей обрабатывается назначением. Потребитель потока получает именно то, что передано в controller.write(), без кодирования или модификации.
Потоки асинхронных генераторов
Bun также поддерживает функции асинхронных генераторов как источник для Response и Request. Это простой способ создать ReadableStream, который получает данные из асинхронного источника.
const response = new Response(
(async function* () {
yield "hello";
yield "world";
})(),
);
await response.text(); // "helloworld"Вы также можете использовать [Symbol.asyncIterator] напрямую.
const response = new Response({
[Symbol.asyncIterator]: async function* () {
yield "hello";
yield "world";
},
});
await response.text(); // "helloworld"Если вам нужен более детальный контроль над потоком, yield вернет контроллер прямого ReadableStream.
const response = new Response({
[Symbol.asyncIterator]: async function* () {
const controller = yield "hello";
await controller.end();
},
});
await response.text(); // "hello"Bun.ArrayBufferSink
Класс Bun.ArrayBufferSink — это быстрая инкрементальная запись для создания ArrayBuffer неизвестного размера.
const sink = new Bun.ArrayBufferSink();
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]Чтобы вместо этого получить данные как Uint8Array, передайте опцию asUint8Array в метод start.
const sink = new Bun.ArrayBufferSink();
sink.start({
asUint8Array: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ]Метод .write() поддерживает строки, типизированные массивы, ArrayBuffer и SharedArrayBuffer.
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);
sink.end();После вызова .end() больше никаких данных нельзя записать в ArrayBufferSink. Однако в контексте буферизации потока полезно непрерывно записывать данные и периодически вызывать .flush() содержимого (например, в WriteableStream). Для поддержки этого передайте stream: true в конструктор.
const sink = new Bun.ArrayBufferSink();
sink.start({
stream: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]
sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]Метод .flush() возвращает буферизованные данные как ArrayBuffer (или Uint8Array, если asUint8Array: true) и очищает внутренний буфер.
Чтобы вручную установить размер внутреннего буфера в байтах, передайте значение для highWaterMark:
const sink = new Bun.ArrayBufferSink();
sink.start({
highWaterMark: 1024 * 1024, // 1 MB
});Справочник
/**
* Быстрая инкрементальная запись, которая становится `ArrayBuffer` при end().
*/
export class ArrayBufferSink {
constructor();
start(options?: {
asUint8Array?: boolean;
/**
* Предварительно выделить внутренний буфер такого размера
* Это может значительно улучшить производительность, когда размер части мал
*/
highWaterMark?: number;
/**
* При {@link ArrayBufferSink.flush} возвращает записанные данные как `Uint8Array`.
* Записи начнутся с начала буфера.
*/
stream?: boolean;
}): void;
write(chunk: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer): number;
/**
* Сбросить внутренний буфер
*
* Если {@link ArrayBufferSink.start} была передана опция `stream`, это вернет `ArrayBuffer`
* Если {@link ArrayBufferSink.start} была передана опция `stream` и `asUint8Array`, это вернет `Uint8Array`
* В противном случае это вернет количество байт, записанных с последнего сброса
*
* Этот API может измениться позже для разделения Uint8ArraySink и ArrayBufferSink
*/
flush(): number | Uint8Array<ArrayBuffer> | ArrayBuffer;
end(): ArrayBuffer | Uint8Array<ArrayBuffer>;
}