WebSocket
WebSocket 是一种用于客户端与服务器之间通信的实时协议。
与 HTTP 不同,客户端一次又一次地询问网站信息并等待每次的回复,WebSocket 建立了一条直接的通道,使我们的客户端和服务器可以直接来回发送消息,从而使对话更快、更流畅,而无需每条消息都重新开始。
Vafast 使用 Bun 的内置 WebSocket 支持,提供了高性能的实时通信能力。
基本 WebSocket 实现
要使用 WebSocket,您可以使用 Bun 的内置 WebSocket 服务器:
typescript
import { Server, defineRoutes, createHandler } from 'vafast'
// 定义 HTTP 路由
const routes = defineRoutes([
{
method: 'GET',
path: '/',
handler: createHandler(() => 'Hello Vafast!')
}
])
const server = new Server(routes)
// 创建 WebSocket 服务器
const wsServer = Bun.serve({
port: 3001,
fetch(req, server) {
// 升级 HTTP 请求到 WebSocket
if (server.upgrade(req)) {
return // 返回 undefined 表示升级成功
}
return new Response("Upgrade failed", { status: 500 })
},
websocket: {
// 连接建立时
open(ws) {
console.log("WebSocket 连接已建立")
ws.send(JSON.stringify({ type: 'connected', message: 'Welcome to Vafast WebSocket!' }))
},
// 接收消息时
message(ws, message) {
console.log("收到消息:", message)
// 回显消息
ws.send(JSON.stringify({
type: 'echo',
message: message,
timestamp: Date.now()
}))
},
// 连接关闭时
close(ws, code, reason) {
console.log("WebSocket 连接已关闭:", code, reason)
},
// 发生错误时
error(ws, error) {
console.error("WebSocket 错误:", error)
}
}
})
console.log(`WebSocket 服务器运行在端口 ${wsServer.port}`)
export default { fetch: server.fetch }集成到 Vafast 应用
您可以将 WebSocket 功能集成到现有的 Vafast 应用中:
typescript
import { Server, defineRoutes, createHandler } from 'vafast'
const routes = defineRoutes([
{
method: 'GET',
path: '/',
handler: createHandler(() => `
<!DOCTYPE html>
<html>
<head>
<title>Vafast WebSocket Demo</title>
</head>
<body>
<h1>Vafast WebSocket 演示</h1>
<div id="messages"></div>
<input type="text" id="messageInput" placeholder="输入消息">
<button onclick="sendMessage()">发送</button>
<script>
const ws = new WebSocket('ws://localhost:3001')
const messagesDiv = document.getElementById('messages')
ws.onopen = () => {
console.log('WebSocket 连接已建立')
}
ws.onmessage = (event) => {
const data = JSON.parse(event.data)
const messageDiv = document.createElement('div')
messageDiv.textContent = \`[\${new Date().toLocaleTimeString()}] \${data.message}\`
messagesDiv.appendChild(messageDiv)
}
function sendMessage() {
const input = document.getElementById('messageInput')
const message = input.value
if (message) {
ws.send(message)
input.value = ''
}
}
</script>
</body>
</html>
`)
}
])
const server = new Server(routes)
// 启动 HTTP 服务器
const httpServer = Bun.serve({
port: 3000,
fetch: server.fetch
})
// 启动 WebSocket 服务器
const wsServer = Bun.serve({
port: 3001,
fetch(req, server) {
if (server.upgrade(req)) {
return
}
return new Response("Upgrade failed", { status: 500 })
},
websocket: {
open(ws) {
console.log("WebSocket 连接已建立")
},
message(ws, message) {
console.log("收到消息:", message)
ws.send(JSON.stringify({
type: 'response',
message: \`服务器收到: \${message}\`,
timestamp: Date.now()
}))
},
close(ws) {
console.log("WebSocket 连接已关闭")
}
}
})
console.log(`HTTP 服务器运行在端口 ${httpServer.port}`)
console.log(`WebSocket 服务器运行在端口 ${wsServer.port}`)类型安全的 WebSocket
您可以使用 TypeScript 和 TypeBox 来确保 WebSocket 消息的类型安全:
typescript
import { Type } from '@sinclair/typebox'
// 定义消息类型
const MessageSchema = Type.Object({
type: Type.Union([
Type.Literal('chat'),
Type.Literal('notification'),
Type.Literal('status')
]),
content: Type.String(),
userId: Type.Optional(Type.String()),
timestamp: Type.Optional(Type.Number())
})
type Message = Static<typeof MessageSchema>
// WebSocket 服务器
const wsServer = Bun.serve({
port: 3001,
fetch(req, server) {
if (server.upgrade(req)) {
return
}
return new Response("Upgrade failed", { status: 500 })
},
websocket: {
open(ws) {
console.log("WebSocket 连接已建立")
},
message(ws, message) {
try {
// 解析和验证消息
const parsedMessage = JSON.parse(message as string)
if (Type.Check(MessageSchema, parsedMessage)) {
// 消息类型安全
const validatedMessage: Message = parsedMessage
console.log("收到有效消息:", validatedMessage)
// 处理不同类型的消息
switch (validatedMessage.type) {
case 'chat':
ws.send(JSON.stringify({
type: 'chat_response',
content: `收到聊天消息: ${validatedMessage.content}`,
timestamp: Date.now()
}))
break
case 'notification':
ws.send(JSON.stringify({
type: 'notification_response',
content: '通知已收到',
timestamp: Date.now()
}))
break
case 'status':
ws.send(JSON.stringify({
type: 'status_response',
content: '状态已更新',
timestamp: Date.now()
}))
break
}
} else {
// 发送验证错误
ws.send(JSON.stringify({
type: 'error',
message: '消息格式无效',
timestamp: Date.now()
}))
}
} catch (error) {
ws.send(JSON.stringify({
type: 'error',
message: '消息解析失败',
timestamp: Date.now()
}))
}
},
close(ws) {
console.log("WebSocket 连接已关闭")
}
}
})房间和广播
实现聊天室功能:
typescript
import { Server, defineRoutes, createHandler } from 'vafast'
// 房间管理
class RoomManager {
private rooms = new Map<string, Set<WebSocket>>()
addToRoom(roomId: string, ws: WebSocket) {
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set())
}
this.rooms.get(roomId)!.add(ws)
}
removeFromRoom(roomId: string, ws: WebSocket) {
const room = this.rooms.get(roomId)
if (room) {
room.delete(ws)
if (room.size === 0) {
this.rooms.delete(roomId)
}
}
}
broadcastToRoom(roomId: string, message: any, excludeWs?: WebSocket) {
const room = this.rooms.get(roomId)
if (room) {
const messageStr = JSON.stringify(message)
room.forEach(ws => {
if (ws !== excludeWs && ws.readyState === WebSocket.OPEN) {
ws.send(messageStr)
}
})
}
}
}
const roomManager = new RoomManager()
const wsServer = Bun.serve({
port: 3001,
fetch(req, server) {
if (server.upgrade(req)) {
return
}
return new Response("Upgrade failed", { status: 500 })
},
websocket: {
open(ws) {
console.log("WebSocket 连接已建立")
},
message(ws, message) {
try {
const data = JSON.parse(message as string)
if (data.type === 'join_room') {
const { roomId, userId } = data
roomManager.addToRoom(roomId, ws)
// 通知房间其他用户
roomManager.broadcastToRoom(roomId, {
type: 'user_joined',
userId,
roomId,
timestamp: Date.now()
}, ws)
// 确认加入房间
ws.send(JSON.stringify({
type: 'room_joined',
roomId,
userId,
timestamp: Date.now()
}))
} else if (data.type === 'chat_message') {
const { roomId, userId, content } = data
// 广播消息到房间
roomManager.broadcastToRoom(roomId, {
type: 'chat_message',
userId,
content,
roomId,
timestamp: Date.now()
})
} else if (data.type === 'leave_room') {
const { roomId, userId } = data
roomManager.removeFromRoom(roomId, ws)
// 通知房间其他用户
roomManager.broadcastToRoom(roomId, {
type: 'user_left',
userId,
roomId,
timestamp: Date.now()
})
}
} catch (error) {
ws.send(JSON.stringify({
type: 'error',
message: '消息处理失败',
timestamp: Date.now()
}))
}
},
close(ws) {
console.log("WebSocket 连接已关闭")
// 从所有房间中移除用户
// 这里需要维护 ws 到房间的映射
}
}
})配置选项
Bun 的 WebSocket 服务器支持多种配置选项:
typescript
const wsServer = Bun.serve({
port: 3001,
fetch(req, server) {
if (server.upgrade(req)) {
return
}
return new Response("Upgrade failed", { status: 500 })
},
websocket: {
// 连接建立时
open(ws) {
console.log("WebSocket 连接已建立")
},
// 接收消息时
message(ws, message) {
ws.send(message)
},
// 连接关闭时
close(ws, code, reason) {
console.log("WebSocket 连接已关闭:", code, reason)
},
// 发生错误时
error(ws, error) {
console.error("WebSocket 错误:", error)
}
},
// 其他配置选项
maxPayloadLength: 1024 * 1024, // 1MB
idleTimeout: 30, // 30秒空闲超时
perMessageDeflate: false, // 禁用消息压缩
maxBackpressure: 1024 * 1024 // 1MB 背压限制
})错误处理
实现健壮的错误处理:
typescript
const wsServer = Bun.serve({
port: 3001,
fetch(req, server) {
if (server.upgrade(req)) {
return
}
return new Response("Upgrade failed", { status: 500 })
},
websocket: {
open(ws) {
try {
console.log("WebSocket 连接已建立")
ws.send(JSON.stringify({ type: 'connected' }))
} catch (error) {
console.error("连接建立失败:", error)
ws.close(1011, "Internal error")
}
},
message(ws, message) {
try {
const data = JSON.parse(message as string)
// 处理消息...
ws.send(JSON.stringify({ type: 'ack', data }))
} catch (error) {
console.error("消息处理失败:", error)
ws.send(JSON.stringify({
type: 'error',
message: '消息处理失败',
error: error instanceof Error ? error.message : 'Unknown error'
}))
}
},
close(ws, code, reason) {
console.log("WebSocket 连接已关闭:", code, reason)
// 清理资源
},
error(ws, error) {
console.error("WebSocket 错误:", error)
// 尝试优雅地关闭连接
try {
ws.close(1011, "Internal error")
} catch (closeError) {
console.error("关闭连接失败:", closeError)
}
}
}
})性能优化
1. 消息批处理
typescript
class MessageBatcher {
private batch: any[] = []
private batchTimeout: Timer | null = null
private readonly batchSize = 10
private readonly batchDelay = 100 // 100ms
addMessage(message: any) {
this.batch.push(message)
if (this.batch.length >= this.batchSize) {
this.flush()
} else if (!this.batchTimeout) {
this.batchTimeout = setTimeout(() => this.flush(), this.batchDelay)
}
}
private flush() {
if (this.batchTimeout) {
clearTimeout(this.batchTimeout)
this.batchTimeout = null
}
if (this.batch.length > 0) {
// 发送批量消息
console.log(`发送批量消息: ${this.batch.length} 条`)
this.batch = []
}
}
}
const batcher = new MessageBatcher()2. 连接池管理
typescript
class ConnectionPool {
private connections = new Set<WebSocket>()
private maxConnections = 1000
addConnection(ws: WebSocket): boolean {
if (this.connections.size >= this.maxConnections) {
return false
}
this.connections.add(ws)
return true
}
removeConnection(ws: WebSocket) {
this.connections.delete(ws)
}
broadcast(message: any) {
const messageStr = JSON.stringify(message)
this.connections.forEach(ws => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(messageStr)
}
})
}
getConnectionCount(): number {
return this.connections.size
}
}
const connectionPool = new ConnectionPool()总结
Vafast 的 WebSocket 实现提供了:
- ✅ 基于 Bun 的高性能 WebSocket 支持
- ✅ 类型安全的消息处理
- ✅ 房间和广播功能
- ✅ 完整的错误处理
- ✅ 性能优化选项
- ✅ 灵活的配置选项
