Skip to content

Bun.serve() はサーバーサイド WebSocket をサポートしており、オンザフライ圧縮、TLS サポート、Bun ネイティブの公開 - 購読 API を備えています。

注記

⚡️ 7 倍のスループット

Bun の WebSocket は高速です。Linux x64 での シンプルなチャットルーム では、Bun は Node.js + "ws" よりも 1 秒あたり 7 倍多くのリクエストを処理できます。

1 秒あたりの送信メッセージ数ランタイムクライアント数
~700,000(Bun.serve) Bun v0.2.1 (x64)16
~100,000(ws) Node v18.10.0 (x64)16

内部的に Bun の WebSocket 実装は uWebSockets をベースに構築されています。


WebSocket サーバーの起動

以下は Bun.serve で構築されたシンプルな WebSocket サーバーで、すべての受信リクエストが fetch ハンドラーで WebSocket 接続に アップグレード されます。ソケットハンドラーは websocket パラメーターで宣言されます。

ts
Bun.serve({
  fetch(req, server) {
    // リクエストを WebSocket にアップグレード
    if (server.upgrade(req)) {
      return; // Response を返さない
    }
    return new Response("Upgrade failed", { status: 500 });
  },
  websocket: {}, // ハンドラー
});

以下の WebSocket イベントハンドラーがサポートされています。

ts
Bun.serve({
  fetch(req, server) {}, // アップグレードロジック
  websocket: {
    message(ws, message) {}, // メッセージを受信
    open(ws) {}, // ソケットがオープン
    close(ws, code, message) {}, // ソケットがクローズ
    drain(ws) {}, // ソケットがより多くのデータを受信する準備完了
  },
});

速度のために設計された API

Bun では、ハンドラーはソケットごとではなくサーバーごとに 1 回宣言されます。

ServerWebSocketopenmessageclosedrainerror のメソッドを持つ WebSocketHandler オブジェクトを Bun.serve() メソッドに渡すことを期待しています。これはイベントベース(onmessage、onopen、onclose)の EventTarget を拡張するクライアントサイドの WebSocket クラスとは異なります。

クライアントは多くのソケット接続を開かない傾向があるため、イベントベースの API は理にかなっています。

しかし、サーバーは 多くの ソケット接続を開く傾向があります。つまり:

  • 各接続のイベントリスナーの追加/削除に費やされる時間が増加する
  • 各接続のコールバック関数への参照を保存するための追加メモリ
  • 通常、各接続ごとに新しい関数を作成するため、さらに多くのメモリが必要

そのため、イベントベースの API を使用する代わりに、ServerWebSocket は各接続ごとに再利用される Bun.serve() 内の各イベントのメソッドを持つ単一のオブジェクトを渡すことを期待しています。

これにより、メモリ使用量が減少し、イベントリスナーの追加/削除に費やされる時間が減少します。

各ハンドラーへの最初の引数は、イベントを処理する ServerWebSocket のインスタンスです。ServerWebSocket クラスはいくつかの追加機能を備えた WebSocket の高速な Bun ネイティブ実装です。

ts
Bun.serve({
  fetch(req, server) {}, // アップグレードロジック
  websocket: {
    message(ws, message) {
      ws.send(message); // メッセージをエコーバック
    },
  },
});

メッセージの送信

ServerWebSocket インスタンスには、クライアントにメッセージを送信するための .send() メソッドがあります。これはさまざまな入力タイプをサポートしています。

ts
Bun.serve({
  fetch(req, server) {}, // アップグレードロジック
  websocket: {
    message(ws, message) {
      ws.send("Hello world"); // 文字列
      ws.send(response.arrayBuffer()); // ArrayBuffer
      ws.send(new Uint8Array([1, 2, 3])); // TypedArray | DataView
    },
  },
});

ヘッダー

アップグレードが成功すると、Bun は 仕様 に従って 101 Switching Protocols レスポンスを送信します。追加の headersserver.upgrade() の呼び出しでこの Response に添付できます。

ts
Bun.serve({
  fetch(req, server) {
    const sessionId = await generateSessionId();
    server.upgrade(req, {
      headers: { 
        "Set-Cookie": `SessionId=${sessionId}`, 
      }, 
    });
  },
  websocket: {}, // ハンドラー
});

コンテキストデータ

コンテキスト data.upgrade() 呼び出しで新しい WebSocket に添付できます。このデータは WebSocket ハンドラー内の ws.data プロパティで利用可能になります。

ws.data を強く型付けするには、websocket ハンドラーオブジェクトに data プロパティを追加します。これにより、すべてのライフサイクルフックで ws.data が型付けされます。

ts
type WebSocketData = {
  createdAt: number;
  channelId: string;
  authToken: string;
};

Bun.serve({
  fetch(req, server) {
    const cookies = new Bun.CookieMap(req.headers.get("cookie")!);

    server.upgrade(req, {
      // このオブジェクトは WebSocketData に準拠する必要があります
      data: {
        createdAt: Date.now(),
        channelId: new URL(req.url).searchParams.get("channelId"),
        authToken: cookies.get("X-Token"),
      },
    });

    return undefined;
  },
  websocket: {
    // TypeScript: このように ws.data の型を指定
    data: {} as WebSocketData,
    // メッセージを受信したときに呼び出されるハンドラー
    async message(ws, message) {
      // ws.data は теперь正しく WebSocketData として型付けされます
      const user = getUserFromToken(ws.data.authToken);

      await saveMessageToDatabase({
        channel: ws.data.channelId,
        message: String(message),
        userId: user.id,
      });
    },
  },
});

注記

注: 以前は、Bun.serve<MyData>({...}) のように Bun.serve の型パラメーターを使用して ws.data の型を指定できました。このパターンは上記の data プロパティを支持して TypeScript の制限 により削除されました。

ブラウザからこのサーバーに接続するには、新しい WebSocket を作成します。

js
const socket = new WebSocket("ws://localhost:3000/chat");

socket.addEventListener("message", event => {
  console.log(event.data);
});

注記

ユーザーの識別

現在ページに設定されている Cookie は WebSocket アップグレードリクエストと共に送信され、fetch ハンドラーの req.headers で利用可能になります。これらの Cookie を解析して接続ユーザーの ID を決定し、それに応じて data の値を設定します。

Pub/Sub

Bun の ServerWebSocket 実装は、トピックベースのブロードキャスト用のネイティブな公開 - 購読 API を実装しています。個々のソケットはトピック(文字列識別子で指定)を .subscribe() でき、そのトピックの他のすべての購読者(自分自身を除く)にメッセージを .publish() できます。このトピックベースのブロードキャスト API は MQTT および Redis Pub/Sub に似ています。

ts
const server = Bun.serve({
  fetch(req, server) {
    const url = new URL(req.url);
    if (url.pathname === "/chat") {
      console.log(`upgrade!`);
      const username = getUsernameFromReq(req);
      const success = server.upgrade(req, { data: { username } });
      return success ? undefined : new Response("WebSocket upgrade error", { status: 400 });
    }

    return new Response("Hello world");
  },
  websocket: {
    // TypeScript: このように ws.data の型を指定
    data: {} as { username: string },
    open(ws) {
      const msg = `${ws.data.username} がチャットに参加しました`;
      ws.subscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
    message(ws, message) {
      // これはグループチャットなので
      // サーバーは受信メッセージを全員に再放送します
      server.publish("the-group-chat", `${ws.data.username}: ${message}`);

      // 現在の購読を検証
      console.log(ws.subscriptions); // ["the-group-chat"]
    },
    close(ws) {
      const msg = `${ws.data.username} がチャットから退出しました`;
      ws.unsubscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
  },
});

console.log(`Listening on ${server.hostname}:${server.port}`);

.publish(data) を呼び出すと、メッセージはトピックのすべての購読者に送信されますが、.publish() を呼び出したソケットは除かれます。トピックのすべての購読者にメッセージを送信するには、Server インスタンスの .publish() メソッドを使用します。

ts
const server = Bun.serve({
  websocket: {
    // ...
  },
});

// 何らかの外部イベントをリッスン
server.publish("the-group-chat", "Hello world");

圧縮

メッセージごとの 圧縮perMessageDeflate パラメーターで有効にできます。

ts
Bun.serve({
  websocket: {
    perMessageDeflate: true, 
  },
});

圧縮は .send() への 2 番目の引数として boolean を渡すことで個々のメッセージに対して有効にできます。

ts
ws.send("Hello world", true);

圧縮特性の細かい制御については、リファレンス を参照してください。

バックプレッシャー

ServerWebSocket.send(message) メソッドは、操作の結果を示す number を返します。

  • -1 — メッセージはキューイングされましたがバックプレッシャーがあります
  • 0 — 接続の問題によりメッセージがドロップされました
  • 1+ — 送信されたバイト数

これにより、サーバーのバックプレッシャーをより適切に制御できます。

タイムアウトと制限

デフォルトでは、Bun は WebSocket 接続が 120 秒アイドル状態の場合に接続を閉じます。これは idleTimeout パラメーターで設定できます。

ts
Bun.serve({
  fetch(req, server) {}, // アップグレードロジック
  websocket: {
    idleTimeout: 60, // 60 秒
  },
});

また、Bun は 16 MB よりも大きいメッセージを受信した場合に WebSocket 接続を閉じます。これは maxPayloadLength パラメーターで設定できます。

ts
Bun.serve({
  fetch(req, server) {}, // アップグレードロジック
  websocket: {
    maxPayloadLength: 1024 * 1024, // 1 MB
  },
});

WebSocket サーバーへの接続

Bun は WebSocket クラスを実装しています。ws:// または wss:// サーバーに接続する WebSocket クライアントを作成するには、ブラウザで行うように WebSocket のインスタンスを作成します。

ts
const socket = new WebSocket("ws://localhost:3000");

// サブプロトコルネゴシエーション付き
const socket2 = new WebSocket("ws://localhost:3000", ["soap", "wamp"]);

ブラウザでは、現在ページに設定されている Cookie は WebSocket アップグレードリクエストと共に送信されます。これは WebSocket API の標準機能です。

便宜上、Bun ではコンストラクターで直接カスタムヘッダーを設定できます。これは WebSocket 標準の Bun 固有の拡張です。これはブラウザでは動作しません。

ts
const socket = new WebSocket("ws://localhost:3000", {
  headers: {
    /* カスタムヘッダー */
  }, 
});

ソケットにイベントリスナーを追加するには:

ts
// メッセージを受信
socket.addEventListener("message", event => {});

// ソケットがオープン
socket.addEventListener("open", event => {});

// ソケットがクローズ
socket.addEventListener("close", event => {});

// エラーハンドラー
socket.addEventListener("error", event => {});

リファレンス

ts
namespace Bun {
  export function serve(params: {
    fetch: (req: Request, server: Server) => Response | Promise<Response>;
    websocket?: {
      message: (ws: ServerWebSocket, message: string | ArrayBuffer | Uint8Array) => void;
      open?: (ws: ServerWebSocket) => void;
      close?: (ws: ServerWebSocket, code: number, reason: string) => void;
      error?: (ws: ServerWebSocket, error: Error) => void;
      drain?: (ws: ServerWebSocket) => void;

      maxPayloadLength?: number; // デフォルト:16 * 1024 * 1024 = 16 MB
      idleTimeout?: number; // デフォルト:120(秒)
      backpressureLimit?: number; // デフォルト:1024 * 1024 = 1 MB
      closeOnBackpressureLimit?: boolean; // デフォルト:false
      sendPings?: boolean; // デフォルト:true
      publishToSelf?: boolean; // デフォルト:false

      perMessageDeflate?:
        | boolean
        | {
            compress?: boolean | Compressor;
            decompress?: boolean | Compressor;
          };
    };
  }): Server;
}

type Compressor =
  | `"disable"`
  | `"shared"`
  | `"dedicated"`
  | `"3KB"`
  | `"4KB"`
  | `"8KB"`
  | `"16KB"`
  | `"32KB"`
  | `"64KB"`
  | `"128KB"`
  | `"256KB"`;

interface Server {
  pendingWebSockets: number;
  publish(topic: string, data: string | ArrayBufferView | ArrayBuffer, compress?: boolean): number;
  upgrade(
    req: Request,
    options?: {
      headers?: HeadersInit;
      data?: any;
    },
  ): boolean;
}

interface ServerWebSocket {
  readonly data: any;
  readonly readyState: number;
  readonly remoteAddress: string;
  readonly subscriptions: string[];
  send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
  close(code?: number, reason?: string): void;
  subscribe(topic: string): void;
  unsubscribe(topic: string): void;
  publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
  isSubscribed(topic: string): boolean;
  cork(cb: (ws: ServerWebSocket) => void): void;
}

Bun by www.bunjs.com.cn 編集