diff --git a/lib/utils/ws-requestor.js b/lib/utils/ws-requestor.js index 45d6cca9..1d6d24b1 100644 --- a/lib/utils/ws-requestor.js +++ b/lib/utils/ws-requestor.js @@ -14,10 +14,11 @@ class WsRequestor extends BaseRequestor { this.connections = 0; this.messagesInFlight = new Map(); this.maliciousClient = false; - this.closedByUs = false; + this.closedGracefully = false; this.backoffMs = 500; this.connectInProgress = false; this.queuedMsg = []; + this.id = short.generate(); assert(this._isAbsoluteUrl(this.url)); @@ -43,7 +44,7 @@ class WsRequestor extends BaseRequestor { this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client'); return; } - if (this.closedByUs) { + if (this.closedGracefully) { this.logger.debug(`WsRequestor:request - discarding ${type} because we closed the socket`); return; } @@ -60,12 +61,13 @@ class WsRequestor extends BaseRequestor { /* connect if necessary */ if (!this.ws) { if (this.connectInProgress) { - this.logger.debug(`WsRequestor:request - queueing ${type} message since we are connecting`); + this.logger.debug( + `WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`); this.queuedMsg.push({type, hook, params, httpHeaders}); return; } this.connectInProgress = true; - this.logger.debug('WsRequestor:request - connecting since we do not have a connection'); + this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection`); if (this.connections >= MAX_RECONNECTS) { throw new Error(`max attempts connecting to ${this.url}`); } @@ -83,7 +85,9 @@ class WsRequestor extends BaseRequestor { assert(this.ws); /* prepare and send message */ - const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null; + let payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null; + if (type === 'session:new') this._sessionData = payload; + if (type === 'session:reconnect') payload = this._sessionData; assert.ok(url, 'WsRequestor:request url was not provided'); const msgid = short.generate(); @@ -108,10 +112,9 @@ class WsRequestor extends BaseRequestor { }; //this.logger.debug({obj}, `websocket: sending (${url})`); - this.connectInProgress = false; /* simple notifications */ - if (['call:status', 'jambonz:error'].includes(type)) { + if (['call:status', 'jambonz:error', 'session:reconnect'].includes(type)) { this.ws.send(JSON.stringify(obj), () => { this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`); sendQueuedMsgs(); @@ -154,7 +157,7 @@ class WsRequestor extends BaseRequestor { } close() { - this.closedByUs = true; + this.closedGracefully = true; this.logger.info('WsRequestor:close closing socket'); try { if (this.ws) { @@ -190,9 +193,8 @@ class WsRequestor extends BaseRequestor { this .once('ready', (ws) => { - this.ws = ws; this.removeAllListeners('not-ready'); - if (this.connections > 0) this.request('session:reconnect', this.url); + if (this.connections > 1) this.request('session:reconnect', this.url); resolve(); }) .once('not-ready', (err) => { @@ -222,18 +224,23 @@ class WsRequestor extends BaseRequestor { } _onOpen(ws) { - this.logger.info({url: this.url}, 'WsRequestor - successfully connected'); + this.logger.info({url: this.url}, `WsRequestor(${this.id}) - successfully connected`); if (this.ws) this.logger.info({old_ws: this.ws._socket.address()}, 'WsRequestor:_onOpen'); assert(!this.ws); + this.ws = ws; + this.connectInProgress = false; + this.connections++; this.emit('ready', ws); } - _onClose() { - if (this.connections > 0) { + _onClose(code) { + this.logger.info(`WsRequestor(${this.id}) - closed from far end ${code}`); + if (this.connections > 0 && code !== 1000) { this.logger.info({url: this.url}, 'WsRequestor - socket closed unexpectedly from remote side'); this.emit('socket-closed'); } - this.ws && this.ws.removeAllListeners(); + else if (code === 1000) this.closedGracefully = true; + this.ws?.removeAllListeners(); this.ws = null; } @@ -251,8 +258,17 @@ class WsRequestor extends BaseRequestor { _onSocketClosed() { this.ws = null; this.emit('connection-dropped'); - if (this.connections++ > 0 && this.connections < MAX_RECONNECTS && !this.closedByUs) { - setTimeout(this._connect.bind(this), this.backoffMs); + if (this.connections > 0 && this.connections < MAX_RECONNECTS && !this.closedGracefully) { + this.logger.debug(`WsRequestor:_onSocketClosed waiting ${this.backoffMs} to reconnect`); + setTimeout(() => { + this.logger.debug( + {haveWs: !!this.ws, connectInProgress: this.connectInProgress}, + 'WsRequestor:_onSocketClosed time to reconnect'); + if (!this.ws && !this.connectInProgress) { + this.connectInProgress = true; + this._connect().catch((err) => this.connectInProgress = false); + } + }, this.backoffMs); this.backoffMs = this.backoffMs < 2000 ? this.backoffMs * 2 : (this.backoffMs + 2000); } }