Skip to content

Bun.serve() 支持服務器端 WebSocket,具有即時壓縮、TLS 支持和 Bun 原生的發布 - 訂閱 API。

注意

⚡️ 7 倍以上的吞吐量

Bun 的 WebSocket 非常快。在 Linux x64 上的 簡單聊天室 中,Bun 可以處理比 Node.js + "ws" 多 7 倍的每秒請求數。

每秒發送消息數運行時客戶端數
~700,000(Bun.serve) Bun v0.2.1 (x64)16
~100,000(ws) Node v18.10.0 (x64)16

在內部,Bun 的 WebSocket 實現基於 uWebSockets


啟動 WebSocket 服務器

下面是一個使用 Bun.serve 構建的簡單 WebSocket 服務器,其中所有傳入請求都在 fetch 處理程序中 升級 為 WebSocket 連接。套接字處理程序在 websocket 參數中聲明。

ts
Bun.serve({
  fetch(req, server) {
    // 將請求升級為 WebSocket
    if (server.upgrade(req)) {
      return; // 不返回 Response
    }
    return new Response("升級失敗", { status: 500 });
  },
  websocket: {}, // 處理程序
});

支持以下 WebSocket 事件處理程序:

ts
Bun.serve({
  fetch(req, server) {}, // 升級邏輯
  websocket: {
    message(ws, message) {}, // 收到消息
    open(ws) {}, // 套接字打開
    close(ws, code, message) {}, // 套接字關閉
    drain(ws) {}, // 套接字准備好接收更多數據
  },
});

為速度設計的 API

在 Bun 中,處理程序每個服務器聲明一次,而不是每個套接字。

ServerWebSocket 期望你傳遞一個 WebSocketHandler 對象給 Bun.serve() 方法,該方法具有 openmessageclosedrainerror 的方法。這與客戶端 WebSocket 類不同,後者擴展了 EventTarget(onmessage、onopen、onclose)。

客戶端通常沒有很多套接字連接打開,所以基於事件的 API 是有意義的。

但服務器往往有很多套接字連接打開,這意味著:

  • 為每個連接添加/刪除事件監聽器所花費的時間會累積
  • 為每個連接存儲回調函數引用的額外內存
  • 通常,人們為每個連接創建新函數,這也意味著更多內存

因此,ServerWebSocket 不使用基於事件的 API,而是期望你在 Bun.serve() 中傳遞一個包含每個事件方法的單個對象,並為每個連接重用。

這導致更少的內存使用和更少的時間花費在添加/刪除事件監聽器上。

每個處理程序的第一個參數是處理事件的 ServerWebSocket 實例。ServerWebSocket 類是 WebSocket 的快速、Bun 原生實現,具有一些額外功能。

ts
Bun.serve({
  fetch(req, server) {}, // 升級邏輯
  websocket: {
    message(ws, message) {
      ws.send(message); // 回顯消息
    },
  },
});

發送消息

每個 ServerWebSocket 實例都有一個 .send() 方法用於向客戶端發送消息。它支持多種輸入類型。

ts
Bun.serve({
  fetch(req, server) {}, // 升級邏輯
  websocket: {
    message(ws, message) {
      ws.send("Hello world"); // 字符串
      ws.send(response.arrayBuffer()); // ArrayBuffer
      ws.send(new Uint8Array([1, 2, 3])); // TypedArray | DataView
    },
  },
});

一旦升級成功,Bun 將根據 規范 發送 101 Switching Protocols 響應。可以在調用 server.upgrade() 時將額外的 headers 附加到此 Response

ts
Bun.serve({
  fetch(req, server) {
    const sessionId = await generateSessionId();
    server.upgrade(req, {
      headers: { 
        "Set-Cookie": `SessionId=${sessionId}`, 
      }, 
    });
  },
  websocket: {}, // 處理程序
});

上下文數據

可以在 .upgrade() 調用中將上下文 data 附加到新 WebSocket。此數據在 WebSocket 處理程序內的 ws.data 屬性上可用。

要強制類型化 ws.data,在 websocket 處理程序對象中添加 data 屬性。這會在所有生命周期鉤子中類型化 ws.data

ts
type WebSocketData = {
  createdAt: number;
  channelId: string;
  authToken: string;
};

Bun.serve({
  fetch(req, server) {
    const cookies = new Bun.CookieMap(req.headers.get("cookie")!);

    server.upgrade(req, {
      // 此對象必須符合 WebSocketData
      data: {
        createdAt: Date.now(),
        channelId: new URL(req.url).searchParams.get("channelId"),
        authToken: cookies.get("X-Token"),
      },
    });

    return undefined;
  },
  websocket: {
    // TypeScript:像這樣指定 ws.data 的類型
    data: {} as WebSocketData,
    // 收到消息時調用的處理程序
    async message(ws, message) {
      // ws.data 現在已正確類型化為 WebSocketData
      const user = getUserFromToken(ws.data.authToken);

      await saveMessageToDatabase({
        channel: ws.data.channelId,
        message: String(message),
        userId: user.id,
      });
    },
  },
});

注意

注意: 以前,你可以使用 Bun.serve 上的類型參數指定 ws.data 的類型,如 Bun.serve<MyData>({...})。由於 TypeScript 的限制,此模式已被刪除,而支持上面顯示的 data 屬性。

要從瀏覽器連接到此服務器,創建一個新的 WebSocket

js
const socket = new WebSocket("ws://localhost:3000/chat");

socket.addEventListener("message", event => {
  console.log(event.data);
});

注意

識別用戶

當前設置在頁面上的 cookies 將隨 WebSocket 升級請求發送,並在 fetch 處理程序中的 req.headers 上可用。解析這些 cookies 以確定連接用戶的身份並相應地設置 data 的值。

發布/訂閱

Bun 的 ServerWebSocket 實現實現了用於基於主題廣播的原生發布 - 訂閱 API。單個套接字可以 .subscribe() 到主題(用字符串標識符指定)並向該主題的所有其他訂閱者(不包括自己).publish() 消息。這個基於主題的廣播 API 類似於 MQTTRedis Pub/Sub

ts
const server = Bun.serve({
  fetch(req, server) {
    const url = new URL(req.url);
    if (url.pathname === "/chat") {
      console.log(`升級!`);
      const username = getUsernameFromReq(req);
      const success = server.upgrade(req, { data: { username } });
      return success ? undefined : new Response("WebSocket 升級錯誤", { status: 400 });
    }

    return new Response("Hello world");
  },
  websocket: {
    // TypeScript:像這樣指定 ws.data 的類型
    data: {} as { username: string },
    open(ws) {
      const msg = `${ws.data.username} 已進入聊天`;
      ws.subscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
    message(ws, message) {
      // 這是群聊
      // 所以服務器重新廣播傳入的消息給每個人
      server.publish("the-group-chat", `${ws.data.username}: ${message}`);

      // 檢查當前訂閱
      console.log(ws.subscriptions); // ["the-group-chat"]
    },
    close(ws) {
      const msg = `${ws.data.username} 已離開聊天`;
      ws.unsubscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
  },
});

console.log(`監聽 ${server.hostname}:${server.port}`);

調用 .publish(data) 會將消息發送給主題的所有訂閱者,除了 調用 .publish() 的套接字。要向主題的所有訂閱者發送消息,使用 Server 實例上的 .publish() 方法。

ts
const server = Bun.serve({
  websocket: {
    // ...
  },
});

// 監聽一些外部事件
server.publish("the-group-chat", "Hello world");

壓縮

每消息 壓縮 可以通過 perMessageDeflate 參數啟用。

ts
Bun.serve({
  websocket: {
    perMessageDeflate: true, 
  },
});

可以通過將 boolean 作為第二個參數傳遞給 .send() 來為單個消息啟用壓縮。

ts
ws.send("Hello world", true);

有關壓縮特性的細粒度控制,請參閱 參考

背壓

ServerWebSocket.send(message) 方法返回一個 number 表示操作結果。

  • -1 — 消息已入隊但存在背壓
  • 0 — 由於連接問題消息被丟棄
  • 1+ — 發送的字節數

這讓你更好地控制服務器中的背壓。

超時和限制

默認情況下,如果 WebSocket 連接空閒 120 秒,Bun 將關閉它。這可以通過 idleTimeout 參數配置。

ts
Bun.serve({
  fetch(req, server) {}, // 升級邏輯
  websocket: {
    idleTimeout: 60, // 60 秒
  },
});

如果 Bun 收到大於 16 MB 的消息,它也會關閉 WebSocket 連接。這可以通過 maxPayloadLength 參數配置。

ts
Bun.serve({
  fetch(req, server) {}, // 升級邏輯
  websocket: {
    maxPayloadLength: 1024 * 1024, // 1 MB
  },
});

連接到 WebSocket 服務器

Bun 實現了 WebSocket 類。要創建連接到 ws://wss:// 服務器的 WebSocket 客戶端,創建 WebSocket 實例,就像在瀏覽器中一樣。

ts
const socket = new WebSocket("ws://localhost:3000");

// 使用子協議協商
const socket2 = new WebSocket("ws://localhost:3000", ["soap", "wamp"]);

在瀏覽器中,當前設置在頁面上的 cookies 將隨 WebSocket 升級請求發送。這是 WebSocket API 的標准功能。

為了方便,Bun 允許你直接在構造函數中設置自定義頭。這是 WebSocket 標准的 Bun 特定擴展。這在瀏覽器中不起作用。

ts
const socket = new WebSocket("ws://localhost:3000", {
  headers: {
    /* 自定義頭 */
  }, 
});

要向套接字添加事件監聽器:

ts
// 收到消息
socket.addEventListener("message", event => {});

// 套接字打開
socket.addEventListener("open", event => {});

// 套接字關閉
socket.addEventListener("close", event => {});

// 錯誤處理程序
socket.addEventListener("error", event => {});

參考

ts
namespace Bun {
  export function serve(params: {
    fetch: (req: Request, server: Server) => Response | Promise<Response>;
    websocket?: {
      message: (ws: ServerWebSocket, message: string | ArrayBuffer | Uint8Array) => void;
      open?: (ws: ServerWebSocket) => void;
      close?: (ws: ServerWebSocket, code: number, reason: string) => void;
      error?: (ws: ServerWebSocket, error: Error) => void;
      drain?: (ws: ServerWebSocket) => void;

      maxPayloadLength?: number; // 默認:16 * 1024 * 1024 = 16 MB
      idleTimeout?: number; // 默認:120(秒)
      backpressureLimit?: number; // 默認:1024 * 1024 = 1 MB
      closeOnBackpressureLimit?: boolean; // 默認:false
      sendPings?: boolean; // 默認:true
      publishToSelf?: boolean; // 默認:false

      perMessageDeflate?:
        | boolean
        | {
            compress?: boolean | Compressor;
            decompress?: boolean | Compressor;
          };
    };
  }): Server;
}

type Compressor =
  | `"disable"`
  | `"shared"`
  | `"dedicated"`
  | `"3KB"`
  | `"4KB"`
  | `"8KB"`
  | `"16KB"`
  | `"32KB"`
  | `"64KB"`
  | `"128KB"`
  | `"256KB"`;

interface Server {
  pendingWebSockets: number;
  publish(topic: string, data: string | ArrayBufferView | ArrayBuffer, compress?: boolean): number;
  upgrade(
    req: Request,
    options?: {
      headers?: HeadersInit;
      data?: any;
    },
  ): boolean;
}

interface ServerWebSocket {
  readonly data: any;
  readonly readyState: number;
  readonly remoteAddress: string;
  readonly subscriptions: string[];
  send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
  close(code?: number, reason?: string): void;
  subscribe(topic: string): void;
  unsubscribe(topic: string): void;
  publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
  isSubscribed(topic: string): boolean;
  cork(cb: (ws: ServerWebSocket) => void): void;
}

Bun學習網由www.bunjs.com.cn整理維護