前言
FastApi就不用说了,懂得都懂,不懂的请先懂了再来;
实现步骤
WebSocket使得客户端和服务器之间的数据交换变得更加简单,简单点讲就是后端可以实时推送消息给正在使用的用户,多的不说,直接进入正题。
安装websocket
创建工具类
websocketutils.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| from fastapi import WebSocket from typing import List
""" 创建工具管理类 处理服务端和客户端的交互 """ class WebsocketManager: def __init__(self): self.active_clients: List[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_clients.append(websocket) def disconnect(self, websocket: WebSocket): self.active_clients.remove(websocket) async def send_message_to_client(self, message: str, websocket: WebSocket): await websocket.send_text(message) async def broadcast(self, message: str): for connection in self.active_clients: await connection.send_text(message)
|
创建服务端连接
这里是直接把代码写到router里的,可根据需要自行修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| from fastapi import APIRouter,WebSocket,WebSocketDisconnect from websocketutils import WebsocketManager
router = APIRouter() manager = WebsocketManager()
@router.websocket("/ws/{client_id}") async def websocket_serve(client_id: str, websocket: WebSocket): await manager.connect(websocket) await manager.broadcast(f"{client_id} 进入了聊天室") try: while True: data = await websocket.receive_text() await manager.broadcast(f"{client_id} 发送消息:{data}") await manager.send_message_to_client(f"服务端回复{client_id}:你发送的信息是:{data}", websocket) except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast(f"{client_id} 离开了聊天室")
|
前端页面
使用vue编写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| <template> <div id="app"> <div v-if="!websocket"> 用户:<input type="text" v-model="user"/> <button @click="login">登录</button> </div> <div v-else> {{user}}:<input type="text" v-model="message" autocomplete="off"/><button @click="sendMessage">Send</button> <ul> <li v-for="msg in messages" :key="msg">{{msg}}</li> </ul> </div> </div> </template>
<script>
export default { name: 'App', data(){ return { // 所有消息 messages: [], // 单个消息 message: "", // websocket对象 websocket: null, // 用户 user: null, // 连接标识 lockReconnect: false, // 超时记录 timeout: 5000, timeoutObj: null, } }, methods:{ /** * 登录 */ login(){ if(this.user){ this.addChat(this.user); } }, /** * 添加聊天室 */ addChat(user){ // 创建一个 WebSocket 连接 this.websocket = new WebSocket(`ws://localhost:8000/ws/${user}`); // 赋值this 给that 后面内部方法读取不到正确的this let that = this;
// 添加事件处理程序 this.websocket.onopen = () => { console.log('WebSocket 已打开'); // 每隔一段时间发送心跳消息 保持与服务器之间的连接 setInterval(() => { if (this.websocket && this.websocket.readyState === WebSocket.OPEN) { that.sendHeartbeatMessage(); } else { clearInterval(); } }, that.timeout); // 这里设置为每隔5秒发送一次心跳消息 };
// 收到消息 this.websocket.onmessage = (event) =>{ if(event.data.indexOf("heartbeat")<0){ that.messages.push(event.data); } //收到服务器信息,心跳重置 that.reset(); }; this.websocket.onerror = () =>{ console.log("连接失败"); //重连 that.reconnect(); }; // 后端如果重启或者其他原因导致之前的连接已经断开 需要重新连接 this.websocket.onclose = () =>{ console.log("连接关闭"); //重连 that.reconnect(); } }, /** * 向服务器发送心跳消息 */ sendHeartbeatMessage() { this.websocket.send('heartbeat'); }, /** * 给服务端发送消息 */ sendMessage(){ if(this.message){ this.websocket.send(this.message); this.message=""; } }, /** * 重置 */ reset() { console.log("连接成功,重置"); var that = this; //清除时间 if (that.timeoutObj) clearInterval(that.timeoutObj); }, /** * 重新连接 */ reconnect(){ var that = this; clearInterval(that.timeoutObj); that.timeoutObj = setInterval(() => { console.log("尝试重新连接"); // that.websocket.close(); that.addChat(that.user); if (this.websocket && this.websocket.readyState === WebSocket.OPEN) { that.sendHeartbeatMessage(); } }, that.timeout); // 这里设置为每隔5秒发送一次心跳消息 }, } } </script>
|
进入页面,输入用户名即可进入聊天室,后端会广播所有用户 某某某进入聊天室;心跳机制是为了保持客户端与服务端的连接,如果长时间不发送消息会导致连接断开,所以增加心跳机制,这部分可根据个人需要自行完善。