k6系列之websocket

源码分析

k6 Module
k6目前有两个ws的模块,基础k6/ws,实验性k6/experimental/websockets

1
2
3
4
5
import expws "github.com/grafana/xk6-websockets/websockets"
"k6/experimental/websockets": &expws.RootModule{},

import "go.k6.io/k6/js/modules/k6/ws"
"k6/ws": ws.New(),
1
2
3
4
5
//"go.k6.io/k6/js/modules/k6/ws"
//看几个重要方法
func (mi *WS) Connect(url string, args ...goja.Value) (*HTTPResponse, error) {}
func (s *Socket) Send(message string) {}
func (s *Socket) SendBinary(message goja.Value) {}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//github.com/grafana/xk6-websockets/websockets
//这里实现则是多了一层channel,并发性能会更优
func (w *webSocket) send(msg goja.Value) {
w.assertStateOpen()

switch o := msg.Export().(type) {
case string:
w.bufferedAmount += len(o)
w.writeQueueCh <- message{
mtype: websocket.TextMessage,
data: []byte(o),
t: time.Now(),
}
case *goja.ArrayBuffer:
b := o.Bytes()
w.bufferedAmount += len(b)
w.writeQueueCh <- message{
mtype: websocket.BinaryMessage,
data: b,
t: time.Now(),
}
case goja.ArrayBuffer:
b := o.Bytes()
w.bufferedAmount += len(b)
w.writeQueueCh <- message{
mtype: websocket.BinaryMessage,
data: b,
t: time.Now(),
}
default:
common.Throw(w.vu.Runtime(), fmt.Errorf("unsupported send type %T", o))
}
}

官方建议

1
2
3
4
5
Note: A module with a better and standard API exists.

The new k6/experimental/websockets API partially implements the WebSockets API living standard.

When possible, we recommend using the new API. It uses a global event loop for consistency with other k6 APIs and better performance.

案例编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import { randomString, randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.1.0/index.js';
import { WebSocket } from 'k6/experimental/websockets';
import { setTimeout, clearTimeout, setInterval, clearInterval } from 'k6/experimental/timers';

const chatRoomName = 'publicRoom'; // choose any chat room name
const sessionDuration = randomIntBetween(5000, 60000); // user session between 5s and 1m

export default function () {
for (let i = 0; i < 4; i++) {
startWSWorker(i);
}
}

function startWSWorker(id) {
// create a new websocket connection
const ws = new WebSocket(`wss://test-api.k6.io/ws/crocochat/${chatRoomName}/`);

ws.addEventListener('open', () => {
// change the user name
ws.send(JSON.stringify({ event: 'SET_NAME', new_name: `Croc ${__VU}:${id}` }));

// listen for messages/errors and log them into console
ws.addEventListener('message', (e) => {
const msg = JSON.parse(e.data);
if (msg.event === 'CHAT_MSG') {
console.log(`VU ${__VU}:${id} received: ${msg.user} says: ${msg.message}`);
} else if (msg.event === 'ERROR') {
console.error(`VU ${__VU}:${id} received:: ${msg.message}`);
} else {
console.log(`VU ${__VU}:${id} received unhandled message: ${msg.message}`);
}
});

// send a message every 2-8 seconds
const intervalId = setInterval(() => {
ws.send(JSON.stringify({ event: 'SAY', message: `I'm saying ${randomString(5)}` }));
}, randomIntBetween(2000, 8000)); // say something every 2-8 seconds

// after a sessionDuration stop sending messages and leave the room
const timeout1id = setTimeout(function () {
clearInterval(intervalId);
console.log(`VU ${__VU}:${id}: ${sessionDuration}ms passed, leaving the chat`);
ws.send(JSON.stringify({ event: 'LEAVE' }));
}, sessionDuration);

// after a sessionDuration + 3s close the connection
const timeout2id = setTimeout(function () {
console.log(`Closing the socket forcefully 3s after graceful LEAVE`);
ws.close();
}, sessionDuration + 3000);

// when connection is closing, clean up the previously created timers
ws.addEventListener('close', () => {
clearTimeout(timeout1id);
clearTimeout(timeout2id);
console.log(`VU ${__VU}:${id}: disconnected`);
});
});
}

k6/experimental/websockets的readyState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//这里要用==1来比较,因为readyState的类型是ReadyState,要用非严格比较
if (!check(ws, {
'is status 200': (r) => r.readyState == 1,
})){
ws.close()
}
//无论是看go源码还是js的源码都很轻松分析出
export enum ReadyState {
/**
* Socket has been created. The connection is not yet open.
*/
Connecting = 0,

/**
* The connection is open and ready to communicate.
*/
Open = 1,

/**
* The connection is in the process of closing.
*/
Closing = 2,

/**
* The connection is closed or couldn't be opened.
*/
Closed = 3,
}

k6/experimental/websockets的bufferedAmount

表示队列中的数据量,应该是可以用来监测 WebSocket 连接的发送情况


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!