ws-requestor: queue outgoing messages if we are in the process of connecting to the remote wss server

This commit is contained in:
Dave Horton
2022-05-02 13:09:23 -04:00
parent 944b8a29ca
commit eb0f55e0e3
2 changed files with 32 additions and 2 deletions

View File

@@ -119,16 +119,25 @@ router.post('/', async(req, res) => {
* these will be used for all http requests we make during this call * these will be used for all http requests we make during this call
*/ */
if ('WS' === app.call_hook?.method || /^wss?:/.test(app.call_hook.url)) { if ('WS' === app.call_hook?.method || /^wss?:/.test(app.call_hook.url)) {
logger.debug({call_hook: app.call_hook}, 'creating websocket for call hook');
app.requestor = new WsRequestor(logger, account.account_sid, app.call_hook, account.webhook_secret) ; app.requestor = new WsRequestor(logger, account.account_sid, app.call_hook, account.webhook_secret) ;
if (app.call_hook.url === app.call_status_hook.url || !app.call_status_hook?.url) app.notifier = app.requestor; if (app.call_hook.url === app.call_status_hook.url || !app.call_status_hook?.url) {
logger.debug('reusing websocket for call status hook');
app.notifier = app.requestor;
}
} }
else { else {
logger.debug({call_hook: app.call_hook}, 'creating http client for call hook');
app.requestor = new HttpRequestor(logger, account.account_sid, app.call_hook, account.webhook_secret); app.requestor = new HttpRequestor(logger, account.account_sid, app.call_hook, account.webhook_secret);
} }
if (!app.notifier && app.call_status_hook) { if (!app.notifier && app.call_status_hook) {
app.notifier = new HttpRequestor(logger, account.account_sid, app.call_status_hook, account.webhook_secret); app.notifier = new HttpRequestor(logger, account.account_sid, app.call_status_hook, account.webhook_secret);
logger.debug({call_hook: app.call_hook}, 'creating http client for call status hook');
}
else if (!app.notifier) {
logger.debug('creating null call status hook');
app.notifier = {request: () => {}};
} }
else if (!app.notifier) app.notifier = {request: () => {}};
/* now launch the outdial */ /* now launch the outdial */
try { try {

View File

@@ -16,6 +16,8 @@ class WsRequestor extends BaseRequestor {
this.maliciousClient = false; this.maliciousClient = false;
this.closedByUs = false; this.closedByUs = false;
this.backoffMs = 500; this.backoffMs = 500;
this.connectInProgress = false;
this.queuedMsg = [];
assert(this._isAbsoluteUrl(this.url)); assert(this._isAbsoluteUrl(this.url));
@@ -41,6 +43,10 @@ class WsRequestor extends BaseRequestor {
this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client'); this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client');
return; return;
} }
if (this.closedByUs) {
this.logger.debug(`WsRequestor:request - discarding ${type} because we closed the socket`);
return;
}
if (type === 'session:new') this.call_sid = params.callSid; if (type === 'session:new') this.call_sid = params.callSid;
@@ -53,6 +59,12 @@ class WsRequestor extends BaseRequestor {
/* connect if necessary */ /* connect if necessary */
if (!this.ws) { if (!this.ws) {
if (this.connectInProgress) {
this.logger.debug(`WsRequestor:request - queueing ${type} message since we are connecting`);
this.queuedMsg.push({type, hook, params, httpHeaders});
return;
}
this.logger.debug('WsRequestor:request - connecting since we do not have a connection');
if (this.connections >= MAX_RECONNECTS) { if (this.connections >= MAX_RECONNECTS) {
throw new Error(`max attempts connecting to ${this.url}`); throw new Error(`max attempts connecting to ${this.url}`);
} }
@@ -83,6 +95,15 @@ class WsRequestor extends BaseRequestor {
...b3 ...b3
}; };
/* send any queueed messages */
if (this.queuedMsg.length > 0) {
for (const {type, hook, params, httpHeaders} of this.queuedMsg) {
this.logger.debug(`WsRequestor:request - preparing queued ${type} for sending`);
setImmediate(this.request.bind(this, type, hook, params, httpHeaders));
}
this.queuedMsg.length = 0;
}
//this.logger.debug({obj}, `websocket: sending (${url})`); //this.logger.debug({obj}, `websocket: sending (${url})`);
/* simple notifications */ /* simple notifications */