mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-20 08:40:38 +00:00
Feature/ws api (#72)
initial changes to support websockets as an alternative to webhooks
This commit is contained in:
@@ -7,7 +7,8 @@ const sessionTracker = require('./session-tracker');
|
||||
const makeTask = require('../tasks/make_task');
|
||||
const normalizeJambones = require('../utils/normalize-jambones');
|
||||
const listTaskNames = require('../utils/summarize-tasks');
|
||||
const Requestor = require('../utils/requestor');
|
||||
const HttpRequestor = require('../utils/http-requestor');
|
||||
const WsRequestor = require('../utils/ws-requestor');
|
||||
const BADPRECONDITIONS = 'preconditions not met';
|
||||
const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction';
|
||||
|
||||
@@ -62,6 +63,8 @@ class CallSession extends Emitter {
|
||||
}
|
||||
|
||||
this._pool = srf.locals.dbHelpers.pool;
|
||||
|
||||
this.requestor.on('command', this._onCommand.bind(this));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -289,6 +292,7 @@ class CallSession extends Emitter {
|
||||
*/
|
||||
async exec() {
|
||||
this.logger.info({tasks: listTaskNames(this.tasks)}, `CallSession:exec starting ${this.tasks.length} tasks`);
|
||||
|
||||
while (this.tasks.length && !this.callGone) {
|
||||
const taskNum = ++this.taskIdx;
|
||||
const stackNum = this.stackIdx;
|
||||
@@ -302,7 +306,7 @@ class CallSession extends Emitter {
|
||||
this.logger.info(`CallSession:exec completed task #${stackNum}:${taskNum}: ${task.name}`);
|
||||
} catch (err) {
|
||||
this.currentTask = null;
|
||||
if (err.message.includes(BADPRECONDITIONS)) {
|
||||
if (err.message?.includes(BADPRECONDITIONS)) {
|
||||
this.logger.info(`CallSession:exec task #${stackNum}:${taskNum}: ${task.name}: ${err.message}`);
|
||||
}
|
||||
else {
|
||||
@@ -310,6 +314,16 @@ class CallSession extends Emitter {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (0 === this.tasks.length && this.hasStableDialog && this.requestor instanceof WsRequestor) {
|
||||
try {
|
||||
await this._awaitCommandsOrHangup();
|
||||
if (!this.hasStableDialog || this.callGone) break;
|
||||
} catch (err) {
|
||||
this.logger.info(err, 'CallSession:exec - error waiting for new commands');
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// all done - cleanup
|
||||
@@ -368,6 +382,10 @@ class CallSession extends Emitter {
|
||||
this.currentTask.kill(this);
|
||||
this.currentTask = null;
|
||||
}
|
||||
if (this.wakeupResolver) {
|
||||
this.wakeupResolver();
|
||||
this.wakeupResolver = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -404,29 +422,42 @@ class CallSession extends Emitter {
|
||||
*/
|
||||
async _lccCallHook(opts) {
|
||||
const webhooks = [];
|
||||
let sd;
|
||||
if (opts.call_hook) webhooks.push(this.requestor.request(opts.call_hook, this.callInfo.toJSON()));
|
||||
if (opts.child_call_hook) {
|
||||
/* child call hook only allowed from a connected Dial state */
|
||||
const task = this.currentTask;
|
||||
sd = task.sd;
|
||||
if (task && TaskName.Dial === task.name && sd) {
|
||||
webhooks.push(this.requestor.request(opts.child_call_hook, sd.callInfo.toJSON()));
|
||||
let sd, tasks, childTasks;
|
||||
|
||||
if (opts.call_hook || opts.child_call_hook) {
|
||||
if (opts.call_hook) {
|
||||
webhooks.push(this.requestor.request('session:redirect', opts.call_hook, this.callInfo.toJSON()));
|
||||
}
|
||||
if (opts.child_call_hook) {
|
||||
/* child call hook only allowed from a connected Dial state */
|
||||
const task = this.currentTask;
|
||||
sd = task.sd;
|
||||
if (task && TaskName.Dial === task.name && sd) {
|
||||
webhooks.push(this.requestor.request('session:redirect', opts.child_call_hook, sd.callInfo.toJSON()));
|
||||
}
|
||||
}
|
||||
const [tasks1, tasks2] = await Promise.all(webhooks);
|
||||
if (opts.call_hook) {
|
||||
tasks = tasks1;
|
||||
if (opts.child_call_hook) childTasks = tasks2;
|
||||
}
|
||||
else childTasks = tasks1;
|
||||
}
|
||||
const [tasks1, tasks2] = await Promise.all(webhooks);
|
||||
let tasks, childTasks;
|
||||
if (opts.call_hook) {
|
||||
tasks = tasks1;
|
||||
if (opts.child_call_hook) childTasks = tasks2;
|
||||
else if (opts.parent_call || opts.child_call) {
|
||||
const {parent_call, child_call} = opts;
|
||||
assert.ok(!parent_call || Array.isArray(parent_call), 'CallSession:_lccCallHook - parent_call must be an array');
|
||||
assert.ok(!child_call || Array.isArray(child_call), 'CallSession:_lccCallHook - child_call must be an array');
|
||||
tasks = parent_call;
|
||||
childTasks = child_call;
|
||||
}
|
||||
else childTasks = tasks1;
|
||||
|
||||
if (childTasks) {
|
||||
const {parentLogger} = this.srf.locals;
|
||||
const childLogger = parentLogger.child({callId: this.callId, callSid: sd.callSid});
|
||||
const t = normalizeJambones(childLogger, childTasks).map((tdata) => makeTask(childLogger, tdata));
|
||||
childLogger.info({tasks: listTaskNames(t)}, 'CallSession:_lccCallHook new task list for child call');
|
||||
|
||||
// TODO: if using websockets api, we need a new websocket for the adulting session..
|
||||
const cs = await sd.doAdulting({
|
||||
logger: childLogger,
|
||||
application: this.application,
|
||||
@@ -604,6 +635,59 @@ class CallSession extends Emitter {
|
||||
this.taskIdx = 0;
|
||||
}
|
||||
|
||||
_onCommand({msgid, command, queueCommand, data}) {
|
||||
this.logger.info({msgid, command, queueCommand, data}, 'CallSession:_onCommand - received command');
|
||||
switch (command) {
|
||||
case 'redirect':
|
||||
if (Array.isArray(data)) {
|
||||
const t = normalizeJambones(this.logger, data).map((tdata) => makeTask(this.logger, tdata));
|
||||
if (!queueCommand) {
|
||||
this.logger.info({tasks: listTaskNames(t)}, 'CallSession:_onCommand new task list');
|
||||
this.replaceApplication(t);
|
||||
}
|
||||
else {
|
||||
this.logger.info({t, tasks: this.tasks}, 'CallSession:_onCommand - about to queue tasks');
|
||||
this.tasks.push(...t);
|
||||
this.logger.debug({tasks: this.tasks}, 'CallSession:_onCommand - tasks have been queued');
|
||||
}
|
||||
}
|
||||
else this._lccCallHook(data);
|
||||
break;
|
||||
|
||||
case 'call:status':
|
||||
this._lccCallStatus(data);
|
||||
break;
|
||||
|
||||
case 'mute:status':
|
||||
this._lccMuteStatus(data);
|
||||
break;
|
||||
|
||||
case 'conf:mute-status':
|
||||
this._lccConfMuteStatus(data);
|
||||
break;
|
||||
|
||||
case 'conf:hold-status':
|
||||
this._lccConfHoldStatus(data);
|
||||
break;
|
||||
|
||||
case 'listen:status':
|
||||
this._lccListenStatus(data);
|
||||
break;
|
||||
|
||||
case 'whisper':
|
||||
this._lccWhisper(data);
|
||||
break;
|
||||
|
||||
default:
|
||||
this.logger.info(`CallSession:_onCommand - invalid command ${command}`);
|
||||
}
|
||||
if (this.wakeupResolver) {
|
||||
this.logger.info('CallSession:_onCommand - got commands, waking up..');
|
||||
this.wakeupResolver();
|
||||
this.wakeupResolver = null;
|
||||
}
|
||||
}
|
||||
|
||||
_evaluatePreconditions(task) {
|
||||
switch (task.preconditions) {
|
||||
case TaskPreconditions.None:
|
||||
@@ -740,6 +824,7 @@ class CallSession extends Emitter {
|
||||
});
|
||||
}
|
||||
this.tmpFiles.clear();
|
||||
this.requestor && this.requestor.close();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -841,7 +926,7 @@ class CallSession extends Emitter {
|
||||
}
|
||||
else {
|
||||
this.logger.info({accountSid: this.accountSid, webhook: r[0]}, 'performQueueWebhook: webhook found');
|
||||
this.queueEventHookRequestor = new Requestor(this.logger, this.accountSid,
|
||||
this.queueEventHookRequestor = new HttpRequestor(this.logger, this.accountSid,
|
||||
r[0], this.webhook_secret);
|
||||
this.queueEventHook = r[0];
|
||||
}
|
||||
@@ -855,7 +940,7 @@ class CallSession extends Emitter {
|
||||
/* send webhook */
|
||||
const params = {...obj, ...this.callInfo.toJSON()};
|
||||
this.logger.info({accountSid: this.accountSid, params}, 'performQueueWebhook: sending webhook');
|
||||
this.queueEventHookRequestor.request(this.queueEventHook, params)
|
||||
this.queueEventHookRequestor.request('queue:status', this.queueEventHook, params)
|
||||
.catch((err) => {
|
||||
this.logger.info({err, accountSid: this.accountSid, obj}, 'Error sending queue notification event');
|
||||
});
|
||||
@@ -944,6 +1029,10 @@ class CallSession extends Emitter {
|
||||
this.emit('callStatusChange', {callStatus: CallStatus.Completed, duration});
|
||||
this.logger.debug('CallSession: call terminated by jambones');
|
||||
origDestroy();
|
||||
if (this.wakeupResolver) {
|
||||
this.wakeupResolver();
|
||||
this.wakeupResolver = null;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -1002,7 +1091,7 @@ class CallSession extends Emitter {
|
||||
this.callInfo.updateCallStatus(callStatus, sipStatus);
|
||||
if (typeof duration === 'number') this.callInfo.duration = duration;
|
||||
try {
|
||||
this.notifier.request(this.call_status_hook, this.callInfo.toJSON());
|
||||
this.notifier.request('call:status', this.call_status_hook, this.callInfo.toJSON());
|
||||
} catch (err) {
|
||||
this.logger.info(err, `CallSession:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`);
|
||||
}
|
||||
@@ -1012,6 +1101,14 @@ class CallSession extends Emitter {
|
||||
this.updateCallStatus(Object.assign({}, this.callInfo.toJSON()), this.serviceUrl)
|
||||
.catch((err) => this.logger.error(err, 'redis error'));
|
||||
}
|
||||
|
||||
_awaitCommandsOrHangup() {
|
||||
assert(!this.wakeupResolver);
|
||||
return new Promise((resolve, reject) => {
|
||||
this.logger.info('_awaitCommandsOrHangup - waiting...');
|
||||
this.wakeupResolver = resolve;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = CallSession;
|
||||
|
||||
Reference in New Issue
Block a user