0%

封装一个Websocket Hooks

背景

由于需求是一个web terminal,前端使用xterm.js,后端使用go-tty的接口部分,用到了Websocket通信,但发现市面上大大多数的库都是基于发布订阅的,需要后端也一起支持(不太可能),所以自己封装了个Hooks。

Hooks功能梳理

  • 需实现连接、断开、发送、接收、错误接收等基本方法。
  • 加入心跳机制,保持通讯通畅。
  • 非人为断开,需快速重连。
  • 超时重试机制。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import { useState, useCallback, useRef } from 'react';

export enum SOCKET_STATE {
CONNECTING = 0,
OPEN = 1,
CLOSING = 2,
CLOSED = 3,
}

export interface Connection {
url: string;
timeoutTime?: number;
serverTimeoutTime?: number;
}

interface OnReceive {
types: string;
data: string;
cmd: string;
}

export function useWebSocket({ url, timeoutTime = 1000 * 30, serverTimeoutTime = 1000 * 60 * 30 }: Connection) {
const [socket, setSocket] = useState<WebSocket | null>(null);
const [isLoading, setLoading] = useState<boolean>(false);
const [isOpen, setOpen] = useState<boolean>(false);
const [alertState, setAlertState] = useState<boolean>(false);
const [lockReconnect, setLockReconnect] = useState<boolean>(false);
const [timeout] = useState<number>(timeoutTime); // 30s client心跳倒计时间
const [serverTimeout] = useState<number>(serverTimeoutTime); // 30s server心跳倒计时间
const timeoutObj = useRef<number>(); // client心跳倒计时
const serverTimeoutObj = useRef<number>(); // server心跳倒计时

const connect = useCallback(() => {
try {
setLoading(true);
setSocket(new WebSocket(url));
// eslint-disable-next-line no-empty
} catch (error) {
setLoading(false);
}
}, [url]);

const initSocket = () => {
if (!socket || socket.readyState === SOCKET_STATE.CLOSED) {
connect();
}
};

const onOpen = (callback: (event: WebSocketEventMap['open']) => void) => {
if (!socket) return;
socket.onopen = (event: WebSocketEventMap['open']) => {
setLoading(false);
setOpen(true);
callback(event);
};
};

const onClose = (callback: (event: WebSocketEventMap['close']) => void) => {
if (!socket) return;
socket.onclose = (event: WebSocketEventMap['close']) => {
if (socket?.readyState === SOCKET_STATE.OPEN) return;
setOpen(false);
setLoading(false);
setAlertState(true);

callback(event);
};
};

const onError = (callback: (error?: WebSocketEventMap['error']) => void) => {
if (!socket) return;
socket.onerror = (error: WebSocketEventMap['error']) => {
setLoading(false);
callback(error);
};
};

// 发送
const sendMessage = (data: string) => {
if (socket?.readyState === SOCKET_STATE.OPEN) {
socket.send(data);
}
};

// 接收消息
const onReceive = (callback: (data: OnReceive) => void) => {
if (!socket) return;
socket.onmessage = (e: WebSocketEventMap['message']) => {
try {
const data = JSON.parse(e.data);
callback(data);
// eslint-disable-next-line no-empty
} catch (error) {}
};
};

// 关闭 WebSocket
const closeWebSocket = (code?: number | undefined, reason?: string | undefined) => {
if (!socket) return;
clearHeartCheck();
socket.close(code, reason);
};

// 重连
const reconnect = useCallback(() => {
if (lockReconnect) return;
setLockReconnect(true);
// 没连接上会一直重连,设置延迟避免请求过多
setAlertState(false);
setLoading(true);
setTimeout(() => {
// 新连接
connect();
setLockReconnect(false);
}, 1000);
}, [connect, lockReconnect]);

// 清除心跳
const clearHeartCheck = () => {
if (timeoutObj) {
clearTimeout(timeoutObj.current);
}
if (serverTimeoutObj) {
clearTimeout(serverTimeoutObj.current);
}
};

// 心跳
const heartCheck = () => {
clearHeartCheck();
timeoutObj.current = window.setTimeout(() => {
// 这里发送一个心跳,后端收到后,返回一个心跳消息,
if (socket?.readyState === WebSocket.OPEN) {
// 如果连接正常
sendMessage('ping');
} else if (socket?.readyState === WebSocket.CLOSED) {
reconnect(); // 否则重连
}

serverTimeoutObj.current = window.setTimeout(() => {
// 超时关闭
closeWebSocket();
}, serverTimeout);
}, timeout);
};

useEffect(() => {
initSocket();
return () => {
closeWebSocket();
};
},[]);

return {
socket,
isLoading,
isOpen,
alertState,
setAlertState,
onOpen,
onClose,
onError,
sendMessage,
onReceive,
clearHeartCheck,
reconnect,
heartCheck,
closeWebSocket,
};
}

使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import { useWebSocket } from 'utils/hooks/useWebSocket';

const {
socket,
isLoading,
alertState,
setAlertState,
onOpen,
onClose,
onError,
sendMessage,
onReceive,
reconnect,
heartCheck,
closeWebSocket,
} = useWebSocket({ url: 'wss://xxxx' });

onOpen(() => {
sendMessage('hello world');
});

onReceive(payload => {
const { types, data } = payload;
console.log(types, data)
})

onClose(() => {});

onError(() => {});

closeWebSocket(1000);

总结

  • 必须加入心跳机制,否则可能会莫名奇妙的断开,并且无论发送正常消息与否,心跳需一直存在。
  • 断开后在一定时间内加入重试机制,否则断开,记录重连状态,避免重复发送。
  • 对于异常处理,最好能有一种消息类型定义方便提示信息,否则会很麻烦,原生的Error十分不靠谱。