流是一個重要的抽象,用於處理二進制數據,無需一次性將所有數據加載到內存中。它們通常用於讀寫文件、發送和接收網絡請求以及處理大量數據。
Bun 實現了 Web API ReadableStream 和 WritableStream。
NOTE
Bun 還實現了 `node:stream` 模塊,包括 [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams)、 [`Writable`](https://nodejs.org/api/stream.html#stream_writable_streams) 和 [`Duplex`](https://nodejs.org/api/stream.html#stream_duplex_and_transform_streams)。完整文檔請參考 [Node.js 文檔](https://nodejs.org/api/stream.html)。要創建簡單的 ReadableStream:
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});ReadableStream 的內容可以使用 for await 語法逐塊讀取。
for await (const chunk of stream) {
console.log(chunk);
}
// hello
// world直接 ReadableStream
Bun 實現了優化版本的 ReadableStream,避免不必要的數據復制和隊列管理邏輯。
使用傳統的 ReadableStream 時,數據塊被_入隊_。每個塊被復制到隊列中,直到流准備好發送更多數據之前一直坐在那裡。
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});使用直接 ReadableStream 時,數據塊直接寫入流。不會發生隊列,也不需要克隆塊數據到內存中。controller API 已更新以反映這一點;你調用 .write 而不是 .enqueue()。
const stream = new ReadableStream({
type: "direct",
pull(controller) {
controller.write("hello");
controller.write("world");
},
});使用直接 ReadableStream 時,所有塊隊列由目的地處理。流的消費者接收傳遞給 controller.write() 的確切內容,無需任何編碼或修改。
異步生成器流
Bun 還支持異步生成器函數作為 Response 和 Request 的源。這是創建從異步源獲取數據的 ReadableStream 的簡單方法。
const response = new Response(
(async function* () {
yield "hello";
yield "world";
})(),
);
await response.text(); // "helloworld"你也可以直接使用 [Symbol.asyncIterator]。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
yield "hello";
yield "world";
},
});
await response.text(); // "helloworld"如果你需要對流進行更細粒度的控制,yield 將返回直接 ReadableStream 控制器。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
const controller = yield "hello";
await controller.end();
},
});
await response.text(); // "hello"Bun.ArrayBufferSink
Bun.ArrayBufferSink 類是用於構建未知大小的 ArrayBuffer 的快速增量寫入器。
const sink = new Bun.ArrayBufferSink();
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]要改為將數據作為 Uint8Array 檢索,傳遞 asUint8Array 選項給 start 方法。
const sink = new Bun.ArrayBufferSink();
sink.start({
asUint8Array: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ].write() 方法支持字符串、類型化數組、ArrayBuffer 和 SharedArrayBuffer。
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);
sink.end();一旦調用 .end(),不能再將更多數據寫入 ArrayBufferSink。但是,在緩沖流的上下文中,連續寫入數據並定期 .flush() 內容(例如,到 WriteableStream)很有用。為支持這一點,傳遞 stream: true 給構造函數。
const sink = new Bun.ArrayBufferSink();
sink.start({
stream: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]
sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ].flush() 方法返回緩沖數據作為 ArrayBuffer(如果 asUint8Array: true 則為 Uint8Array)並清除內部緩沖區。
要手動設置內部緩沖區的大小(以字節為單位),傳遞 highWaterMark 值:
const sink = new Bun.ArrayBufferSink();
sink.start({
highWaterMark: 1024 * 1024, // 1 MB
});參考
/**
* 快速增量寫入器,在 end() 時成為 `ArrayBuffer`。
*/
export class ArrayBufferSink {
constructor();
start(options?: {
asUint8Array?: boolean;
/**
* 預分配此大小的內部緩沖區
* 當塊大小較小時,這可以顯著提高性能
*/
highWaterMark?: number;
/**
* 在 {@link ArrayBufferSink.flush} 上,將寫入的數據作為 `Uint8Array` 返回。
* 寫入將從緩沖區開頭重新開始。
*/
stream?: boolean;
}): void;
write(chunk: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer): number;
/**
* 刷新內部緩沖區
*
* 如果 {@link ArrayBufferSink.start} 傳遞了 `stream` 選項,這將返回 `ArrayBuffer`
* 如果 {@link ArrayBufferSink.start} 傳遞了 `stream` 選項和 `asUint8Array`,這將返回 `Uint8Array`
* 否則,這將返回自上次刷新以來寫入的字節數
*
* 此 API 以後可能會更改為分離 Uint8ArraySink 和 ArrayBufferSink
*/
flush(): number | Uint8Array<ArrayBuffer> | ArrayBuffer;
end(): ArrayBuffer | Uint8Array<ArrayBuffer>;
}