客户端创建websocket
const [messages, setMessages] = useState([]); const [inputMessage, setInputMessage] = useState(""); const [isConnected, setIsConnected] = useState(false); const ws = useRef(null); useEffect(() => { const connectWebSocket = () => { if (ws.current && ws.current.readyState === WebSocket.OPEN) { console.log("WebSocket already connected."); return; } const newWs = new WebSocket(socketUrl);//new 一个websockt ws.current = newWs;//赋值给ws newWs.onopen = () => {//监听开着呢 setIsConnected(true); setMessages((prevMessages) => [ ...prevMessages, { type: "status", text: "Connected to server." }, ]); console.log("WebSocket connection opened."); newWs.send("Hello from React client (Vite)!"); }; newWs.onmessage = (event) => {//监听消息 const receivedText = event.data; console.log("Received message from server:", receivedText); let messageType = "server"; if (receivedText.startsWith("Server received your message:")) { messageType = "server-ack"; } else if (receivedText.startsWith("Broadcast from")) { messageType = "broadcast"; } setMessages((prevMessages) => [ ...prevMessages, { type: messageType, text: receivedText }, ]); }; newWs.onclose = () => {//监听关闭 setIsConnected(false); setMessages((prevMessages) => [ ...prevMessages, { type: "status", text: "Disconnected from server." }, ]); console.log("WebSocket connection closed."); }; newWs.onerror = (error) => {//监听错误 setIsConnected(false); setMessages((prevMessages) => [ ...prevMessages, { type: "error", text: `WebSocket error: ${error.message || "Unknown error"}`, }, ]); console.error("WebSocket error:", error); }; }; connectWebSocket(); return () => { if (ws.current) {//销毁时关闭连接 console.log("Cleaning up WebSocket connection..."); ws.current.close(); } }; }, []);
服务端
// server.js const WebSocket = require("ws");//使用ws // 创建一个 WebSocket 服务器实例,监听 8080 端口 const wss = new WebSocket.Server({ port: 8080 });new一个socket服务 // 存储所有连接的客户端 const connectedClients = new Set(); console.log("WebSocket server started on ws://localhost:8080"); // 当有新的 WebSocket 连接建立时触发 wss.on("connection", function connection(ws, req) { connectedClients.add(ws); // 将新连接添加到集合中 console.log( `Client connected: ${req.socket.remoteAddress}. Total clients: ${connectedClients.size}` ); // 定义一个定时器,用于向当前连接的客户端定时发送消息 let timer; if (!ws.isAlive) { // 避免重复启动定时器 ws.isAlive = true; // 添加一个自定义属性来标记连接是否活跃 timer = setInterval(() => { if (ws.readyState === WebSocket.OPEN) { const message = `Server time: ${new Date().toLocaleTimeString()} (from ${ req.socket.remoteAddress })`; ws.send(message); console.log(`Sent to ${req.socket.remoteAddress}: ${message}`); } else { // 如果连接状态不是 OPEN,清除定时器 clearInterval(timer); console.log( `Connection to ${req.socket.remoteAddress} not open, clearing timer.` ); } }, 3000); // 每 3 秒发送一次 ws.on("pong", () => {//心跳检测 ws.isAlive = true; // 收到 pong 帧表示连接活跃 console.log(`Received pong from ${req.socket.remoteAddress}`); }); } // 监听客户端发送的消息 ws.on("message", function incoming(message) { const decodedMessage = message.toString(); // 将 Buffer 转换为字符串 console.log( `Received from client ${req.socket.remoteAddress}: ${decodedMessage}` ); // 将收到的消息广播给所有其他连接的客户端 connectedClients.forEach((client) => { if (client !== ws && client.readyState === WebSocket.OPEN) { client.send( `Broadcast from ${req.socket.remoteAddress}: ${decodedMessage}` ); } else if (client === ws) { // 也给自己发送一个确认消息 client.send(`Server received your message: ${decodedMessage}`); } }); }); // 监听连接关闭事件 ws.on("close", function close() { connectedClients.delete(ws); // 从集合中移除断开的连接 clearInterval(timer); // 清除与该客户端相关的定时器 console.log( `Client disconnected: ${req.socket.remoteAddress}. Total clients: ${connectedClients.size}` ); }); // 监听错误事件 ws.on("error", function error(err) { console.error(`WebSocket error for ${req.socket.remoteAddress}:`, err); // 通常,错误也会导致 close 事件,但这里可以捕获特定的错误 }); }); // 定期检查所有客户端的活跃状态,并发送 ping setInterval(() => { wss.clients.forEach((ws) => { if (!ws.isAlive) { console.log( `Client ${ws._socket.remoteAddress} did not respond to ping, terminating.` ); return ws.terminate(); // 如果客户端没有响应 ping,则终止连接 } ws.isAlive = false; // 重置 isAlive 状态,等待 pong 响应 ws.ping(); console.log(`Sent ping to ${ws._socket.remoteAddress}`); }); }, 30000); // 每 30 秒发送一次 ping
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。与传统的 HTTP 请求-响应模式不同,WebSocket 允许服务器主动向客户端推送数据,从而实现实时的双向通信。
握手 (Handshake) :
客户端通过发送一个特殊的 HTTP 请求(带有 Upgrade: websocket
和 Connection: Upgrade
等头部)到服务器,请求升级协议。服务器如果支持 WebSocket,则会返回一个特殊的 HTTP 响应(状态码 101 Switching Protocols
),表示协议升级成功。这个过程称为 WebSocket 握手,一旦握手成功,底层的 TCP 连接将从 HTTP 协议切换到 WebSocket 协议。
数据帧 (Data Framing) :
握手成功后,客户端和服务器之间的通信不再使用 HTTP 格式,而是使用 WebSocket 协议定义的数据帧格式。数据帧包含 Opcode(操作码,指示数据类型,如文本、二进制、关闭连接等)、Payload Length(负载长度)、Masking Key(掩码键,客户端发送给服务器的数据需要用掩码加密,服务器发送给客户端的数据不需要)和 Payload Data(实际数据)。这种帧结构使得 WebSocket 可以在一个持久连接上高效地发送和接收各种类型的数据。
全双工通信 (Full-Duplex Communication) :
一旦连接建立,客户端和服务器可以独立地发送和接收数据,互不影响。客户端可以随时向服务器发送消息,服务器也可以随时向客户端发送消息,无需等待对方的响应。
持久连接 (Persistent Connection) :
与 HTTP 的短连接(每次请求后关闭连接)不同,WebSocket 连接在握手成功后会一直保持开放,直到客户端或服务器主动关闭连接或发生网络中断。这消除了 HTTP 频繁建立和关闭连接的开销,降低了延迟,提高了效率,特别适合实时应用,如聊天室、在线游戏、股票行情推送等。
心跳机制 (Ping/Pong Frames) :
为了保持连接活跃并检测连接是否中断,WebSocket 协议支持 Ping/Pong 帧。一方可以发送 Ping 帧,另一方收到后必须回复 Pong 帧,以此来检测连接是否仍然存活。
客户端
// src/App.jsx (或 src/App.tsx) import React, { useState, useEffect, useRef } from "react"; import "./App.css"; // Vite 默认会创建一个 App.css function SSEClient() { const [messages, setMessages] = useState([]); // 存储接收到的事件消息 const [isConnected, setIsConnected] = useState(false); // 连接状态 const eventSourceRef = useRef(null); // 使用 useRef 来保存 EventSource 实例 const sseUrl = "http://localhost:8081/events"; // 你的 SSE 服务器地址 useEffect(() => { const connectSSE = () => { if (eventSourceRef.current) { console.log( "EventSource instance already exists. Closing old one before reconnecting." ); eventSourceRef.current.close(); // 确保关闭旧连接 } const newEventSource = new EventSource(sseUrl); eventSourceRef.current = newEventSource; // 保存 EventSource 实例到 ref newEventSource.onopen = (event) => { setIsConnected(true); setMessages((prevMessages) => [ ...prevMessages, { type: "status", text: "Connected to SSE server." }, ]); console.log("SSE connection opened:", event); }; // 监听默认事件 ('message') newEventSource.onmessage = (event) => { const receivedText = event.data; console.log("Received default message from SSE:", receivedText); setMessages((prevMessages) => [ ...prevMessages, { type: "default-event", text: receivedText }, ]); }; // 监听自定义事件 ('customTime') newEventSource.addEventListener("customTime", (event) => { const receivedText = event.data; const eventId = event.lastEventId; // 获取事件ID console.log( `Received customTime event (ID: ${eventId}) from SSE:`, receivedText ); setMessages((prevMessages) => [ ...prevMessages, { type: "custom-event", text: `[ID:${eventId}] ${receivedText}` }, ]); }); newEventSource.onerror = (error) => { setIsConnected(false); setMessages((prevMessages) => [ ...prevMessages, { type: "error", text: `SSE error: ${error.message || "Unknown error"}`, }, ]); console.error("SSE error:", error); // EventSource 会自动重连,但你也可以在这里添加自定义重连逻辑或错误处理 }; // EventSource 没有 onclose 事件,当连接断开时,它会自动尝试重连 // onerror 会在重连失败时触发,onopen 会在重连成功时触发 }; connectSSE(); // 组件挂载时立即连接 // --- 清理函数 --- // 组件卸载时关闭 EventSource 连接 return () => { if (eventSourceRef.current) { console.log("Cleaning up SSE connection..."); eventSourceRef.current.close(); } }; }, []); // 空数组表示只在组件挂载和卸载时运行 // SSE 是单向的,客户端不能主动发送消息给服务器。 // 所以移除了输入框和发送按钮。 const handleReconnect = () => { // 强制重连 setMessages((prevMessages) => [ ...prevMessages, { type: "status", text: "Attempting to reconnect SSE..." }, ]); connectSSE(); }; console.log("SSEClient rendered", messages); return ({" "} {/* 沿用之前的CSS类名,或者你也可以改为 sse-container */}React SSE ClientStatus: {isConnected ? "Connected" : "Disconnected"}{messages.map((msg, index) => ({msg.type === "status" ? "Status: " : msg.type === "default-event" ? "Default Event: " : msg.type === "custom-event" ? "Custom Event: " : "Error: "} {msg.text}))}{/* SSE 客户端通常没有消息输入框和发送按钮 */}Reconnect SSE); } // Next.js App Router 导出方式 export default SSEClient;
服务端
// server-sse.js const http = require("http"); const PORT = 8081; // SSE 服务器使用不同端口以避免冲突 // 存储所有连接的客户端响应对象,以便向它们推送事件 const clients = []; // 定期推送时间事件的函数 function sendTimeUpdates() { let count = 0; setInterval(() => { const currentTime = new Date().toLocaleTimeString(); const eventData = `data: ${currentTime}\n\n`; // 默认事件类型 const customEventData = `event: customTime\ndata: ${currentTime} - Custom Event ${count++}\nid: ${Date.now()}\n\n`; // 自定义事件类型 clients.forEach((res) => { if (res.writableEnded) { // 检查连接是否已关闭 // 如果连接已关闭,从客户端列表中移除 const index = clients.indexOf(res); if (index > -1) { clients.splice(index, 1); console.log( "Removed closed SSE client. Total clients:", clients.length ); } return; } // 发送默认事件 res.write(eventData); // 发送自定义事件 res.write(customEventData); console.log( `Pushed data to client. Default: ${currentTime}, Custom: ${currentTime} - Custom Event ${ count - 1 }` ); }); if (clients.length === 0) { console.log("No SSE clients connected."); } }, 3000); // 每 3 秒推送一次 } const server = http.createServer((req, res) => { if (req.url === "/events") { // 设置 SSE 必需的响应头 res.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", Connection: "keep-alive", "Access-Control-Allow-Origin": "*", // 允许跨域请求 "Access-Control-Allow-Headers": "Last-Event-ID", // 允许客户端发送 Last-Event-ID }); // 可选:发送一个初始注释,一些代理会缓存第一个字节 res.write(": Connected\n\n"); // 注释行,以冒号开头 // 发送重连间隔(可选,但推荐) res.write("retry: 5000\n\n"); // 5秒后重连 // 将响应对象添加到客户端列表中 clients.push(res); console.log(`New SSE client connected. Total clients: ${clients.length}`); // 当客户端关闭连接时,将其从列表中移除 req.on("close", () => { const index = clients.indexOf(res); if (index > -1) { clients.splice(index, 1); } console.log(`SSE client disconnected. Total clients: ${clients.length}`); }); } else { res.writeHead(404, { "Content-Type": "text/plain" }); res.end("Not Found"); } }); server.listen(PORT, () => { console.log(`SSE Server listening on http://localhost:${PORT}`); sendTimeUpdates(); // 启动时间推送 });
原理解释:
SSE 是一种单向通信技术,允许服务器持续地向客户端推送数据。它基于 HTTP 协议,通过一个持久的 HTTP 连接发送事件流。SSE 通常用于服务器主动更新客户端数据的场景,例如实时股票报价、新闻推送、进度更新等,而不需要客户端向服务器发送数据(如果需要双向通信,WebSocket 仍然是更好的选择)。
基于 HTTP: SSE 建立在 HTTP 协议之上,通常使用 GET 请求。
长连接: 客户端向服务器发起一个普通的 HTTP GET 请求,但服务器不会立即关闭连接,而是保持连接打开。
Content-Type: 服务器响应的 Content-Type
必须是 text/event-stream
。
事件流格式: 服务器发送的数据必须遵循特定的事件流格式:
data:
: 包含要发送的数据。可以有多行 data:
字段,它们将合并成一个数据字符串。event:
: 可选,指定事件类型。客户端可以通过 addEventListener()
监听特定事件。id:
: 可选,事件的唯一 ID。客户端在断开重连时可以使用 Last-Event-ID
头告诉服务器从哪个事件开始重发。retry:
: 可选,客户端在连接断开后尝试重连的间隔时间(毫秒)。
自动重连: 浏览器内置的 EventSource
API 会自动处理连接断开后的重连。
简单: 相对于 WebSocket,SSE 的实现和使用更简单,因为它不需要复杂的握手过程和自定义协议帧。
特性SSE (Server-Sent Events)WebSocket通信方向单向 (服务器到客户端)双向 (全双工)协议基于 HTTP,通过长连接推送事件独立于 HTTP 的新协议 (WS)连接简单 HTTP GET 请求,text/event-stream
内容类型复杂的 HTTP 握手升级,然后切换到 WS 协议数据格式文本流 (data:
, event:
, id:
, retry:
)任意数据 (文本、二进制),通过帧封装重连浏览器内置 EventSource
自动处理需要客户端手动实现重连逻辑场景实时数据推送 (新闻、股票、进度更新)实时双向通信 (聊天、在线游戏、协同编辑)复杂性较简单相对复杂 (服务器和客户端都需要处理协议帧)
有话要说...