WsRequestor: reconnect if socket dropped from far end

This commit is contained in:
Dave Horton
2022-05-09 12:14:13 -04:00
parent 34fe22f6e1
commit c412554c6b

View File

@@ -14,10 +14,11 @@ class WsRequestor extends BaseRequestor {
this.connections = 0; this.connections = 0;
this.messagesInFlight = new Map(); this.messagesInFlight = new Map();
this.maliciousClient = false; this.maliciousClient = false;
this.closedByUs = false; this.closedGracefully = false;
this.backoffMs = 500; this.backoffMs = 500;
this.connectInProgress = false; this.connectInProgress = false;
this.queuedMsg = []; this.queuedMsg = [];
this.id = short.generate();
assert(this._isAbsoluteUrl(this.url)); assert(this._isAbsoluteUrl(this.url));
@@ -43,7 +44,7 @@ class WsRequestor extends BaseRequestor {
this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client'); this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client');
return; return;
} }
if (this.closedByUs) { if (this.closedGracefully) {
this.logger.debug(`WsRequestor:request - discarding ${type} because we closed the socket`); this.logger.debug(`WsRequestor:request - discarding ${type} because we closed the socket`);
return; return;
} }
@@ -60,12 +61,13 @@ class WsRequestor extends BaseRequestor {
/* connect if necessary */ /* connect if necessary */
if (!this.ws) { if (!this.ws) {
if (this.connectInProgress) { if (this.connectInProgress) {
this.logger.debug(`WsRequestor:request - queueing ${type} message since we are connecting`); this.logger.debug(
`WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`);
this.queuedMsg.push({type, hook, params, httpHeaders}); this.queuedMsg.push({type, hook, params, httpHeaders});
return; return;
} }
this.connectInProgress = true; this.connectInProgress = true;
this.logger.debug('WsRequestor:request - connecting since we do not have a connection'); this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection`);
if (this.connections >= MAX_RECONNECTS) { if (this.connections >= MAX_RECONNECTS) {
throw new Error(`max attempts connecting to ${this.url}`); throw new Error(`max attempts connecting to ${this.url}`);
} }
@@ -83,7 +85,9 @@ class WsRequestor extends BaseRequestor {
assert(this.ws); assert(this.ws);
/* prepare and send message */ /* prepare and send message */
const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null; let payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null;
if (type === 'session:new') this._sessionData = payload;
if (type === 'session:reconnect') payload = this._sessionData;
assert.ok(url, 'WsRequestor:request url was not provided'); assert.ok(url, 'WsRequestor:request url was not provided');
const msgid = short.generate(); const msgid = short.generate();
@@ -108,10 +112,9 @@ class WsRequestor extends BaseRequestor {
}; };
//this.logger.debug({obj}, `websocket: sending (${url})`); //this.logger.debug({obj}, `websocket: sending (${url})`);
this.connectInProgress = false;
/* simple notifications */ /* simple notifications */
if (['call:status', 'jambonz:error'].includes(type)) { if (['call:status', 'jambonz:error', 'session:reconnect'].includes(type)) {
this.ws.send(JSON.stringify(obj), () => { this.ws.send(JSON.stringify(obj), () => {
this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`); this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`);
sendQueuedMsgs(); sendQueuedMsgs();
@@ -154,7 +157,7 @@ class WsRequestor extends BaseRequestor {
} }
close() { close() {
this.closedByUs = true; this.closedGracefully = true;
this.logger.info('WsRequestor:close closing socket'); this.logger.info('WsRequestor:close closing socket');
try { try {
if (this.ws) { if (this.ws) {
@@ -190,9 +193,8 @@ class WsRequestor extends BaseRequestor {
this this
.once('ready', (ws) => { .once('ready', (ws) => {
this.ws = ws;
this.removeAllListeners('not-ready'); this.removeAllListeners('not-ready');
if (this.connections > 0) this.request('session:reconnect', this.url); if (this.connections > 1) this.request('session:reconnect', this.url);
resolve(); resolve();
}) })
.once('not-ready', (err) => { .once('not-ready', (err) => {
@@ -222,18 +224,23 @@ class WsRequestor extends BaseRequestor {
} }
_onOpen(ws) { _onOpen(ws) {
this.logger.info({url: this.url}, 'WsRequestor - successfully connected'); this.logger.info({url: this.url}, `WsRequestor(${this.id}) - successfully connected`);
if (this.ws) this.logger.info({old_ws: this.ws._socket.address()}, 'WsRequestor:_onOpen'); if (this.ws) this.logger.info({old_ws: this.ws._socket.address()}, 'WsRequestor:_onOpen');
assert(!this.ws); assert(!this.ws);
this.ws = ws;
this.connectInProgress = false;
this.connections++;
this.emit('ready', ws); this.emit('ready', ws);
} }
_onClose() { _onClose(code) {
if (this.connections > 0) { this.logger.info(`WsRequestor(${this.id}) - closed from far end ${code}`);
if (this.connections > 0 && code !== 1000) {
this.logger.info({url: this.url}, 'WsRequestor - socket closed unexpectedly from remote side'); this.logger.info({url: this.url}, 'WsRequestor - socket closed unexpectedly from remote side');
this.emit('socket-closed'); this.emit('socket-closed');
} }
this.ws && this.ws.removeAllListeners(); else if (code === 1000) this.closedGracefully = true;
this.ws?.removeAllListeners();
this.ws = null; this.ws = null;
} }
@@ -251,8 +258,17 @@ class WsRequestor extends BaseRequestor {
_onSocketClosed() { _onSocketClosed() {
this.ws = null; this.ws = null;
this.emit('connection-dropped'); this.emit('connection-dropped');
if (this.connections++ > 0 && this.connections < MAX_RECONNECTS && !this.closedByUs) { if (this.connections > 0 && this.connections < MAX_RECONNECTS && !this.closedGracefully) {
setTimeout(this._connect.bind(this), this.backoffMs); this.logger.debug(`WsRequestor:_onSocketClosed waiting ${this.backoffMs} to reconnect`);
setTimeout(() => {
this.logger.debug(
{haveWs: !!this.ws, connectInProgress: this.connectInProgress},
'WsRequestor:_onSocketClosed time to reconnect');
if (!this.ws && !this.connectInProgress) {
this.connectInProgress = true;
this._connect().catch((err) => this.connectInProgress = false);
}
}, this.backoffMs);
this.backoffMs = this.backoffMs < 2000 ? this.backoffMs * 2 : (this.backoffMs + 2000); this.backoffMs = this.backoffMs < 2000 ? this.backoffMs * 2 : (this.backoffMs + 2000);
} }
} }