前言

FastApi就不用说了,懂得都懂,不懂的请先懂了再来;

实现步骤

WebSocket使得客户端和服务器之间的数据交换变得更加简单,简单点讲就是后端可以实时推送消息给正在使用的用户,多的不说,直接进入正题。

安装websocket

1
pip install websockets

创建工具类

websocketutils.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from fastapi import WebSocket
from typing import List

"""
创建工具管理类 处理服务端和客户端的交互
"""
class WebsocketManager:
def __init__(self):
# 初始化参数 记录活跃的客户端
self.active_clients: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
# 创建客户与服务端之间的连接 并记录下客户端
await websocket.accept()
self.active_clients.append(websocket)

def disconnect(self, websocket: WebSocket):
# 断开某个客户端的连接
self.active_clients.remove(websocket)

async def send_message_to_client(self, message: str, websocket: WebSocket):
# 给客户端发送消息
await websocket.send_text(message)

async def broadcast(self, message: str):
# 给所有客户端发送消息 广播
for connection in self.active_clients:
await connection.send_text(message)

创建服务端连接

这里是直接把代码写到router里的,可根据需要自行修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from fastapi import APIRouter,WebSocket,WebSocketDisconnect
from websocketutils import WebsocketManager

router = APIRouter()
manager = WebsocketManager()


@router.websocket("/ws/{client_id}")
async def websocket_serve(client_id: str, websocket: WebSocket):
# 1、客户端、服务端建立 ws 连接
await manager.connect(websocket)
# 2、广播某个客户端进入聊天室
await manager.broadcast(f"{client_id} 进入了聊天室")
try:
while True:
# 3、服务端接收客户端发送的内容
data = await websocket.receive_text()
# 4、广播某个客户端发送的消息
await manager.broadcast(f"{client_id} 发送消息:{data}")
# 5、服务端回复客户端
await manager.send_message_to_client(f"服务端回复{client_id}:你发送的信息是:{data}", websocket)
except WebSocketDisconnect:
# 6、若有客户端断开连接,广播某个客户端离开了
manager.disconnect(websocket)
await manager.broadcast(f"{client_id} 离开了聊天室")

前端页面

使用vue编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
<template>
<div id="app">
<div v-if="!websocket">
用户:<input type="text" v-model="user"/> <button @click="login">登录</button>
</div>
<div v-else>
{{user}}:<input type="text" v-model="message" autocomplete="off"/><button @click="sendMessage">Send</button>
<ul>
<li v-for="msg in messages" :key="msg">{{msg}}</li>
</ul>
</div>
</div>
</template>

<script>

export default {
name: 'App',
data(){
return {
// 所有消息
messages: [],
// 单个消息
message: "",
// websocket对象
websocket: null,
// 用户
user: null,
// 连接标识
lockReconnect: false,
// 超时记录
timeout: 5000,
timeoutObj: null,
}
},
methods:{
/**
* 登录
*/
login(){
if(this.user){
this.addChat(this.user);
}
},
/**
* 添加聊天室
*/
addChat(user){
// 创建一个 WebSocket 连接
this.websocket = new WebSocket(`ws://localhost:8000/ws/${user}`);

// 赋值this 给that 后面内部方法读取不到正确的this
let that = this;

// 添加事件处理程序
this.websocket.onopen = () => {
console.log('WebSocket 已打开');
// 每隔一段时间发送心跳消息 保持与服务器之间的连接
setInterval(() => {
if (this.websocket && this.websocket.readyState === WebSocket.OPEN) {
that.sendHeartbeatMessage();
} else {
clearInterval();
}
}, that.timeout); // 这里设置为每隔5秒发送一次心跳消息
};

// 收到消息
this.websocket.onmessage = (event) =>{
if(event.data.indexOf("heartbeat")<0){
that.messages.push(event.data);
}
//收到服务器信息,心跳重置
that.reset();
};
this.websocket.onerror = () =>{
console.log("连接失败");
//重连
that.reconnect();
};
// 后端如果重启或者其他原因导致之前的连接已经断开 需要重新连接
this.websocket.onclose = () =>{
console.log("连接关闭");
//重连
that.reconnect();
}
},
/**
* 向服务器发送心跳消息
*/
sendHeartbeatMessage() {
this.websocket.send('heartbeat');
},
/**
* 给服务端发送消息
*/
sendMessage(){
if(this.message){
this.websocket.send(this.message);
this.message="";
}
},
/**
* 重置
*/
reset() {
console.log("连接成功,重置");
var that = this;
//清除时间
if (that.timeoutObj) clearInterval(that.timeoutObj);
},
/**
* 重新连接
*/
reconnect(){
var that = this;
clearInterval(that.timeoutObj);
that.timeoutObj = setInterval(() => {
console.log("尝试重新连接");
// that.websocket.close();
that.addChat(that.user);
if (this.websocket && this.websocket.readyState === WebSocket.OPEN) {
that.sendHeartbeatMessage();
}
}, that.timeout); // 这里设置为每隔5秒发送一次心跳消息
},
}
}
</script>

进入页面,输入用户名即可进入聊天室,后端会广播所有用户 某某某进入聊天室;心跳机制是为了保持客户端与服务端的连接,如果长时间不发送消息会导致连接断开,所以增加心跳机制,这部分可根据个人需要自行完善。