流是一个重要的抽象,用于处理二进制数据,无需一次性将所有数据加载到内存中。它们通常用于读写文件、发送和接收网络请求以及处理大量数据。
Bun 实现了 Web API ReadableStream 和 WritableStream。
NOTE
Bun 还实现了 `node:stream` 模块,包括 [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams)、 [`Writable`](https://nodejs.org/api/stream.html#stream_writable_streams) 和 [`Duplex`](https://nodejs.org/api/stream.html#stream_duplex_and_transform_streams)。完整文档请参考 [Node.js 文档](https://nodejs.org/api/stream.html)。要创建简单的 ReadableStream:
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});ReadableStream 的内容可以使用 for await 语法逐块读取。
for await (const chunk of stream) {
console.log(chunk);
}
// hello
// world直接 ReadableStream
Bun 实现了优化版本的 ReadableStream,避免不必要的数据复制和队列管理逻辑。
使用传统的 ReadableStream 时,数据块被_入队_。每个块被复制到队列中,直到流准备好发送更多数据之前一直坐在那里。
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});使用直接 ReadableStream 时,数据块直接写入流。不会发生队列,也不需要克隆块数据到内存中。controller API 已更新以反映这一点;你调用 .write 而不是 .enqueue()。
const stream = new ReadableStream({
type: "direct",
pull(controller) {
controller.write("hello");
controller.write("world");
},
});使用直接 ReadableStream 时,所有块队列由目的地处理。流的消费者接收传递给 controller.write() 的确切内容,无需任何编码或修改。
异步生成器流
Bun 还支持异步生成器函数作为 Response 和 Request 的源。这是创建从异步源获取数据的 ReadableStream 的简单方法。
const response = new Response(
(async function* () {
yield "hello";
yield "world";
})(),
);
await response.text(); // "helloworld"你也可以直接使用 [Symbol.asyncIterator]。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
yield "hello";
yield "world";
},
});
await response.text(); // "helloworld"如果你需要对流进行更细粒度的控制,yield 将返回直接 ReadableStream 控制器。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
const controller = yield "hello";
await controller.end();
},
});
await response.text(); // "hello"Bun.ArrayBufferSink
Bun.ArrayBufferSink 类是用于构建未知大小的 ArrayBuffer 的快速增量写入器。
const sink = new Bun.ArrayBufferSink();
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]要改为将数据作为 Uint8Array 检索,传递 asUint8Array 选项给 start 方法。
const sink = new Bun.ArrayBufferSink();
sink.start({
asUint8Array: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ].write() 方法支持字符串、类型化数组、ArrayBuffer 和 SharedArrayBuffer。
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);
sink.end();一旦调用 .end(),不能再将更多数据写入 ArrayBufferSink。但是,在缓冲流的上下文中,连续写入数据并定期 .flush() 内容(例如,到 WriteableStream)很有用。为支持这一点,传递 stream: true 给构造函数。
const sink = new Bun.ArrayBufferSink();
sink.start({
stream: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]
sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ].flush() 方法返回缓冲数据作为 ArrayBuffer(如果 asUint8Array: true 则为 Uint8Array)并清除内部缓冲区。
要手动设置内部缓冲区的大小(以字节为单位),传递 highWaterMark 值:
const sink = new Bun.ArrayBufferSink();
sink.start({
highWaterMark: 1024 * 1024, // 1 MB
});参考
/**
* 快速增量写入器,在 end() 时成为 `ArrayBuffer`。
*/
export class ArrayBufferSink {
constructor();
start(options?: {
asUint8Array?: boolean;
/**
* 预分配此大小的内部缓冲区
* 当块大小较小时,这可以显著提高性能
*/
highWaterMark?: number;
/**
* 在 {@link ArrayBufferSink.flush} 上,将写入的数据作为 `Uint8Array` 返回。
* 写入将从缓冲区开头重新开始。
*/
stream?: boolean;
}): void;
write(chunk: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer): number;
/**
* 刷新内部缓冲区
*
* 如果 {@link ArrayBufferSink.start} 传递了 `stream` 选项,这将返回 `ArrayBuffer`
* 如果 {@link ArrayBufferSink.start} 传递了 `stream` 选项和 `asUint8Array`,这将返回 `Uint8Array`
* 否则,这将返回自上次刷新以来写入的字节数
*
* 此 API 以后可能会更改为分离 Uint8ArraySink 和 ArrayBufferSink
*/
flush(): number | Uint8Array<ArrayBuffer> | ArrayBuffer;
end(): ArrayBuffer | Uint8Array<ArrayBuffer>;
}