Bun.serve() 支持服务器端 WebSocket,具有即时压缩、TLS 支持和 Bun 原生的发布 - 订阅 API。
注意
⚡️ 7 倍以上的吞吐量
Bun 的 WebSocket 非常快。在 Linux x64 上的 简单聊天室 中,Bun 可以处理比 Node.js + "ws" 多 7 倍的每秒请求数。
| 每秒发送消息数 | 运行时 | 客户端数 |
|---|---|---|
| ~700,000 | (Bun.serve) Bun v0.2.1 (x64) | 16 |
| ~100,000 | (ws) Node v18.10.0 (x64) | 16 |
在内部,Bun 的 WebSocket 实现基于 uWebSockets。
启动 WebSocket 服务器
下面是一个使用 Bun.serve 构建的简单 WebSocket 服务器,其中所有传入请求都在 fetch 处理程序中 升级 为 WebSocket 连接。套接字处理程序在 websocket 参数中声明。
Bun.serve({
fetch(req, server) {
// 将请求升级为 WebSocket
if (server.upgrade(req)) {
return; // 不返回 Response
}
return new Response("升级失败", { status: 500 });
},
websocket: {}, // 处理程序
});支持以下 WebSocket 事件处理程序:
Bun.serve({
fetch(req, server) {}, // 升级逻辑
websocket: {
message(ws, message) {}, // 收到消息
open(ws) {}, // 套接字打开
close(ws, code, message) {}, // 套接字关闭
drain(ws) {}, // 套接字准备好接收更多数据
},
});为速度设计的 API
在 Bun 中,处理程序每个服务器声明一次,而不是每个套接字。
ServerWebSocket 期望你传递一个 WebSocketHandler 对象给 Bun.serve() 方法,该方法具有 open、message、close、drain 和 error 的方法。这与客户端 WebSocket 类不同,后者扩展了 EventTarget(onmessage、onopen、onclose)。
客户端通常没有很多套接字连接打开,所以基于事件的 API 是有意义的。
但服务器往往有很多套接字连接打开,这意味着:
- 为每个连接添加/删除事件监听器所花费的时间会累积
- 为每个连接存储回调函数引用的额外内存
- 通常,人们为每个连接创建新函数,这也意味着更多内存
因此,ServerWebSocket 不使用基于事件的 API,而是期望你在 Bun.serve() 中传递一个包含每个事件方法的单个对象,并为每个连接重用。
这导致更少的内存使用和更少的时间花费在添加/删除事件监听器上。
每个处理程序的第一个参数是处理事件的 ServerWebSocket 实例。ServerWebSocket 类是 WebSocket 的快速、Bun 原生实现,具有一些额外功能。
Bun.serve({
fetch(req, server) {}, // 升级逻辑
websocket: {
message(ws, message) {
ws.send(message); // 回显消息
},
},
});发送消息
每个 ServerWebSocket 实例都有一个 .send() 方法用于向客户端发送消息。它支持多种输入类型。
Bun.serve({
fetch(req, server) {}, // 升级逻辑
websocket: {
message(ws, message) {
ws.send("Hello world"); // 字符串
ws.send(response.arrayBuffer()); // ArrayBuffer
ws.send(new Uint8Array([1, 2, 3])); // TypedArray | DataView
},
},
});头
一旦升级成功,Bun 将根据 规范 发送 101 Switching Protocols 响应。可以在调用 server.upgrade() 时将额外的 headers 附加到此 Response。
Bun.serve({
fetch(req, server) {
const sessionId = await generateSessionId();
server.upgrade(req, {
headers: {
"Set-Cookie": `SessionId=${sessionId}`,
},
});
},
websocket: {}, // 处理程序
});上下文数据
可以在 .upgrade() 调用中将上下文 data 附加到新 WebSocket。此数据在 WebSocket 处理程序内的 ws.data 属性上可用。
要强制类型化 ws.data,在 websocket 处理程序对象中添加 data 属性。这会在所有生命周期钩子中类型化 ws.data。
type WebSocketData = {
createdAt: number;
channelId: string;
authToken: string;
};
Bun.serve({
fetch(req, server) {
const cookies = new Bun.CookieMap(req.headers.get("cookie")!);
server.upgrade(req, {
// 此对象必须符合 WebSocketData
data: {
createdAt: Date.now(),
channelId: new URL(req.url).searchParams.get("channelId"),
authToken: cookies.get("X-Token"),
},
});
return undefined;
},
websocket: {
// TypeScript:像这样指定 ws.data 的类型
data: {} as WebSocketData,
// 收到消息时调用的处理程序
async message(ws, message) {
// ws.data 现在已正确类型化为 WebSocketData
const user = getUserFromToken(ws.data.authToken);
await saveMessageToDatabase({
channel: ws.data.channelId,
message: String(message),
userId: user.id,
});
},
},
});注意
注意: 以前,你可以使用 Bun.serve 上的类型参数指定 ws.data 的类型,如 Bun.serve<MyData>({...})。由于 TypeScript 的限制,此模式已被删除,而支持上面显示的 data 属性。
要从浏览器连接到此服务器,创建一个新的 WebSocket。
const socket = new WebSocket("ws://localhost:3000/chat");
socket.addEventListener("message", event => {
console.log(event.data);
});注意
识别用户
当前设置在页面上的 cookies 将随 WebSocket 升级请求发送,并在 fetch 处理程序中的 req.headers 上可用。解析这些 cookies 以确定连接用户的身份并相应地设置 data 的值。
发布/订阅
Bun 的 ServerWebSocket 实现实现了用于基于主题广播的原生发布 - 订阅 API。单个套接字可以 .subscribe() 到主题(用字符串标识符指定)并向该主题的所有其他订阅者(不包括自己).publish() 消息。这个基于主题的广播 API 类似于 MQTT 和 Redis Pub/Sub。
const server = Bun.serve({
fetch(req, server) {
const url = new URL(req.url);
if (url.pathname === "/chat") {
console.log(`升级!`);
const username = getUsernameFromReq(req);
const success = server.upgrade(req, { data: { username } });
return success ? undefined : new Response("WebSocket 升级错误", { status: 400 });
}
return new Response("Hello world");
},
websocket: {
// TypeScript:像这样指定 ws.data 的类型
data: {} as { username: string },
open(ws) {
const msg = `${ws.data.username} 已进入聊天`;
ws.subscribe("the-group-chat");
server.publish("the-group-chat", msg);
},
message(ws, message) {
// 这是群聊
// 所以服务器重新广播传入的消息给每个人
server.publish("the-group-chat", `${ws.data.username}: ${message}`);
// 检查当前订阅
console.log(ws.subscriptions); // ["the-group-chat"]
},
close(ws) {
const msg = `${ws.data.username} 已离开聊天`;
ws.unsubscribe("the-group-chat");
server.publish("the-group-chat", msg);
},
},
});
console.log(`监听 ${server.hostname}:${server.port}`);调用 .publish(data) 会将消息发送给主题的所有订阅者,除了 调用 .publish() 的套接字。要向主题的所有订阅者发送消息,使用 Server 实例上的 .publish() 方法。
const server = Bun.serve({
websocket: {
// ...
},
});
// 监听一些外部事件
server.publish("the-group-chat", "Hello world");压缩
每消息 压缩 可以通过 perMessageDeflate 参数启用。
Bun.serve({
websocket: {
perMessageDeflate: true,
},
});可以通过将 boolean 作为第二个参数传递给 .send() 来为单个消息启用压缩。
ws.send("Hello world", true);有关压缩特性的细粒度控制,请参阅 参考。
背压
ServerWebSocket 的 .send(message) 方法返回一个 number 表示操作结果。
-1— 消息已入队但存在背压0— 由于连接问题消息被丢弃1+— 发送的字节数
这让你更好地控制服务器中的背压。
超时和限制
默认情况下,如果 WebSocket 连接空闲 120 秒,Bun 将关闭它。这可以通过 idleTimeout 参数配置。
Bun.serve({
fetch(req, server) {}, // 升级逻辑
websocket: {
idleTimeout: 60, // 60 秒
},
});如果 Bun 收到大于 16 MB 的消息,它也会关闭 WebSocket 连接。这可以通过 maxPayloadLength 参数配置。
Bun.serve({
fetch(req, server) {}, // 升级逻辑
websocket: {
maxPayloadLength: 1024 * 1024, // 1 MB
},
});连接到 WebSocket 服务器
Bun 实现了 WebSocket 类。要创建连接到 ws:// 或 wss:// 服务器的 WebSocket 客户端,创建 WebSocket 实例,就像在浏览器中一样。
const socket = new WebSocket("ws://localhost:3000");
// 使用子协议协商
const socket2 = new WebSocket("ws://localhost:3000", ["soap", "wamp"]);在浏览器中,当前设置在页面上的 cookies 将随 WebSocket 升级请求发送。这是 WebSocket API 的标准功能。
为了方便,Bun 允许你直接在构造函数中设置自定义头。这是 WebSocket 标准的 Bun 特定扩展。这在浏览器中不起作用。
const socket = new WebSocket("ws://localhost:3000", {
headers: {
/* 自定义头 */
},
});要向套接字添加事件监听器:
// 收到消息
socket.addEventListener("message", event => {});
// 套接字打开
socket.addEventListener("open", event => {});
// 套接字关闭
socket.addEventListener("close", event => {});
// 错误处理程序
socket.addEventListener("error", event => {});参考
namespace Bun {
export function serve(params: {
fetch: (req: Request, server: Server) => Response | Promise<Response>;
websocket?: {
message: (ws: ServerWebSocket, message: string | ArrayBuffer | Uint8Array) => void;
open?: (ws: ServerWebSocket) => void;
close?: (ws: ServerWebSocket, code: number, reason: string) => void;
error?: (ws: ServerWebSocket, error: Error) => void;
drain?: (ws: ServerWebSocket) => void;
maxPayloadLength?: number; // 默认:16 * 1024 * 1024 = 16 MB
idleTimeout?: number; // 默认:120(秒)
backpressureLimit?: number; // 默认:1024 * 1024 = 1 MB
closeOnBackpressureLimit?: boolean; // 默认:false
sendPings?: boolean; // 默认:true
publishToSelf?: boolean; // 默认:false
perMessageDeflate?:
| boolean
| {
compress?: boolean | Compressor;
decompress?: boolean | Compressor;
};
};
}): Server;
}
type Compressor =
| `"disable"`
| `"shared"`
| `"dedicated"`
| `"3KB"`
| `"4KB"`
| `"8KB"`
| `"16KB"`
| `"32KB"`
| `"64KB"`
| `"128KB"`
| `"256KB"`;
interface Server {
pendingWebSockets: number;
publish(topic: string, data: string | ArrayBufferView | ArrayBuffer, compress?: boolean): number;
upgrade(
req: Request,
options?: {
headers?: HeadersInit;
data?: any;
},
): boolean;
}
interface ServerWebSocket {
readonly data: any;
readonly readyState: number;
readonly remoteAddress: string;
readonly subscriptions: string[];
send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
close(code?: number, reason?: string): void;
subscribe(topic: string): void;
unsubscribe(topic: string): void;
publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
isSubscribed(topic: string): boolean;
cork(cb: (ws: ServerWebSocket) => void): void;
}