wait for session:reconnect ack to send queued msgs (#723) (#732)

Co-authored-by: Hoan Luu Huu <110280845+xquanluu@users.noreply.github.com>
This commit is contained in:
Dave Horton
2024-04-25 11:22:15 -04:00
committed by GitHub
parent a18d55e9ab
commit 59cf02bd04
2 changed files with 77 additions and 13 deletions

View File

@@ -115,6 +115,7 @@ class CallSession extends Emitter {
this.logger.debug(`CallSession: ${this.callSid} listener count ${this.requestor.listenerCount('command')}`); this.logger.debug(`CallSession: ${this.callSid} listener count ${this.requestor.listenerCount('command')}`);
this.requestor.on('connection-dropped', this._onWsConnectionDropped.bind(this)); this.requestor.on('connection-dropped', this._onWsConnectionDropped.bind(this));
this.requestor.on('handover', handover.bind(this)); this.requestor.on('handover', handover.bind(this));
this.requestor.on('reconnect-error', this._onSessionReconnectError.bind(this));
}; };
if (!this.isConfirmCallSession) { if (!this.isConfirmCallSession) {
@@ -122,6 +123,7 @@ class CallSession extends Emitter {
this.logger.debug(`CallSession: ${this.callSid} listener count ${this.requestor.listenerCount('command')}`); this.logger.debug(`CallSession: ${this.callSid} listener count ${this.requestor.listenerCount('command')}`);
this.requestor.on('connection-dropped', this._onWsConnectionDropped.bind(this)); this.requestor.on('connection-dropped', this._onWsConnectionDropped.bind(this));
this.requestor.on('handover', handover.bind(this)); this.requestor.on('handover', handover.bind(this));
this.requestor.on('reconnect-error', this._onSessionReconnectError.bind(this));
} }
} }
@@ -1704,6 +1706,22 @@ Duration=${duration} `
}, 'CallSession:_injectTasks - completed'); }, 'CallSession:_injectTasks - completed');
} }
async _onSessionReconnectError(err) {
const {writeAlerts, AlertType} = this.srf.locals;
const sid = this.accountInfo.account.account_sid;
this.logger.info({err}, `_onSessionReconnectError for account ${sid}`);
try {
await writeAlerts({
alert_type: AlertType.WEBHOOK_CONNECTION_FAILURE,
account_sid: this.accountSid,
detail: `Session:reconnect error ${err}`
});
} catch (error) {
this.logger.error({error}, 'Error writing WEBHOOK_CONNECTION_FAILURE alert');
}
this._jambonzHangup();
}
_onCommand({msgid, command, call_sid, queueCommand, data}) { _onCommand({msgid, command, call_sid, queueCommand, data}) {
this.logger.info({msgid, command, queueCommand, data}, 'CallSession:_onCommand - received command'); this.logger.info({msgid, command, queueCommand, data}, 'CallSession:_onCommand - received command');
let resolution; let resolution;

View File

@@ -56,6 +56,12 @@ class WsRequestor extends BaseRequestor {
} }
if (type === 'session:new') this.call_sid = params.callSid; if (type === 'session:new') this.call_sid = params.callSid;
if (type === 'session:reconnect') {
this._reconnectPromise = new Promise((resolve, reject) => {
this._reconnectResolve = resolve;
this._reconnectReject = reject;
});
}
/* if we have an absolute url, and it is http then do a standard webhook */ /* if we have an absolute url, and it is http then do a standard webhook */
if (this._isAbsoluteUrl(url) && url.startsWith('http')) { if (this._isAbsoluteUrl(url) && url.startsWith('http')) {
@@ -71,20 +77,23 @@ class WsRequestor extends BaseRequestor {
} }
/* connect if necessary */ /* connect if necessary */
const queueMsg = () => {
this.logger.debug(
`WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`);
if (wantsAck) {
const p = new Promise((resolve, reject) => {
this.queuedMsg.push({type, hook, params, httpHeaders, promise: {resolve, reject}});
});
return p;
}
else {
this.queuedMsg.push({type, hook, params, httpHeaders});
}
return;
};
if (!this.ws) { if (!this.ws) {
if (this.connectInProgress) { if (this.connectInProgress) {
this.logger.debug( return queueMsg();
`WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`);
if (wantsAck) {
const p = new Promise((resolve, reject) => {
this.queuedMsg.push({type, hook, params, httpHeaders, promise: {resolve, reject}});
});
return p;
}
else {
this.queuedMsg.push({type, hook, params, httpHeaders});
}
return;
} }
this.connectInProgress = true; this.connectInProgress = true;
this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection for ${type}`); this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection for ${type}`);
@@ -102,6 +111,10 @@ class WsRequestor extends BaseRequestor {
return Promise.reject(err); return Promise.reject(err);
} }
} }
// If jambonz wait for ack from reconnect, queue the msg until reconnect is acked
if (type !== 'session:reconnect' && this._reconnectPromise) {
return queueMsg();
}
assert(this.ws); assert(this.ws);
/* prepare and send message */ /* prepare and send message */
@@ -139,6 +152,18 @@ class WsRequestor extends BaseRequestor {
} }
}; };
const rejectQueuedMsgs = (err) => {
if (this.queuedMsg.length > 0) {
for (const {promise} of this.queuedMsg) {
this.logger.debug(`WsRequestor:request - preparing queued ${type} for rejectQueuedMsgs`);
if (promise) {
promise.reject(err);
}
}
this.queuedMsg.length = 0;
}
};
//this.logger.debug({obj}, `websocket: sending (${url})`); //this.logger.debug({obj}, `websocket: sending (${url})`);
/* special case: reconnecting before we received ack to session:new */ /* special case: reconnecting before we received ack to session:new */
@@ -179,16 +204,37 @@ class WsRequestor extends BaseRequestor {
this.logger.debug({response}, `WsRequestor:request ${url} succeeded in ${rtt}ms`); this.logger.debug({response}, `WsRequestor:request ${url} succeeded in ${rtt}ms`);
this.stats.histogram('app.hook.ws_response_time', rtt, ['hook_type:app']); this.stats.histogram('app.hook.ws_response_time', rtt, ['hook_type:app']);
resolve(response); resolve(response);
if (this._reconnectResolve) {
this._reconnectResolve();
}
}, },
failure: (err) => { failure: (err) => {
if (this._reconnectReject) {
this._reconnectReject(err);
}
clearTimeout(timer); clearTimeout(timer);
reject(err); reject(err);
} }
}); });
/* send the message */ /* send the message */
this.ws.send(JSON.stringify(obj), () => { this.ws.send(JSON.stringify(obj), async() => {
this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`); this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`);
// If session:reconnect is waiting for ack, hold here until ack to send queuedMsgs
if (this._reconnectPromise) {
try {
await this._reconnectPromise;
} catch (err) {
// bad thing happened to session:recconnect
rejectQueuedMsgs(err);
this.emit('reconnect-error');
return;
} finally {
this._reconnectPromise = null;
this._reconnectResolve = null;
this._reconnectReject = null;
}
}
sendQueuedMsgs(); sendQueuedMsgs();
}); });
}); });