Bun.serve() 支持服務器端 WebSocket,具有即時壓縮、TLS 支持和 Bun 原生的發布 - 訂閱 API。
注意
⚡️ 7 倍以上的吞吐量
Bun 的 WebSocket 非常快。在 Linux x64 上的 簡單聊天室 中,Bun 可以處理比 Node.js + "ws" 多 7 倍的每秒請求數。
| 每秒發送消息數 | 運行時 | 客戶端數 |
|---|---|---|
| ~700,000 | (Bun.serve) Bun v0.2.1 (x64) | 16 |
| ~100,000 | (ws) Node v18.10.0 (x64) | 16 |
在內部,Bun 的 WebSocket 實現基於 uWebSockets。
啟動 WebSocket 服務器
下面是一個使用 Bun.serve 構建的簡單 WebSocket 服務器,其中所有傳入請求都在 fetch 處理程序中 升級 為 WebSocket 連接。套接字處理程序在 websocket 參數中聲明。
Bun.serve({
fetch(req, server) {
// 將請求升級為 WebSocket
if (server.upgrade(req)) {
return; // 不返回 Response
}
return new Response("升級失敗", { status: 500 });
},
websocket: {}, // 處理程序
});支持以下 WebSocket 事件處理程序:
Bun.serve({
fetch(req, server) {}, // 升級邏輯
websocket: {
message(ws, message) {}, // 收到消息
open(ws) {}, // 套接字打開
close(ws, code, message) {}, // 套接字關閉
drain(ws) {}, // 套接字准備好接收更多數據
},
});為速度設計的 API
在 Bun 中,處理程序每個服務器聲明一次,而不是每個套接字。
ServerWebSocket 期望你傳遞一個 WebSocketHandler 對象給 Bun.serve() 方法,該方法具有 open、message、close、drain 和 error 的方法。這與客戶端 WebSocket 類不同,後者擴展了 EventTarget(onmessage、onopen、onclose)。
客戶端通常沒有很多套接字連接打開,所以基於事件的 API 是有意義的。
但服務器往往有很多套接字連接打開,這意味著:
- 為每個連接添加/刪除事件監聽器所花費的時間會累積
- 為每個連接存儲回調函數引用的額外內存
- 通常,人們為每個連接創建新函數,這也意味著更多內存
因此,ServerWebSocket 不使用基於事件的 API,而是期望你在 Bun.serve() 中傳遞一個包含每個事件方法的單個對象,並為每個連接重用。
這導致更少的內存使用和更少的時間花費在添加/刪除事件監聽器上。
每個處理程序的第一個參數是處理事件的 ServerWebSocket 實例。ServerWebSocket 類是 WebSocket 的快速、Bun 原生實現,具有一些額外功能。
Bun.serve({
fetch(req, server) {}, // 升級邏輯
websocket: {
message(ws, message) {
ws.send(message); // 回顯消息
},
},
});發送消息
每個 ServerWebSocket 實例都有一個 .send() 方法用於向客戶端發送消息。它支持多種輸入類型。
Bun.serve({
fetch(req, server) {}, // 升級邏輯
websocket: {
message(ws, message) {
ws.send("Hello world"); // 字符串
ws.send(response.arrayBuffer()); // ArrayBuffer
ws.send(new Uint8Array([1, 2, 3])); // TypedArray | DataView
},
},
});頭
一旦升級成功,Bun 將根據 規范 發送 101 Switching Protocols 響應。可以在調用 server.upgrade() 時將額外的 headers 附加到此 Response。
Bun.serve({
fetch(req, server) {
const sessionId = await generateSessionId();
server.upgrade(req, {
headers: {
"Set-Cookie": `SessionId=${sessionId}`,
},
});
},
websocket: {}, // 處理程序
});上下文數據
可以在 .upgrade() 調用中將上下文 data 附加到新 WebSocket。此數據在 WebSocket 處理程序內的 ws.data 屬性上可用。
要強制類型化 ws.data,在 websocket 處理程序對象中添加 data 屬性。這會在所有生命周期鉤子中類型化 ws.data。
type WebSocketData = {
createdAt: number;
channelId: string;
authToken: string;
};
Bun.serve({
fetch(req, server) {
const cookies = new Bun.CookieMap(req.headers.get("cookie")!);
server.upgrade(req, {
// 此對象必須符合 WebSocketData
data: {
createdAt: Date.now(),
channelId: new URL(req.url).searchParams.get("channelId"),
authToken: cookies.get("X-Token"),
},
});
return undefined;
},
websocket: {
// TypeScript:像這樣指定 ws.data 的類型
data: {} as WebSocketData,
// 收到消息時調用的處理程序
async message(ws, message) {
// ws.data 現在已正確類型化為 WebSocketData
const user = getUserFromToken(ws.data.authToken);
await saveMessageToDatabase({
channel: ws.data.channelId,
message: String(message),
userId: user.id,
});
},
},
});注意
注意: 以前,你可以使用 Bun.serve 上的類型參數指定 ws.data 的類型,如 Bun.serve<MyData>({...})。由於 TypeScript 的限制,此模式已被刪除,而支持上面顯示的 data 屬性。
要從瀏覽器連接到此服務器,創建一個新的 WebSocket。
const socket = new WebSocket("ws://localhost:3000/chat");
socket.addEventListener("message", event => {
console.log(event.data);
});注意
識別用戶
當前設置在頁面上的 cookies 將隨 WebSocket 升級請求發送,並在 fetch 處理程序中的 req.headers 上可用。解析這些 cookies 以確定連接用戶的身份並相應地設置 data 的值。
發布/訂閱
Bun 的 ServerWebSocket 實現實現了用於基於主題廣播的原生發布 - 訂閱 API。單個套接字可以 .subscribe() 到主題(用字符串標識符指定)並向該主題的所有其他訂閱者(不包括自己).publish() 消息。這個基於主題的廣播 API 類似於 MQTT 和 Redis Pub/Sub。
const server = Bun.serve({
fetch(req, server) {
const url = new URL(req.url);
if (url.pathname === "/chat") {
console.log(`升級!`);
const username = getUsernameFromReq(req);
const success = server.upgrade(req, { data: { username } });
return success ? undefined : new Response("WebSocket 升級錯誤", { status: 400 });
}
return new Response("Hello world");
},
websocket: {
// TypeScript:像這樣指定 ws.data 的類型
data: {} as { username: string },
open(ws) {
const msg = `${ws.data.username} 已進入聊天`;
ws.subscribe("the-group-chat");
server.publish("the-group-chat", msg);
},
message(ws, message) {
// 這是群聊
// 所以服務器重新廣播傳入的消息給每個人
server.publish("the-group-chat", `${ws.data.username}: ${message}`);
// 檢查當前訂閱
console.log(ws.subscriptions); // ["the-group-chat"]
},
close(ws) {
const msg = `${ws.data.username} 已離開聊天`;
ws.unsubscribe("the-group-chat");
server.publish("the-group-chat", msg);
},
},
});
console.log(`監聽 ${server.hostname}:${server.port}`);調用 .publish(data) 會將消息發送給主題的所有訂閱者,除了 調用 .publish() 的套接字。要向主題的所有訂閱者發送消息,使用 Server 實例上的 .publish() 方法。
const server = Bun.serve({
websocket: {
// ...
},
});
// 監聽一些外部事件
server.publish("the-group-chat", "Hello world");壓縮
每消息 壓縮 可以通過 perMessageDeflate 參數啟用。
Bun.serve({
websocket: {
perMessageDeflate: true,
},
});可以通過將 boolean 作為第二個參數傳遞給 .send() 來為單個消息啟用壓縮。
ws.send("Hello world", true);有關壓縮特性的細粒度控制,請參閱 參考。
背壓
ServerWebSocket 的 .send(message) 方法返回一個 number 表示操作結果。
-1— 消息已入隊但存在背壓0— 由於連接問題消息被丟棄1+— 發送的字節數
這讓你更好地控制服務器中的背壓。
超時和限制
默認情況下,如果 WebSocket 連接空閒 120 秒,Bun 將關閉它。這可以通過 idleTimeout 參數配置。
Bun.serve({
fetch(req, server) {}, // 升級邏輯
websocket: {
idleTimeout: 60, // 60 秒
},
});如果 Bun 收到大於 16 MB 的消息,它也會關閉 WebSocket 連接。這可以通過 maxPayloadLength 參數配置。
Bun.serve({
fetch(req, server) {}, // 升級邏輯
websocket: {
maxPayloadLength: 1024 * 1024, // 1 MB
},
});連接到 WebSocket 服務器
Bun 實現了 WebSocket 類。要創建連接到 ws:// 或 wss:// 服務器的 WebSocket 客戶端,創建 WebSocket 實例,就像在瀏覽器中一樣。
const socket = new WebSocket("ws://localhost:3000");
// 使用子協議協商
const socket2 = new WebSocket("ws://localhost:3000", ["soap", "wamp"]);在瀏覽器中,當前設置在頁面上的 cookies 將隨 WebSocket 升級請求發送。這是 WebSocket API 的標准功能。
為了方便,Bun 允許你直接在構造函數中設置自定義頭。這是 WebSocket 標准的 Bun 特定擴展。這在瀏覽器中不起作用。
const socket = new WebSocket("ws://localhost:3000", {
headers: {
/* 自定義頭 */
},
});要向套接字添加事件監聽器:
// 收到消息
socket.addEventListener("message", event => {});
// 套接字打開
socket.addEventListener("open", event => {});
// 套接字關閉
socket.addEventListener("close", event => {});
// 錯誤處理程序
socket.addEventListener("error", event => {});參考
namespace Bun {
export function serve(params: {
fetch: (req: Request, server: Server) => Response | Promise<Response>;
websocket?: {
message: (ws: ServerWebSocket, message: string | ArrayBuffer | Uint8Array) => void;
open?: (ws: ServerWebSocket) => void;
close?: (ws: ServerWebSocket, code: number, reason: string) => void;
error?: (ws: ServerWebSocket, error: Error) => void;
drain?: (ws: ServerWebSocket) => void;
maxPayloadLength?: number; // 默認:16 * 1024 * 1024 = 16 MB
idleTimeout?: number; // 默認:120(秒)
backpressureLimit?: number; // 默認:1024 * 1024 = 1 MB
closeOnBackpressureLimit?: boolean; // 默認:false
sendPings?: boolean; // 默認:true
publishToSelf?: boolean; // 默認:false
perMessageDeflate?:
| boolean
| {
compress?: boolean | Compressor;
decompress?: boolean | Compressor;
};
};
}): Server;
}
type Compressor =
| `"disable"`
| `"shared"`
| `"dedicated"`
| `"3KB"`
| `"4KB"`
| `"8KB"`
| `"16KB"`
| `"32KB"`
| `"64KB"`
| `"128KB"`
| `"256KB"`;
interface Server {
pendingWebSockets: number;
publish(topic: string, data: string | ArrayBufferView | ArrayBuffer, compress?: boolean): number;
upgrade(
req: Request,
options?: {
headers?: HeadersInit;
data?: any;
},
): boolean;
}
interface ServerWebSocket {
readonly data: any;
readonly readyState: number;
readonly remoteAddress: string;
readonly subscriptions: string[];
send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
close(code?: number, reason?: string): void;
subscribe(topic: string): void;
unsubscribe(topic: string): void;
publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
isSubscribed(topic: string): boolean;
cork(cb: (ws: ServerWebSocket) => void): void;
}