Skip to content

Commit 0928f85

Browse files
authored
feat(browser): websockets improvements and bundle optimizations (#1732)
1 parent 24b39a5 commit 0928f85

File tree

10 files changed

+197
-93
lines changed

10 files changed

+197
-93
lines changed

esbuild.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const options = {
1515
format: 'iife',
1616
platform: 'browser',
1717
globalName: 'mqtt',
18+
sourcemap: false, // this can be enabled while debugging, if we decide to keep this enabled we should also ship the `src` folder to npm
1819
define: {
1920
'global': 'window'
2021
},

package-lock.json

Lines changed: 6 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@
113113
"commist": "^3.2.0",
114114
"concat-stream": "^2.0.0",
115115
"debug": "^4.3.4",
116-
"duplexify": "^4.1.2",
117116
"help-me": "^4.2.0",
118117
"lru-cache": "^10.0.1",
119118
"minimist": "^1.2.8",
@@ -130,7 +129,6 @@
130129
"@esm-bundle/chai": "^4.3.4-fix.0",
131130
"@release-it/conventional-changelog": "^7.0.2",
132131
"@types/chai": "^4.3.10",
133-
"@types/duplexify": "^3.6.4",
134132
"@types/node": "^20.9.0",
135133
"@types/sinon": "^17.0.1",
136134
"@types/tape": "^5.6.4",

src/lib/BufferedDuplex.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { Duplex, Transform } from 'readable-stream'
2+
import { IClientOptions } from './client'
3+
4+
/**
5+
* Utils writev function for browser, ensure to write Buffers to socket (convert strings).
6+
*/
7+
export function writev(
8+
chunks: { chunk: any; encoding: string }[],
9+
cb: (err?: Error) => void,
10+
) {
11+
const buffers = new Array(chunks.length)
12+
for (let i = 0; i < chunks.length; i++) {
13+
if (typeof chunks[i].chunk === 'string') {
14+
buffers[i] = Buffer.from(chunks[i].chunk, 'utf8')
15+
} else {
16+
buffers[i] = chunks[i].chunk
17+
}
18+
}
19+
20+
this._write(Buffer.concat(buffers), 'binary', cb)
21+
}
22+
23+
/**
24+
* How this works:
25+
* - `socket` is the `WebSocket` instance, the connection to our broker.
26+
* - `proxy` is a `Transform`, it ensure data written to the `socket` is a `Buffer`.
27+
* This class buffers the data written to the `proxy` (so then to `socket`) until the `socket` is ready.
28+
* The stream returned from this class, will be passed to the `MqttClient`.
29+
*/
30+
export class BufferedDuplex extends Duplex {
31+
public socket: WebSocket
32+
33+
private proxy: Transform
34+
35+
private isSocketOpen: boolean
36+
37+
private writeQueue: Array<{
38+
chunk: any
39+
encoding: string
40+
cb: (err?: Error) => void
41+
}>
42+
43+
constructor(opts: IClientOptions, proxy: Transform, socket: WebSocket) {
44+
super({
45+
objectMode: true,
46+
})
47+
this.proxy = proxy
48+
this.socket = socket
49+
this.writeQueue = []
50+
51+
if (!opts.objectMode) {
52+
this._writev = writev.bind(this)
53+
}
54+
55+
this.isSocketOpen = false
56+
57+
this.proxy.on('data', (chunk) => {
58+
this.push(chunk)
59+
})
60+
}
61+
62+
_read(size?: number): void {
63+
this.proxy.read(size)
64+
}
65+
66+
_write(chunk: any, encoding: string, cb: (err?: Error) => void) {
67+
if (!this.isSocketOpen) {
68+
// Buffer the data in a queue
69+
this.writeQueue.push({ chunk, encoding, cb })
70+
} else {
71+
this.writeToProxy(chunk, encoding, cb)
72+
}
73+
}
74+
75+
_final(callback: (error?: Error) => void): void {
76+
this.writeQueue = []
77+
this.proxy.end(callback)
78+
}
79+
80+
/** Method to call when socket is ready to stop buffering writes */
81+
socketReady() {
82+
this.emit('connect')
83+
this.isSocketOpen = true
84+
this.processWriteQueue()
85+
}
86+
87+
private writeToProxy(
88+
chunk: any,
89+
encoding: string,
90+
cb: (err?: Error) => void,
91+
) {
92+
if (this.proxy.write(chunk, encoding) === false) {
93+
this.proxy.once('drain', cb)
94+
} else {
95+
cb()
96+
}
97+
}
98+
99+
private processWriteQueue() {
100+
while (this.writeQueue.length > 0) {
101+
const { chunk, encoding, cb } = this.writeQueue.shift()!
102+
this.writeToProxy(chunk, encoding, cb)
103+
}
104+
}
105+
}

src/lib/client.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,10 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
440440

441441
public pingTimer: any
442442

443+
/**
444+
* The connection to the Broker. In browsers env this also have `socket` property
445+
* set to the `WebSocket` instance.
446+
*/
443447
public stream: IStream
444448

445449
public queue: { packet: Packet; cb: PacketCallback }[]

src/lib/connect/ali.ts

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { Buffer } from 'buffer'
22
import { Transform } from 'readable-stream'
3-
import duplexify, { Duplexify } from 'duplexify'
43
import { StreamBuilder } from '../shared'
54
import MqttClient, { IClientOptions } from '../client'
5+
import { BufferedDuplex } from '../BufferedDuplex'
66

77
let my: any
88
let proxy: Transform
9-
let stream: Duplexify
9+
let stream: BufferedDuplex
1010
let isInitialized = false
1111

1212
function buildProxy() {
@@ -64,9 +64,7 @@ function bindEventHandler() {
6464
isInitialized = true
6565

6666
my.onSocketOpen(() => {
67-
stream.setReadable(proxy)
68-
stream.setWritable(proxy)
69-
stream.emit('connect')
67+
stream.socketReady()
7068
})
7169

7270
my.onSocketMessage((res) => {
@@ -91,8 +89,8 @@ function bindEventHandler() {
9189
stream.destroy()
9290
})
9391

94-
my.onSocketError((res) => {
95-
stream.destroy(res)
92+
my.onSocketError((err) => {
93+
stream.destroy(err)
9694
})
9795
}
9896

@@ -112,13 +110,14 @@ const buildStream: StreamBuilder = (client, opts) => {
112110

113111
const url = buildUrl(opts, client)
114112
my = opts.my
113+
// https://miniprogram.alipay.com/docs/miniprogram/mpdev/api_network_connectsocket
115114
my.connectSocket({
116115
url,
117116
protocols: websocketSubProtocol,
118117
})
119118

120119
proxy = buildProxy()
121-
stream = duplexify.obj()
120+
stream = new BufferedDuplex(opts, proxy, my)
122121

123122
bindEventHandler()
124123

0 commit comments

Comments
 (0)