diff --git a/lib/session/call-session.js b/lib/session/call-session.js index a1a2aaee..f9639739 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -115,6 +115,7 @@ class CallSession extends Emitter { this.logger.debug(`CallSession: ${this.callSid} listener count ${this.requestor.listenerCount('command')}`); this.requestor.on('connection-dropped', this._onWsConnectionDropped.bind(this)); this.requestor.on('handover', handover.bind(this)); + this.requestor.on('reconnect-error', this._onSessionReconnectError.bind(this)); }; if (!this.isConfirmCallSession) { @@ -122,6 +123,7 @@ class CallSession extends Emitter { this.logger.debug(`CallSession: ${this.callSid} listener count ${this.requestor.listenerCount('command')}`); this.requestor.on('connection-dropped', this._onWsConnectionDropped.bind(this)); this.requestor.on('handover', handover.bind(this)); + this.requestor.on('reconnect-error', this._onSessionReconnectError.bind(this)); } } @@ -1704,6 +1706,22 @@ Duration=${duration} ` }, 'CallSession:_injectTasks - completed'); } + async _onSessionReconnectError(err) { + const {writeAlerts, AlertType} = this.srf.locals; + const sid = this.accountInfo.account.account_sid; + this.logger.info({err}, `_onSessionReconnectError for account ${sid}`); + try { + await writeAlerts({ + alert_type: AlertType.WEBHOOK_CONNECTION_FAILURE, + account_sid: this.accountSid, + detail: `Session:reconnect error ${err}` + }); + } catch (error) { + this.logger.error({error}, 'Error writing WEBHOOK_CONNECTION_FAILURE alert'); + } + this._jambonzHangup(); + } + _onCommand({msgid, command, call_sid, queueCommand, data}) { this.logger.info({msgid, command, queueCommand, data}, 'CallSession:_onCommand - received command'); let resolution; diff --git a/lib/utils/ws-requestor.js b/lib/utils/ws-requestor.js index 3d7e3b69..d5c8645f 100644 --- a/lib/utils/ws-requestor.js +++ b/lib/utils/ws-requestor.js @@ -56,6 +56,12 @@ class WsRequestor extends BaseRequestor { } if (type === 'session:new') this.call_sid = params.callSid; + if (type === 'session:reconnect') { + this._reconnectPromise = new Promise((resolve, reject) => { + this._reconnectResolve = resolve; + this._reconnectReject = reject; + }); + } /* if we have an absolute url, and it is http then do a standard webhook */ if (this._isAbsoluteUrl(url) && url.startsWith('http')) { @@ -71,20 +77,23 @@ class WsRequestor extends BaseRequestor { } /* connect if necessary */ + const queueMsg = () => { + this.logger.debug( + `WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`); + if (wantsAck) { + const p = new Promise((resolve, reject) => { + this.queuedMsg.push({type, hook, params, httpHeaders, promise: {resolve, reject}}); + }); + return p; + } + else { + this.queuedMsg.push({type, hook, params, httpHeaders}); + } + return; + }; if (!this.ws) { if (this.connectInProgress) { - this.logger.debug( - `WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`); - if (wantsAck) { - const p = new Promise((resolve, reject) => { - this.queuedMsg.push({type, hook, params, httpHeaders, promise: {resolve, reject}}); - }); - return p; - } - else { - this.queuedMsg.push({type, hook, params, httpHeaders}); - } - return; + return queueMsg(); } this.connectInProgress = true; this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection for ${type}`); @@ -102,6 +111,10 @@ class WsRequestor extends BaseRequestor { return Promise.reject(err); } } + // If jambonz wait for ack from reconnect, queue the msg until reconnect is acked + if (type !== 'session:reconnect' && this._reconnectPromise) { + return queueMsg(); + } assert(this.ws); /* prepare and send message */ @@ -139,6 +152,18 @@ class WsRequestor extends BaseRequestor { } }; + const rejectQueuedMsgs = (err) => { + if (this.queuedMsg.length > 0) { + for (const {promise} of this.queuedMsg) { + this.logger.debug(`WsRequestor:request - preparing queued ${type} for rejectQueuedMsgs`); + if (promise) { + promise.reject(err); + } + } + this.queuedMsg.length = 0; + } + }; + //this.logger.debug({obj}, `websocket: sending (${url})`); /* special case: reconnecting before we received ack to session:new */ @@ -179,16 +204,37 @@ class WsRequestor extends BaseRequestor { this.logger.debug({response}, `WsRequestor:request ${url} succeeded in ${rtt}ms`); this.stats.histogram('app.hook.ws_response_time', rtt, ['hook_type:app']); resolve(response); + if (this._reconnectResolve) { + this._reconnectResolve(); + } }, failure: (err) => { + if (this._reconnectReject) { + this._reconnectReject(err); + } clearTimeout(timer); reject(err); } }); /* send the message */ - this.ws.send(JSON.stringify(obj), () => { + this.ws.send(JSON.stringify(obj), async() => { this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`); + // If session:reconnect is waiting for ack, hold here until ack to send queuedMsgs + if (this._reconnectPromise) { + try { + await this._reconnectPromise; + } catch (err) { + // bad thing happened to session:recconnect + rejectQueuedMsgs(err); + this.emit('reconnect-error'); + return; + } finally { + this._reconnectPromise = null; + this._reconnectResolve = null; + this._reconnectReject = null; + } + } sendQueuedMsgs(); }); });