more work on wss race condition

This commit is contained in:
Dave Horton
2022-05-02 13:32:07 -04:00
parent eb0f55e0e3
commit dea58c2605

View File

@@ -64,6 +64,7 @@ class WsRequestor extends BaseRequestor {
this.queuedMsg.push({type, hook, params, httpHeaders}); this.queuedMsg.push({type, hook, params, httpHeaders});
return; return;
} }
this.connectInProgress = true;
this.logger.debug('WsRequestor:request - connecting since we do not have a connection'); this.logger.debug('WsRequestor:request - connecting since we do not have a connection');
if (this.connections >= MAX_RECONNECTS) { if (this.connections >= MAX_RECONNECTS) {
throw new Error(`max attempts connecting to ${this.url}`); throw new Error(`max attempts connecting to ${this.url}`);
@@ -75,6 +76,7 @@ class WsRequestor extends BaseRequestor {
this.stats.histogram('app.hook.connect_time', rtt, ['hook_type:app']); this.stats.histogram('app.hook.connect_time', rtt, ['hook_type:app']);
} catch (err) { } catch (err) {
this.logger.info({url, err}, 'WsRequestor:request - failed connecting'); this.logger.info({url, err}, 'WsRequestor:request - failed connecting');
this.connectInProgress = false;
throw err; throw err;
} }
} }
@@ -95,21 +97,24 @@ class WsRequestor extends BaseRequestor {
...b3 ...b3
}; };
/* send any queueed messages */ const sendQueuedMsgs = () => {
if (this.queuedMsg.length > 0) { if (this.queuedMsg.length > 0) {
for (const {type, hook, params, httpHeaders} of this.queuedMsg) { for (const {type, hook, params, httpHeaders} of this.queuedMsg) {
this.logger.debug(`WsRequestor:request - preparing queued ${type} for sending`); this.logger.debug(`WsRequestor:request - preparing queued ${type} for sending`);
setImmediate(this.request.bind(this, type, hook, params, httpHeaders)); setImmediate(this.request.bind(this, type, hook, params, httpHeaders));
}
this.queuedMsg.length = 0;
} }
this.queuedMsg.length = 0; };
}
//this.logger.debug({obj}, `websocket: sending (${url})`); //this.logger.debug({obj}, `websocket: sending (${url})`);
this.connectInProgress = false;
/* simple notifications */ /* simple notifications */
if (['call:status', 'jambonz:error'].includes(type)) { if (['call:status', 'jambonz:error'].includes(type)) {
this.ws.send(JSON.stringify(obj), () => { this.ws.send(JSON.stringify(obj), () => {
this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`); this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`);
sendQueuedMsgs();
}); });
return; return;
} }
@@ -143,6 +148,7 @@ class WsRequestor extends BaseRequestor {
/* send the message */ /* send the message */
this.ws.send(JSON.stringify(obj), () => { this.ws.send(JSON.stringify(obj), () => {
this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`); this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`);
sendQueuedMsgs();
}); });
}); });
} }
@@ -199,6 +205,7 @@ class WsRequestor extends BaseRequestor {
} }
_setHandlers(ws) { _setHandlers(ws) {
this.logger.debug('WsRequestor:_setHandlers');
ws ws
.once('open', this._onOpen.bind(this, ws)) .once('open', this._onOpen.bind(this, ws))
.once('close', this._onClose.bind(this)) .once('close', this._onClose.bind(this))