Skip to content

Bun.serve() поддерживает серверный WebSocket с компрессией на лету поддержкой TLS и собственным API публикации-подписки Bun.

Примечание

⚡️ В 7 раз больше пропускной способности

WebSocket в Bun быстрые. Для простого чата на Linux x64 Bun может обрабатывать в 7 раз больше запросов в секунду чем Node.js + "ws".

Отправлено сообщений в секундуRuntimeКлиенты
~700,000(Bun.serve) Bun v0.2.1 (x64)16
~100,000(ws) Node v18.10.0 (x64)16

Внутри Bun реализация WebSocket построена на uWebSockets.


Запуск WebSocket-сервера

Ниже представлен простой WebSocket-сервер созданный с помощью Bun.serve в котором все входящие запросы повышаются до WebSocket-соединений в обработчике fetch. Обработчики сокетов объявляются в параметре websocket.

ts
Bun.serve({
  fetch(req, server) {
    // повышение запроса до WebSocket
    if (server.upgrade(req)) {
      return; // не возвращать Response
    }
    return new Response("Повышение не удалось", { status: 500 });
  },
  websocket: {}, // обработчики
});

Поддерживаются следующие обработчики событий WebSocket:

ts
Bun.serve({
  fetch(req, server) {}, // логика повышения
  websocket: {
    message(ws, message) {}, // получено сообщение
    open(ws) {}, // сокет открыт
    close(ws, code, message) {}, // сокет закрыт
    drain(ws) {}, // сокет готов получать больше данных
  },
});

API разработанный для скорости

В Bun обработчики объявляются один раз для сервера а не для каждого сокета.

ServerWebSocket ожидает что вы передадите объект WebSocketHandler в метод Bun.serve() который имеет методы для open, message, close, drain и error. Это отличается от клиентского класса WebSocket который расширяет EventTarget (onmessage, onopen, onclose).

Клиенты обычно не имеют много открытых сокет-соединений поэтому событийный API имеет смысл.

Но серверы имеют много открытых сокет-соединений что означает:

  • Время затрачиваемое на добавление/удаление слушателей событий для каждого соединения накапливается
  • Дополнительная память затрачиваемая на хранение ссылок на функции обратного вызова для каждого соединения
  • Обычно люди создают новые функции для каждого соединения что также означает больше памяти

Поэтому вместо использования событийного API ServerWebSocket ожидает что вы передадите единый объект с методами для каждого события в Bun.serve() и он используется повторно для каждого соединения.

Это приводит к меньшему использованию памяти и меньшему времени затрачиваемому на добавление/удаление слушателей событий.

Первым аргументом для каждого обработчика является экземпляр ServerWebSocket обрабатывающий событие. Класс ServerWebSocket — это быстрая собственная реализация Bun WebSocket с некоторыми дополнительными функциями.

ts
Bun.serve({
  fetch(req, server) {}, // логика повышения
  websocket: {
    message(ws, message) {
      ws.send(message); // эхо-ответ сообщения
    },
  },
});

Отправка сообщений

Каждый экземпляр ServerWebSocket имеет метод .send() для отправки сообщений клиенту. Он поддерживает диапазон типов ввода.

ts
Bun.serve({
  fetch(req, server) {}, // логика повышения
  websocket: {
    message(ws, message) {
      ws.send("Привет мир"); // string
      ws.send(response.arrayBuffer()); // ArrayBuffer
      ws.send(new Uint8Array([1, 2, 3])); // TypedArray | DataView
    },
  },
});

Заголовки

После успешного повышения Bun отправит ответ 101 Switching Protocols согласно спецификации. Дополнительные headers могут быть прикреплены к этому Response в вызове server.upgrade().

ts
Bun.serve({
  fetch(req, server) {
    const sessionId = await generateSessionId();
    server.upgrade(req, {
      headers: { 
        "Set-Cookie": `SessionId=${sessionId}`, 
      }, 
    });
  },
  websocket: {}, // обработчики
});

Контекстные данные

Контекстные data могут быть прикреплены к новому WebSocket в вызове .upgrade(). Эти данные доступны в свойстве ws.data внутри обработчиков WebSocket.

Для строгой типизации ws.data добавьте свойство data в объект обработчика websocket. Это типизирует ws.data во всех хуках жизненного цикла.

ts
type WebSocketData = {
  createdAt: number;
  channelId: string;
  authToken: string;
};

Bun.serve({
  fetch(req, server) {
    const cookies = new Bun.CookieMap(req.headers.get("cookie")!);

    server.upgrade(req, {
      // этот объект должен соответствовать WebSocketData
      data: {
        createdAt: Date.now(),
        channelId: new URL(req.url).searchParams.get("channelId"),
        authToken: cookies.get("X-Token"),
      },
    });

    return undefined;
  },
  websocket: {
    // TypeScript: укажите тип ws.data вот так
    data: {} as WebSocketData,
    // обработчик вызывается при получении сообщения
    async message(ws, message) {
      // ws.data теперь правильно типизирован как WebSocketData
      const user = getUserFromToken(ws.data.authToken);

      await saveMessageToDatabase({
        channel: ws.data.channelId,
        message: String(message),
        userId: user.id,
      });
    },
  },
});

Примечание

Примечание: Ранее вы могли указать тип ws.data используя параметр типа на Bun.serve как Bun.serve<MyData>({...}). Этот шаблон был удалён из-за ограничения в TypeScript в пользу свойства data показанного выше.

Для подключения к этому серверу из браузера создайте новый WebSocket.

js
const socket = new WebSocket("ws://localhost:3000/chat");

socket.addEventListener("message", event => {
  console.log(event.data);
});

Примечание

Идентификация пользователей

Cookies которые в настоящее время установлены на странице будут отправлены с запросом повышения WebSocket и доступны в req.headers в обработчике fetch. Разберите эти cookies для определения идентичности подключающегося пользователя и установите значение data соответствующим образом.

Pub/Sub

Реализация ServerWebSocket в Bun реализует собственный API публикации-подписки для широковещательной рассылки на основе тем. Отдельные сокеты могут .subscribe() на тему (указанную строковым идентификатором) и .publish() сообщения всем другим подписчикам этой темы (исключая себя). Этот API широковещательной рассылки на основе тем похож на MQTT и Redis Pub/Sub.

ts
const server = Bun.serve({
  fetch(req, server) {
    const url = new URL(req.url);
    if (url.pathname === "/chat") {
      console.log(`повышение!`);
      const username = getUsernameFromReq(req);
      const success = server.upgrade(req, { data: { username } });
      return success ? undefined : new Response("Ошибка повышения WebSocket", { status: 400 });
    }

    return new Response("Привет мир");
  },
  websocket: {
    // TypeScript: укажите тип ws.data вот так
    data: {} as { username: string },
    open(ws) {
      const msg = `${ws.data.username} вошёл в чат`;
      ws.subscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
    message(ws, message) {
      // это групповой чат
      // поэтому сервер ретранслирует входящие сообщения всем
      server.publish("the-group-chat", `${ws.data.username}: ${message}`);

      // проверка текущих подписок
      console.log(ws.subscriptions); // ["the-group-chat"]
    },
    close(ws) {
      const msg = `${ws.data.username} покинул чат`;
      ws.unsubscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
  },
});

console.log(`Прослушивание на ${server.hostname}:${server.port}`);

Вызов .publish(data) отправит сообщение всем подписчикам темы кроме сокета который вызвал .publish(). Для отправки сообщения всем подписчикам темы используйте метод .publish() на экземпляре Server.

ts
const server = Bun.serve({
  websocket: {
    // ...
  },
});

// прослушивание некоторого внешнего события
server.publish("the-group-chat", "Привет мир");

Компрессия

Компрессия для каждого сообщения может быть включена с параметром perMessageDeflate.

ts
Bun.serve({
  websocket: {
    perMessageDeflate: true, 
  },
});

Компрессия может быть включена для отдельных сообщений передав boolean вторым аргументом в .send().

ts
ws.send("Привет мир", true);

Для детального контроля над характеристиками компрессии обратитесь к Справочнику.

Обратное давление

Метод .send(message) объекта ServerWebSocket возвращает number указывающий результат операции.

  • -1 — Сообщение было поставлено в очередь но есть обратное давление
  • 0 — Сообщение было отброшено из-за проблемы соединения
  • 1+ — Количество отправленных байтов

Это даёт вам лучший контроль над обратным давлением в вашем сервере.

Таймауты и лимиты

По умолчанию Bun закроет WebSocket-соединение если оно бездействует в течение 120 секунд. Это можно настроить с параметром idleTimeout.

ts
Bun.serve({
  fetch(req, server) {}, // логика повышения
  websocket: {
    idleTimeout: 60, // 60 секунд
  },
});

Bun также закроет WebSocket-соединение если получит сообщение больше 16 МБ. Это можно настроить с параметром maxPayloadLength.

ts
Bun.serve({
  fetch(req, server) {}, // логика повышения
  websocket: {
    maxPayloadLength: 1024 * 1024, // 1 МБ
  },
});

Подключение к серверу WebSocket

Bun реализует класс WebSocket. Для создания WebSocket-клиента который подключается к серверу ws:// или wss:// создайте экземпляр WebSocket как вы бы сделали в браузере.

ts
const socket = new WebSocket("ws://localhost:3000");

// С согласованием подпротокола
const socket2 = new WebSocket("ws://localhost:3000", ["soap", "wamp"]);

В браузерах cookies которые в настоящее время установлены на странице будут отправлены с запросом повышения WebSocket. Это стандартная функция API WebSocket.

Для удобства Bun позволяет устанавливать пользовательские заголовки непосредственно в конструкторе. Это специфичное для Bun расширение стандарта WebSocket. Это не будет работать в браузерах.

ts
const socket = new WebSocket("ws://localhost:3000", {
  headers: {
    /* пользовательские заголовки */
  }, 
});

Для добавления слушателей событий к сокету:

ts
// получено сообщение
socket.addEventListener("message", event => {});

// сокет открыт
socket.addEventListener("open", event => {});

// сокет закрыт
socket.addEventListener("close", event => {});

// обработчик ошибок
socket.addEventListener("error", event => {});

Справочник

ts
namespace Bun {
  export function serve(params: {
    fetch: (req: Request, server: Server) => Response | Promise<Response>;
    websocket?: {
      message: (ws: ServerWebSocket, message: string | ArrayBuffer | Uint8Array) => void;
      open?: (ws: ServerWebSocket) => void;
      close?: (ws: ServerWebSocket, code: number, reason: string) => void;
      error?: (ws: ServerWebSocket, error: Error) => void;
      drain?: (ws: ServerWebSocket) => void;

      maxPayloadLength?: number; // по умолчанию: 16 * 1024 * 1024 = 16 МБ
      idleTimeout?: number; // по умолчанию: 120 (секунд)
      backpressureLimit?: number; // по умолчанию: 1024 * 1024 = 1 МБ
      closeOnBackpressureLimit?: boolean; // по умолчанию: false
      sendPings?: boolean; // по умолчанию: true
      publishToSelf?: boolean; // по умолчанию: false

      perMessageDeflate?:
        | boolean
        | {
            compress?: boolean | Compressor;
            decompress?: boolean | Compressor;
          };
    };
  }): Server;
}

type Compressor =
  | `"disable"`
  | `"shared"`
  | `"dedicated"`
  | `"3KB"`
  | `"4KB"`
  | `"8KB"`
  | `"16KB"`
  | `"32KB"`
  | `"64KB"`
  | `"128KB"`
  | `"256KB"`;

interface Server {
  pendingWebSockets: number;
  publish(topic: string, data: string | ArrayBufferView | ArrayBuffer, compress?: boolean): number;
  upgrade(
    req: Request,
    options?: {
      headers?: HeadersInit;
      data?: any;
    },
  ): boolean;
}

interface ServerWebSocket {
  readonly data: any;
  readonly readyState: number;
  readonly remoteAddress: string;
  readonly subscriptions: string[];
  send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
  close(code?: number, reason?: string): void;
  subscribe(topic: string): void;
  unsubscribe(topic: string): void;
  publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
  isSubscribed(topic: string): boolean;
  cork(cb: (ws: ServerWebSocket) => void): void;
}

Bun от www.bunjs.com.cn